from datetime import datetime, timedelta

from django.db import models
from django.db.models import F

from .functions import get_level_name


class ServiceTimeSlot(models.Model):
    """A dated service time slot with an address, content and head counts."""

    ID = models.AutoField(primary_key=True)
    date = models.DateField(null=True, blank=True)
    start_time = models.TimeField(null=True, blank=True)
    end_time = models.TimeField(null=True, blank=True)
    address = models.CharField(max_length=256, null=True, blank=True)
    service_address = models.CharField(max_length=256, null=True, blank=True)
    service_content = models.CharField(max_length=256, null=True, blank=True)
    people_count = models.IntegerField(default=2)
    # Number of people currently registered. It was not tracked in the past,
    # so it is being collected for a while; remove once the data is stable.
    booked_count = models.IntegerField(default=0)

    @staticmethod
    def search_by_date(service_date):
        """Return the time slots whose date equals ``service_date``."""
        return ServiceTimeSlot.objects.filter(date=service_date)

    @staticmethod
    def search_future_timeslots(service_date):
        """Return the time slots dated strictly after ``service_date``."""
        return ServiceTimeSlot.objects.filter(date__gt=service_date)

    @staticmethod
    def search_by_date_and_address(service_date, address):
        """Return the time slots on ``service_date``, optionally by address.

        ``None`` and the string ``'false'`` mean "no address filter".
        """
        # NOTE(review): unlike search_by_year_month_address, 'all' is NOT
        # treated as a wildcard here - confirm this difference is intended.
        if address in [None, 'false']:
            return ServiceTimeSlot.objects.filter(date=service_date)
        return ServiceTimeSlot.objects.filter(date=service_date, address=address)

    @staticmethod
    def get_by_id(ts_id):
        """Return the time slot with primary key ``ts_id``.

        Raises:
            ServiceTimeSlot.DoesNotExist: if no such time slot exists.
        """
        return ServiceTimeSlot.objects.get(ID=ts_id)

    @staticmethod
    def search_by_year_month(year, month):
        """Return the time slots falling in the given year and month."""
        return ServiceTimeSlot.objects.filter(date__year=year, date__month=month)

    @staticmethod
    def search_by_year_month_address(year, month, address):
        """Return the time slots of a month ordered by date, optionally by address.

        The strings ``'all'`` and ``'false'`` mean "no address filter".
        """
        slots = ServiceTimeSlot.objects.filter(date__year=year, date__month=month)
        if address not in ['all', 'false']:
            slots = slots.filter(address=address)
        return slots.order_by('date')

    @staticmethod
    def available_address():
        """Return the distinct addresses of time slots dated today or later.

        The result is a ``values('address')`` queryset, i.e. dicts with a
        single ``'address'`` key.
        """
        today = datetime.today().date()
        return ServiceTimeSlot.objects.filter(date__gte=today).values('address').distinct()


class Volunteer(models.Model):
    """A registered volunteer and their credit balance."""

    ID = models.AutoField(primary_key=True)
    name = models.CharField(max_length=128)  # full name
    gender = models.CharField(max_length=8)  # male / female / undisclosed
    mobile = models.BigIntegerField(unique=True)  # mobile phone number
    social_id = models.CharField(max_length=32)  # national ID number
    political_outlook = models.CharField(max_length=32)  # political affiliation
    organization = models.CharField(max_length=256)  # employer or school
    district_street = models.CharField(max_length=256)  # district or street
    status = models.CharField(max_length=128)  # 'active' or 'pending'
    wechat_id = models.CharField(null=True, max_length=128)  # WeChat openid
    total_credit = models.IntegerField(default=0)
    current_credit = models.IntegerField(default=0)

    @staticmethod
    def volunteer_list():
        """Return all volunteers."""
        return Volunteer.objects.all()

    @staticmethod
    def get_by_mobile(mobile_number):
        """Return the volunteer with this mobile number, or ``None`` on any error."""
        try:
            return Volunteer.objects.get(mobile=mobile_number)
        except Exception as e:
            # Best-effort lookup: report the problem and signal "not found".
            print(e)
            return None

    @staticmethod
    def get_by_mobile_social_id(mobile_number, social_id):
        """Return the volunteer matching both values, or ``None`` on any error."""
        try:
            return Volunteer.objects.get(mobile=mobile_number, social_id=social_id)
        except Exception as e:
            print(e)
            return None

    @staticmethod
    def get_by_id(v_id):
        """Return the volunteer with primary key ``v_id``, or ``None`` on any error.

        The returned instance carries an extra, non-persisted ``level_name``
        attribute computed from ``total_credit``.
        """
        try:
            volunteer = Volunteer.objects.get(ID=v_id)
            volunteer.level_name = get_level_name(volunteer.total_credit)
            return volunteer
        except Exception as e:
            print(e)
            return None

    @staticmethod
    def get_by_wechat_id(wechat_id):
        """Return the volunteer with this WeChat id, or ``None`` on any error.

        The returned instance carries an extra, non-persisted ``level_name``
        attribute computed from ``total_credit``.
        """
        try:
            volunteer = Volunteer.objects.get(wechat_id=wechat_id)
            volunteer.level_name = get_level_name(volunteer.total_credit)
            return volunteer
        except Exception as e:
            print(e)
            return None

    @staticmethod
    def pending_list():
        """Return the volunteers whose status is ``'pending'``."""
        return Volunteer.objects.filter(status='pending')

    @staticmethod
    def approve_volunteer(v_id):
        """Switch a pending volunteer to ``'active'``; no-op if not pending."""
        Volunteer.objects.filter(ID=v_id, status='pending').update(status='active')

    @staticmethod
    def reject(v_id):
        """Delete a pending volunteer; no-op if the volunteer is not pending."""
        Volunteer.objects.filter(ID=v_id, status='pending').delete()


class ServiceBooking(models.Model):
    """A volunteer's booking of a service time slot."""

    ID = models.AutoField(primary_key=True)
    timeslot_id = models.ForeignKey(ServiceTimeSlot, on_delete=models.CASCADE, blank=True, null=True,
                                    related_name='service_booking_to_timeslot')
    volunteer_id = models.ForeignKey(Volunteer, on_delete=models.CASCADE, blank=True, null=True,
                                     related_name='service_booking_to_volunteer')
    register_time = models.DateTimeField(auto_now_add=True, null=True)  # record creation time
    counted_in_credit = models.BooleanField(default=False)  # already counted towards credit?

    @staticmethod
    def get_by_ID(booking_id):
        """Return the booking with primary key ``booking_id``, or ``None`` on any error."""
        try:
            return ServiceBooking.objects.get(ID=booking_id)
        except Exception as e:
            print(e)
            return None

    @staticmethod
    def count_credit_by_yesterday():
        """Return bookings dated before today that are not yet counted in credit.

        Ordered by time slot date, with the time slot pre-fetched.
        """
        today = datetime.now().date()
        return (ServiceBooking.objects.select_related('timeslot_id')
                .filter(counted_in_credit=False, timeslot_id__date__lt=today)
                .order_by('timeslot_id__date'))

    @staticmethod
    def search_by_volunteer(v_id):
        """Return a volunteer's bookings dated today or later, ordered by date."""
        today = datetime.now().date()
        return (ServiceBooking.objects.select_related('timeslot_id')
                .filter(volunteer_id_id=v_id, timeslot_id__date__gte=today)
                .order_by('timeslot_id__date'))

    @staticmethod
    def search_by_service_timeslot_list(service_list):
        """Return the bookings for one time slot or for a queryset of time slots.

        Args:
            service_list: a ``ServiceTimeSlot`` instance or a ``QuerySet``.

        Returns:
            A queryset of bookings, or ``None`` for any other argument type.
        """
        if isinstance(service_list, ServiceTimeSlot):
            return ServiceBooking.objects.filter(timeslot_id=service_list)
        if isinstance(service_list, models.QuerySet):
            return ServiceBooking.objects.filter(timeslot_id__in=service_list)
        # Unsupported argument type: keep the historical "return None" contract.
        return None

    @staticmethod
    def search_by_service_timeslot_list_with_volunteer_info(service_list):
        """Return the bookings for the given time slots with volunteers pre-fetched."""
        return ServiceBooking.objects.filter(timeslot_id__in=service_list).select_related('volunteer_id')

    @staticmethod
    def search_by_service_timeslot(service):
        """Return the bookings for a single time slot."""
        return ServiceBooking.objects.filter(timeslot_id=service)

    @staticmethod
    def check_exist_booking(ts_id, v_id):
        """Return ``True`` if the volunteer already booked the time slot."""
        # exists() asks the database for a yes/no answer instead of loading rows.
        return ServiceBooking.objects.filter(timeslot_id=ts_id, volunteer_id=v_id).exists()

    @staticmethod
    def add(service_booking):
        """Save ``service_booking`` unless an identical booking already exists.

        Returns:
            ``True`` if the booking was saved, ``False`` if it was a duplicate.
        """
        if ServiceBooking.check_exist_booking(service_booking.timeslot_id, service_booking.volunteer_id):
            return False
        service_booking.save()
        return True

    # NOTE(review): this static method replaces the inherited Model.delete() on
    # this class, so calling ``.delete()`` with no arguments on a ServiceBooking
    # instance raises TypeError. Renaming it requires updating callers, so the
    # interface is left untouched here.
    @staticmethod
    def delete(timeslot, volunteer):
        """Delete the booking(s) matching the given time slot id and volunteer id."""
        ServiceBooking.objects.filter(timeslot_id_id=timeslot, volunteer_id_id=volunteer).delete()

    @staticmethod
    def get_monthly_report(date):
        """Return the booking report rows for the month containing ``date``.

        Args:
            date: a ``'%Y-%m-%d'`` formatted string; only year and month matter.

        Returns:
            A ``values()`` queryset ordered by slot date and start time with
            the time slot and volunteer columns listed below.

        Raises:
            ValueError: if ``date`` does not match ``'%Y-%m-%d'``.
        """
        first_day = datetime.strptime(date, '%Y-%m-%d').date().replace(day=1)
        if first_day.month == 12:
            # December rolls over into January of the FOLLOWING year; without
            # the year bump the upper bound would precede the lower bound and
            # the report would always be empty.
            next_first_day = first_day.replace(year=first_day.year + 1, month=1)
        else:
            next_first_day = first_day.replace(month=first_day.month + 1)
        return (ServiceBooking.objects.select_related('timeslot_id', 'volunteer_id')
                .filter(timeslot_id__date__gte=first_day, timeslot_id__date__lt=next_first_day)
                .order_by('timeslot_id__date', 'timeslot_id__start_time')
                .values('timeslot_id__date', 'timeslot_id__start_time', 'timeslot_id__end_time',
                        'timeslot_id__address', 'volunteer_id__name', 'volunteer_id__gender',
                        'volunteer_id__mobile', 'volunteer_id__social_id',
                        'volunteer_id__political_outlook', 'volunteer_id__organization',
                        'volunteer_id__district_street'))

    @staticmethod
    def has_conflict_booking(volunteer: Volunteer, service_time_slot: ServiceTimeSlot) -> bool:
        """Return ``True`` if the volunteer has a booking overlapping the slot.

        Two slots overlap when they are on the same date and each one starts
        before the other ends.
        """
        return ServiceBooking.objects.filter(
            timeslot_id__date=service_time_slot.date,
            timeslot_id__end_time__gt=service_time_slot.start_time,
            timeslot_id__start_time__lt=service_time_slot.end_time,
            volunteer_id__ID=volunteer.ID,
        ).exists()

    @staticmethod
    def service_booking_reminder_list(day: int = 1):
        """Return the bookings that need a reminder.

        Args:
            day: how many days ahead to look; the default ``1`` selects all
                bookings whose time slot is tomorrow.
        """
        check_date = datetime.today().date() + timedelta(days=day)
        return (ServiceBooking.objects.select_related('timeslot_id', 'volunteer_id')
                .filter(timeslot_id__date=check_date))


class Admin(models.Model):
    """An administrator account."""

    id = models.AutoField(primary_key=True)
    name = models.CharField(max_length=128)  # full name
    password = models.CharField(max_length=128)  # password
    mobile = models.BigIntegerField(unique=True)  # mobile phone number
    register_date = models.DateTimeField(auto_now_add=True)  # creation date
    status = models.CharField(max_length=128)  # 'active' or 'pending'


class CreditRecord(models.Model):
    """A single credit change for a volunteer, optionally made by an admin."""

    id = models.AutoField(primary_key=True)
    volunteer = models.ForeignKey(Volunteer, on_delete=models.CASCADE, blank=True, null=True,
                                  related_name='credit_record_to_volunteer')
    admin = models.ForeignKey(Admin, on_delete=models.SET_NULL, blank=True, null=True,
                              related_name='credit_record_to_admin')
    change = models.IntegerField(blank=True, null=True, default=0)
    reason = models.CharField(blank=True, null=True, max_length=256)
    # The field is registered under the name 'datetime' (see ``name=``).
    record_datetime = models.DateTimeField(auto_now_add=True, name='datetime')


class Bonus(models.Model):
    """A redeemable bonus item with a limited quantity and a cut-off date."""

    id = models.AutoField(primary_key=True)
    name = models.CharField(max_length=128)
    description = models.CharField(max_length=1024)
    picture = models.CharField(max_length=256, null=True, blank=True)
    type = models.CharField(max_length=128)
    total_qty = models.IntegerField()
    used_qty = models.IntegerField(default=0)
    cut_off_date = models.DateField()
    # Redemption cost.
    # NOTE(review): max_digits=9999 likely exceeds the precision most database
    # backends accept - confirm; changing it requires a migration.
    cost = models.DecimalField(max_digits=9999, decimal_places=2)
    admin = models.ForeignKey(to=Admin, on_delete=models.SET_NULL, blank=True, null=True,
                              related_name='bonus_to_admin')

    @staticmethod
    def get_by_id(bonus_id):
        """Return the bonus with primary key ``bonus_id``, or ``None`` on any error."""
        try:
            return Bonus.objects.get(id=bonus_id)
        except Exception as e:
            print(e)
            return None

    @staticmethod
    def get_bonus_list(admin: Admin = None):
        """Return bonuses ordered by latest cut-off date first.

        Args:
            admin: when given, only bonuses owned by this admin are returned.
        """
        bonuses = Bonus.objects.filter()
        if admin is not None:
            bonuses = bonuses.filter(admin=admin)
        return bonuses.order_by('-cut_off_date')

    @staticmethod
    def get_bonus(bonus_id):
        """Alias of :meth:`get_by_id`, kept for existing callers."""
        return Bonus.get_by_id(bonus_id)

    @staticmethod
    def all_available_bonus():
        """Return bonuses not past cut-off that still have stock, soonest first."""
        today = datetime.today().date()
        return (Bonus.objects.filter(cut_off_date__gte=today, used_qty__lt=F('total_qty'))
                .order_by('cut_off_date'))

    @staticmethod
    def del_bonus(bonus_id):
        """Delete a bonus that has never been redeemed.

        Returns:
            ``True`` if deleted, ``False`` if ``used_qty`` is above zero.

        Raises:
            Bonus.DoesNotExist: if no bonus has primary key ``bonus_id``.
        """
        bonus = Bonus.objects.get(id=bonus_id)
        if bonus.used_qty > 0:
            return False
        bonus.delete()
        return True


class BonusRecord(models.Model):
    """A record linking a volunteer to a bonus."""

    id = models.AutoField(primary_key=True)
    bonus = models.ForeignKey(to=Bonus, on_delete=models.SET_NULL, blank=True, null=True,
                              related_name='bonus_record_to_bonus')
    volunteer = models.ForeignKey(to=Volunteer, on_delete=models.SET_NULL, blank=True, null=True,
                                  related_name='bonus_record_to_volunteer')
    register_time = models.DateTimeField(auto_now_add=True, null=True)  # record creation time

    @staticmethod
    def is_exist(volunteer: Volunteer, bonus: Bonus) -> bool:
        """Return ``True`` if a record exists for this volunteer and bonus."""
        return BonusRecord.objects.filter(volunteer=volunteer, bonus=bonus).exists()

    @staticmethod
    def search_bonus_with_volunteer(bonus):
        """Return the volunteer details (as dicts) of every record for ``bonus``."""
        return (BonusRecord.objects.select_related('volunteer').filter(bonus=bonus)
                .values('volunteer__name', 'volunteer__gender', 'volunteer__mobile',
                        'volunteer__social_id', 'volunteer__political_outlook',
                        'volunteer__organization', 'volunteer__district_street'))

    @staticmethod
    def search_by_volunteer(volunteer: Volunteer):
        """Return the volunteer's bonus records with the bonus pre-fetched."""
        return BonusRecord.objects.select_related('bonus').filter(volunteer=volunteer)

# class VolunteerCredit(models.Model):
#     id = models.AutoField(primary_key=True)
#     volunteer = models.ForeignKey(to=Volunteer, on_delete=models.SET_NULL, blank=True, null=True,
#                                   related_name='volunteer_credit_to_volunteer')
#     total_credit = models.IntegerField()
#     current_credit = models.IntegerField()
#
#     def __init__(self, volunteer: Volunteer, credit: int):
#         self.volunteer = volunteer
#         self.total_credit = credit
#         self.current_credit = credit
#
#     @staticmethod
#     def exist_by_volunteer(volunteer) -> bool:
#         return VolunteerCredit.objects.filter(volunteer=volunteer).exists()
#
#     @staticmethod
#     def get_by_volunteer(volunteer):
#         try:
#             return VolunteerCredit.objects.get(volunteer=volunteer)
#         except Exception as e:
#             print(e)
#             # Remove possibly erroneous rows from the database
#             VolunteerCredit.objects.filter(volunteer=volunteer).delete()
#             return None

# class Organizer(models.Model):
#     id = models.AutoField(primary_key=True)
#     name = models.CharField(max_length=128)
#     password = models.CharField(max_length=128)