import string
from django.http import HttpResponse,JsonResponse
from django.shortcuts import render
from django.views.decorators.csrf import csrf_exempt
# Adding the @csrf_exempt decorator to a view bypasses CSRF protection.
# Alternatively the front end can send {%csrf_token%} to avoid CSRF errors.
from . import aidog_tools,aidog_request_tools,generalRequest,dbOperate_upload_Uvoice,login as login_func,register,puppyStateUpdate,initialPuppyState
# ^ includes the helpers used to call the voiceprint API
from datetime import datetime, timedelta
from django.utils import timezone
import random
import json
import time
import traceback
import os
import jwt
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "aiDogProject.settings")
import django
django.setup()
from aiDogApp.models import *
from django.views.decorators.csrf import csrf_exempt
import hashlib
from django.core import serializers
from django.db import transaction
# from aiDogApp.settings import SECRET_KEY

# Create your views here.
# from rest_framework.decorators import api_view
# from rest_framework.response import Response

# NOTE(review): API credentials and the JWT signing key are hard-coded in
# source. They are left untouched here because changing them changes runtime
# behavior, but they should be moved to settings / environment variables.
# APPId = "4522d502"
# APISecret = "Zjc5MjJhMzBmMDYxNTg4MTNlMTg1MmQw"
# APIKey = "f7a9f0ceae3ff7ebfc0c89efeb18810d"
# APPId = "527f831f"
# APISecret = "YWE5MjI0MzA4NmM3MTNmNTNiMWJkYzE4"
# APIKey = "be5c37f3db1569737934240a0e3ad02d"
APPId = "ed8eb862"
APISecret = "YzEyMDYyMzljMDViNWJlZDdlOWJhYjVi"
APIKey = "b4c831160d8933221e95bda817547e99"

# file_path = './media/讯飞开放平台.mp3'  # audio source read from the database path
file_path = './audio_cache/voice_JTYNZP9O_1729843681.mp3'  # audio source read from the database path

res_pub={
    "status":"",
    "date":"2024-5-6",
    "msg":""
}

SECRET_KEY='#tdfnrcn1s610h4*csa2-p+=lfqz-ol+=uo$+n2sa'


def _generate_token(user_id, lifetime):
    """Build an HS256 JWT carrying ``user_id`` that expires after ``lifetime``.

    Args:
        user_id: Value stored under the ``user_id`` claim.
        lifetime: ``timedelta`` added to the current time for the ``exp`` claim.

    Returns:
        The token produced by ``jwt.encode`` with the module ``SECRET_KEY``.
    """
    payload = {
        'user_id': user_id,
        'exp': timezone.now() + lifetime,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm='HS256')


# Generate access_token
def generate_access_token(user_id):
    """Return a JWT for ``user_id`` that is valid for 1 day."""
    return _generate_token(user_id, timedelta(days=1))


# Generate login_token
def generate_login_token(user_id):
    """Return a JWT for ``user_id`` that is valid for 7 days."""
    return _generate_token(user_id, timedelta(days=7))
# Unified error response handling
def handle_error(error_message):
    """Print the active exception's traceback and return a JSON 500 response.

    Args:
        error_message: Text placed in the ``message`` field of the response.

    Returns:
        ``JsonResponse`` with ``status="error"`` and HTTP status 500.
    """
    # Dump the stack trace to the log/console for diagnosis.
    traceback.print_exc()
    return JsonResponse({"status": "error", "message": error_message}, status=500)


# Token validation helper (currently disabled)
# def validate_token(token):
#     try:
#         decoded_token = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
#         return decoded_token["uid"]
#     except jwt.ExpiredSignatureError:
#         raise ValueError("Token has expired.")
#     except jwt.InvalidTokenError:
#         raise ValueError("Invalid token.")


# Voiceprint check endpoint: the client uploads a user id and a wav file
# stream; the stream is converted to MP3 and cached under a fixed local path.
# Every voiceprint id registered for the user is scored against the MP3 via
# the remote scoring API; the highest-scoring voiceprint's command type is
# returned and the MP3 file is deleted.
# 2024/12/02 no token
# author Feng
@csrf_exempt
def check_Uvoice(request):
    """Match an uploaded recording against the user's stored voiceprints.

    Reads ``user_id`` from POST data and ``file_stream`` from the uploaded
    files. Each ``VoiceFeatureInfo`` row of the user is scored through the
    ``searchScoreFea`` API; the best score strictly above 0.3 wins.

    Returns:
        ``JsonResponse`` describing the best match (``status``, ``date``,
        ``message``, ``score``, ``command_type``, ``end_time``), an error
        payload when input is missing or conversion fails, or a retry message
        when no voiceprint scores above the threshold.
    """
    time_format = '%Y-%m-%d %H:%M:%S'
    user_uid = request.POST.get("user_id", '')  # user id
    file_stream = request.FILES.get("file_stream")  # uploaded file
    best_res = None  # best result seen so far
    max_score = 0.3  # only scores above 0.3 are considered a match
    date_now = datetime.now()

    # Validate the uploaded file stream.
    if not file_stream:
        return JsonResponse({
            "status": "Error",
            "message": "File not uploaded.",
            "date": datetime.now().strftime(time_format)
        })

    # The user id must not be empty.
    if not user_uid:
        return JsonResponse({
            "status": "Error",
            "date": datetime.now().strftime(time_format),
            "message": "Please ensure all required fields are provided!",
            "end_time": datetime.now().strftime(time_format)
        })

    # Local cache directory.
    # TODO: move into the login/register flow so the cache directory is only
    # created once at application start.
    cache_directory = "audio_cache"
    os.makedirs(cache_directory, exist_ok=True)

    # user_uid is untrusted input: keep only filename-safe characters so it
    # cannot inject path separators into the cache path.
    safe_uid = "".join(ch if ch.isalnum() or ch in "-_" else "_" for ch in user_uid)
    timestamp = int(time.time())
    mp3_path = os.path.join(cache_directory, f"voice_check_{safe_uid}_{timestamp}.mp3")

    try:
        # Convert the uploaded stream to MP3 and store it.
        if not dbOperate_upload_Uvoice.process_audio_file(file_stream, mp3_path):
            return JsonResponse({
                "status": "Error",
                "message": "Failed to process the audio file.",
                "date": datetime.now().strftime(time_format)
            })

        for voice in VoiceFeatureInfo.objects.filter(uid=user_uid):
            message_response = generalRequest.req_url(
                api_name='searchScoreFea',
                APPId=APPId,
                APIKey=APIKey,
                APISecret=APISecret,
                file_path=mp3_path,
                featureId=voice.voice_feature_id,
                groupId=user_uid
            )
            if message_response.get('code') != 0:
                continue

            score = json.loads(message_response.get('msg')).get('score')
            # Track the highest score and its result.
            if score and score > max_score:
                max_score = score
                best_res = {
                    "status": "success",
                    "date": date_now.strftime(time_format),
                    "message": message_response,
                    "score": score,
                    "command_type": voice.command_type,
                    "end_time": datetime.now().strftime(time_format)
                }
    finally:
        # Always remove the temporary mp3, including on conversion failure or
        # when the scoring loop raises.
        try:
            os.remove(mp3_path)
        except FileNotFoundError:
            pass

    if best_res:
        return JsonResponse(best_res)
    return JsonResponse({"message": "请重新发出口令."})


# Voiceprint upload endpoint: the client uploads user id, command_type and a
# file stream.
# 2024/12/02 no token
# author Feng
@csrf_exempt
def upload_Uvoice(request):
    """Register a voice recording as a voiceprint for a user and command type.

    Reads ``user`` and ``command_type`` from POST data and ``file_stream``
    from the uploaded files, then delegates to
    ``dbOperate_upload_Uvoice.process_voice_recording``.

    Returns:
        ``JsonResponse`` with ``status`` "Success" and the helper's result in
        ``message``, or an "Error" payload when input is missing or
        ``command_type`` is not an integer.
    """
    # if request.method != "POST":
    #     return JsonResponse({
    #         "status": "Error",
    #         "message": "Only POST method is allowed.",
    #         "date": datetime.strftime(datetime.today(), '%Y-%m-%d %H:%M:%S')
    #     })
    time_format = '%Y-%m-%d %H:%M:%S'
    user_uid = request.POST.get("user", '')  # user id
    command_type = request.POST.get("command_type", '')
    file_stream = request.FILES.get("file_stream")  # uploaded file
    # Voice feature id: the first 10 characters of the epoch time (seconds).
    feature_id = str(time.time())[:10]
    # user_uid = 'iFLYTEK_examples_groupId'
    # command_type = 3

    if not file_stream:
        return JsonResponse({
            "status": "Error",
            "message": "File not uploaded.",
            "date": datetime.now().strftime(time_format)
        })

    # User id and command type must not be empty.
    if not user_uid or not command_type:
        return JsonResponse({
            "status": "Error",
            "date": datetime.today().strftime(time_format),
            "message": "Please ensure all required fields are provided!",
            "end_time": datetime.now().strftime(time_format)
        })

    try:
        # Use an int command_type; integer columns query and index better.
        command_type = int(command_type)
    except ValueError:
        return JsonResponse({
            "status": "Error",
            "message": "Invalid value for command_type. It must be an integer.",
            "date": datetime.now().strftime(time_format),
            "end_time": datetime.now().strftime(time_format)
        })

    # Happy path: process the recording.
    started_at = datetime.today().strftime(time_format)
    result = dbOperate_upload_Uvoice.process_voice_recording(
        user_uid, feature_id, command_type, file_stream)
    return JsonResponse({
        "status": "Success",
        "date": started_at,
        "message": result,
        "end_time": datetime.now().strftime(time_format)
    })


# Voiceprint upload endpoint: the client uploads command_type and a file
# stream.
# 2024/12/02 with token
# author Feng
@csrf_exempt
def upload_Uvoice_token(request):
    """Token-based variant of ``upload_Uvoice`` (token check disabled).

    Only POST is accepted. The user id is currently the fixed placeholder
    ``"11111"`` because the token validation is commented out.

    Returns:
        ``JsonResponse`` with the helper's result, an "Error" payload for bad
        input, HTTP 401 on ``ValueError`` and HTTP 500 on any other exception.
    """
    time_format = '%Y-%m-%d %H:%M:%S'
    try:
        # Validate the request method.
        if request.method != "POST":
            return JsonResponse({
                "status": "Error",
                "message": "Only POST method is allowed.",
                "date": datetime.now().strftime(time_format)
            })

        # Fetch and validate the token (disabled).
        # token = request.META.get("HTTP_AUTHORIZATION")  # token from header
        # if not token:
        #     return JsonResponse({"status": "Error", "message": "Token is missing."}, status=401)
        # uid = validate_token(token)
        uid = "11111"

        # Request parameters.
        # NOTE(review): unlike upload_Uvoice, command_type is passed on as the
        # raw string, not an int; confirm what the helper expects.
        command_type = request.POST.get("command_type", '')
        file_stream = request.FILES.get("file_stream")  # uploaded file
        feature_id = str(time.time())[:10]  # voice feature id

        # Validate required parameters.
        if not file_stream or not command_type:
            return JsonResponse({
                "status": "Error",
                "message": "Missing required fields: file_stream or command_type.",
                "date": datetime.now().strftime(time_format)
            })

        # Process the upload.
        result = dbOperate_upload_Uvoice.process_voice_recording(
            uid, feature_id, command_type, file_stream)
        return JsonResponse({
            "status": "Success",
            "date": datetime.now().strftime(time_format),
            "message": result,
        })
    except ValueError as ve:
        return JsonResponse({"status": "Error", "message": str(ve)}, status=401)
    except Exception as e:
        traceback.print_exc()
        return JsonResponse({"status": "Error", "message": str(e)}, status=500)


# @csrf_exempt
def toLogin(request):
    """Render the ``login.html`` template."""
    return render(request, "login.html")


# Refresh the access_token: the client uploads the user uid and login_token.
# 2025/1/25
# author Feng
# TODO: return a new access_token regardless of whether the old access_token
# has expired.
# If the login_token is valid, return a new access_token.
@csrf_exempt
def get_access_token(request):
    """Issue and store a fresh access_token for a valid login_token.

    Reads ``user_id``, ``access_token`` and ``login_token`` from the POST
    parameters. The login_token is decoded with the module ``SECRET_KEY``; its
    ``user_id`` claim must equal the supplied ``user_id`` and must identify an
    existing ``User``. The new access_token is saved on that user.

    Returns:
        ``JsonResponse`` with the new ``access_token`` on success; an error
        payload when the login_token is expired or invalid, the ids do not
        match or the user does not exist; HTTP 500 for any other failure
        (including missing parameters).
    """
    try:
        params = aidog_request_tools.handle_post_request_parameters(
            request, ["user_id", "access_token", "login_token"])
        user_id = params["user_id"]
        access_token = params["access_token"]
        login_token = params["login_token"]

        if not user_id or not access_token or not login_token:
            raise ValueError("Missing required parameters")

        # Verify that the login_token is still valid.
        try:
            decoded_token = jwt.decode(login_token, SECRET_KEY, algorithms=['HS256'])
        except jwt.ExpiredSignatureError:
            # The login_token has expired.
            return JsonResponse({
                "status": "error",
                "code": 400,
                "message": "Old login_token expired, please login again"
            })
        except jwt.InvalidTokenError:
            # Bad signature / malformed token: report it as a client error
            # instead of falling through to a generic 500.
            return JsonResponse({
                "status": "error",
                "code": 400,
                "message": "Invalid login_token, please login again"
            })

        token_user_id = decoded_token['user_id']

        # The caller-supplied user_id must be the one the token was issued to.
        # (Compared as strings so that str/int representations still match.)
        if str(token_user_id) != str(user_id):
            return JsonResponse({
                "status": "error",
                "message": "user_id does not match the user associated with the login_token"
            })

        # Look the user up once and reuse the row for the update below.
        try:
            user = User.objects.get(uid=token_user_id)
        except User.DoesNotExist:
            return JsonResponse({
                "status": "error",
                "message": f"No user found with uid={token_user_id}"
            })

        # Generate and store the new access_token.
        new_access_token = generate_access_token(token_user_id)
        try:
            user.access_token = new_access_token
            user.save()
        except Exception as e:
            print(f"Error: {e}")
            return JsonResponse({"status": "error", "message": str(e)}, status=500)

        return JsonResponse({
            "status": "success",
            "code": 200,
            "message": "New access_token generated successfully",
            "access_token": new_access_token,
        })
    except Exception as e:
        traceback.print_exc()
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


# Login: UUID quick-start login for temporary accounts (POST)
# Log in with a UUID. Request parameters: login_token, uuid
#
# Response format: result, message, login_token
# 2025-2-13 author Feng
@csrf_exempt
def login_quick_start(request):
    """Log in (or create) a temporary account identified by a device UUID.

    If no ``User`` has the given ``UUID``, a temporary user with a random
    11-digit mobile number and a random 6-letter password is registered and a
    voiceprint group is created for it. Fresh login/access tokens are then
    generated and stored on the user.

    Returns:
        ``JsonResponse`` with ``login_token``, ``access_token``, ``user_id``
        and ``user_name`` on success; HTTP 500 with the error text otherwise.
    """
    try:
        params = aidog_request_tools.handle_post_request_parameters(request, ["UUID"])
        UUID = params["UUID"]
        if not UUID:
            raise ValueError("Invalid input parameters: UUID is empty")

        try:
            user = User.objects.get(uuid=UUID)
            user_id = user.uid
            # Return the stored display name, matching the name handed out
            # when the temporary account was first created.
            user_name = user.user_name
        except User.DoesNotExist:
            # No account yet for this UUID: register a temporary user.
            isRegUser = 0
            mobile_number = ''.join(random.choices("0123456789", k=11))
            user_name = '用户' + str(mobile_number)  # temporary user's name
            random_password = ''.join(random.choice(string.ascii_letters) for _ in range(6))
            register_user = register.register_user(
                mobile_number, random_password, user_name, UUID, isRegUser)
            if register_user['code'] != 200:
                raise ValueError(register_user['msg'])
            user_id = register_user['msg']  # the new user's uid

            # Registration succeeded: create the voiceprint feature group.
            message_response = generalRequest.req_url(
                api_name='createGroup', APPId=APPId, APIKey=APIKey,
                APISecret=APISecret, file_path=file_path, featureId='',
                groupId=str(register_user['msg']))
            if message_response.get('code') != 0:
                # The login itself still succeeds; surface the failure in logs.
                print(f"createGroup failed for uid={user_id}: {message_response}")

        login_token, access_token = _issue_and_store_tokens(user_id)
        return JsonResponse({
            "status": "success",
            "message": "Login successful!",
            "login_token": login_token,
            "user_id": user_id,
            "access_token": access_token,
            "user_name": user_name,
        })
    except Exception as e:
        traceback.print_exc()
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


def _issue_and_store_tokens(user_id):
    """Generate a login_token/access_token pair and save both on the user.

    Raises whatever the lookup or save raises; every caller runs inside a
    ``try`` block that turns exceptions into a JSON 500 response.

    Returns:
        ``(login_token, access_token)``.
    """
    login_token = generate_login_token(user_id)
    access_token = generate_access_token(user_id)
    user = User.objects.get(uid=user_id)
    user.login_token = login_token
    user.access_token = access_token
    user.save()
    return login_token, access_token


def _decode_jwt_or_error(token, expired_message, invalid_message):
    """Decode ``token``; return ``(payload, None)`` or ``(None, JsonResponse)``.

    Expired and otherwise invalid tokens both yield an error response with
    ``code`` 400 so the views do not fall through to a generic 500.
    """
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=['HS256']), None
    except jwt.ExpiredSignatureError:
        message = expired_message
    except jwt.InvalidTokenError:
        message = invalid_message
    return None, JsonResponse({"status": "error", "code": 400, "message": message})


def _parse_dog_list(raw):
    """Normalise the ``dog_list`` request parameter to a list of dog ids.

    A string is parsed as JSON first, then as a Python literal. Anything that
    is still not a list/tuple/set afterwards (e.g. a bare id such as
    ``"043341045"``) is wrapped in a one-element list so that ``d_id__in``
    never iterates over the characters of a string.
    """
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            import ast
            try:
                # literal_eval only accepts Python literals; it does not
                # execute code.
                raw = ast.literal_eval(raw)
            except (SyntaxError, ValueError):
                pass  # keep the raw string; it is wrapped below
    if isinstance(raw, (list, tuple, set)):
        return list(raw)
    return [raw]


# Check for and grant the first-login-of-the-day reward
# 2025-5-28 author Feng
def check_and_give_daily_login_reward(user_id):
    """Grant 100 coins on the user's first login of the current day.

    The stored ``last_login_date`` may be ``None``, a ``date`` (this function
    stores one) or a ``datetime``; all three are handled, so a previously
    stored ``date`` no longer raises and silently cancels the reward.

    Returns:
        ``(True, 100)`` when the reward was granted, ``(False, 0)`` when the
        user already logged in today or when any error occurred.
    """
    try:
        today = timezone.now().date()
        user = User.objects.get(uid=user_id)

        last_login = getattr(user, 'last_login_date', None)
        # datetime is a subclass of date, so reduce it to its date part first.
        if isinstance(last_login, datetime):
            last_login = last_login.date()

        if last_login == today:
            return False, 0  # already logged in today, no reward

        # First login today: add 100 coins and remember the date.
        user.coin = (getattr(user, 'coin', 0) or 0) + 100
        user.last_login_date = today
        user.save()
        return True, 100
    except Exception as e:
        print(f"发放每日登录奖励时出错: {e}")
        return False, 0


# Login by token (POST)
# Log in with a login_token. Request parameters: login_token, uuid
# Response format: result, message, user_id, login_token, access_token
# 2025-2-9 author Feng
@csrf_exempt
def login_by_token(request):
    """Log a user in with an existing login_token and rotate both tokens.

    For temporary users (``isRegUser == 0``) the supplied ``UUID`` must match
    the stored one. The daily login reward is applied before new tokens are
    generated and stored.

    Returns:
        ``JsonResponse`` with the new ``login_token``/``access_token`` and
        ``user_id``; an error payload for expired/invalid tokens, UUID
        mismatch or unknown user; HTTP 500 for any other failure.
    """
    try:
        params = aidog_request_tools.handle_post_request_parameters(
            request, ["login_token", "UUID"])
        login_token, UUID = params["login_token"], params["UUID"]
        if not login_token:
            raise ValueError("Invalid input parameters: login_token is empty")

        # Verify that the old login_token is still valid.
        payload, error = _decode_jwt_or_error(
            login_token,
            "login_token expired, please login again",
            "Invalid login_token, please login again")
        if error is not None:
            return error
        user_id = payload['user_id']

        try:
            user = User.objects.get(uid=user_id)
        except User.DoesNotExist:
            return JsonResponse({
                "status": "error",
                "message": f"No user found with uid={user_id}"
            })

        # Temporary users are bound to the device UUID.
        if user.isRegUser == 0 and user.uuid != UUID:
            return JsonResponse({
                "status": "error",
                "message": "UUID does not match the user associated with the login_token"
            })

        # Check for and grant the first-login-of-the-day reward.
        check_and_give_daily_login_reward(user_id)

        login_token, access_token = _issue_and_store_tokens(user_id)
        return JsonResponse({
            "status": "success",
            "message": "Login successful!",
            "login_token": login_token,
            "user_id": user_id,
            "access_token": access_token,
            # "daily_reward": {"given": reward_given, "amount": reward_amount}
        })
    except Exception as e:
        traceback.print_exc()
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


@csrf_exempt
def login(request):
    """Log in with mobile number (``zh-cn``) or email (any other language).

    Credentials are checked by ``login_func.login_user``; on success the daily
    reward is applied and a new token pair is generated and stored.

    Returns:
        ``JsonResponse`` with ``login_token``, ``access_token`` and
        ``user_id``; HTTP 500 with the error text on any failure.
    """
    try:
        params = aidog_request_tools.handle_post_request_has_empty_parameters(
            request, ["mobile_number", "password", "email", "client_language"])
        mobile_number = params["mobile_number"]
        password = params["password"]
        email = params["email"]
        client_language = params["client_language"]

        if not client_language:
            raise ValueError("Invalid input parameters: client_language is empty")

        if client_language == "zh-cn":
            # Login with mobile_number.
            if not mobile_number or not password:
                raise ValueError("Invalid input parameters: mobile_number or password or username is empty")
        elif not email or not password:
            # Login with email.
            raise ValueError("Invalid input parameters: Email or password or username is empty")

        # Call the login backend.
        message_response = login_func.login_user(mobile_number, email, password, client_language)
        if message_response is None or message_response['code'] != 200:
            raise ValueError('login failed!')

        uid = message_response['user_id']

        # Check for and grant the first-login-of-the-day reward.
        check_and_give_daily_login_reward(uid)

        login_token, access_token = _issue_and_store_tokens(uid)
        # The response is deliberately not printed: it contains live tokens.
        return JsonResponse({
            "status": "success",
            "message": "Login successful!",
            "login_token": login_token,
            "user_id": uid,
            "access_token": access_token,
            # "daily_reward": {"given": reward_given, "amount": reward_amount}
            # "dog_list": message_response['dog_list'],
            # "response_end_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })
    except Exception as e:
        traceback.print_exc()
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


@csrf_exempt
def toRegister(request):
    """Register a new regular user, or upgrade a temporary user to regular.

    Without ``user_id`` a brand-new user is registered (by email when
    ``client_language == "en"``, otherwise by mobile number) and a voiceprint
    group is created. With ``user_id`` the temporary account identified by a
    matching ``access_token`` is converted into a regular account.

    Returns:
        ``JsonResponse`` describing the outcome; HTTP 500 with the error text
        for validation failures and unexpected errors.
    """
    try:
        params = aidog_request_tools.handle_post_request_has_empty_parameters(
            request,
            ["UUID", "mobile_number", "email", "password", "access_token",
             "user_name", "user_id", "client_language"])
        UUID = params["UUID"]
        mobile_number = params["mobile_number"]
        email = params["email"]
        password = params["password"]
        access_token = params["access_token"]
        user_name = params["user_name"]
        user_id = params["user_id"]
        client_language = params["client_language"]

        # The access_token is intentionally left out of the log line.
        print(f"Updated-----params------- UUID {UUID}, mobile_number: {mobile_number}, user_id: {user_id}")

        isRegUser = 1

        if not client_language:
            raise ValueError("Invalid input parameters: client_language is empty")

        # Required fields depend on the client language.
        if client_language == "zh-cn":
            if not mobile_number or not password or not user_name:
                raise ValueError("Invalid input parameters: mobile_number or password or username is empty")
        elif not email or not password or not user_name:
            raise ValueError("Invalid input parameters: Email or password or username is empty")

        if not user_id:
            # Brand-new user registering for the first time.
            # NOTE(review): validation above treats every non-"zh-cn" language
            # as email-based, but only "en" registers by email here; confirm
            # which rule is intended.
            if client_language == "en":
                register_user = register.register_user_email(
                    email, password, user_name, '', isRegUser)
            else:
                register_user = register.register_user(
                    mobile_number, password, user_name, '', isRegUser)
            if register_user['code'] != 200:
                raise ValueError(register_user['msg'])

            # Registration succeeded: create the voiceprint feature group.
            message_response = generalRequest.req_url(
                api_name='createGroup', APPId=APPId, APIKey=APIKey,
                APISecret=APISecret, file_path=file_path, featureId='',
                groupId=str(register_user['msg']))
            end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            if message_response.get('code') == 0:
                res = {
                    "status": "success",
                    "message": message_response,
                    "user_id": register_user['msg'],
                    "end_time": end_time
                }
            else:
                res = {
                    "status": message_response.get('status', 'error'),
                    "message": message_response,
                    "end_time": end_time
                }
            return JsonResponse(res)

        # Temporary user upgrading to a regular user: validate access_token.
        payload, error = _decode_jwt_or_error(
            access_token,
            "AccessToken expired, please refresh!",
            "Invalid AccessToken!")
        if error is not None:
            return error
        if payload['user_id'] != user_id:
            return JsonResponse({
                "status": "error",
                "code": 400,
                "message": "AccessToken doesn't match!"
            })

        try:
            user = User.objects.get(uid=user_id)
        except User.DoesNotExist:
            return JsonResponse({
                "status": "error",
                "message": f"No user found with uid={user_id}"
            })

        if user.isRegUser == 1:
            raise ValueError("You're already a regular user")

        # Reject duplicated email / mobile number.
        if email and User.objects.filter(email=email).exists():
            return JsonResponse({
                "status": "error",
                "code": 400,
                "message": "duplicated email"
            })
        if mobile_number and User.objects.filter(mobile=mobile_number).exists():
            return JsonResponse({
                "status": "error",
                "code": 400,
                "message": "duplicated mobile phone"
            })

        # NOTE(review): unsalted SHA-256 is weak for password storage; kept so
        # existing hashes stay valid. Consider Django's password hashers.
        hashed_password = hashlib.sha256(password.encode()).hexdigest()

        user.mobile = mobile_number
        user.user_name = user_name
        user.email = email
        user.password = hashed_password
        user.uuid = ''  # the uuid is cleared once the account is regular
        user.isRegUser = isRegUser  # mark the user as regular
        user.registration_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        user.save()

        # TODO: insert default inventory items (food/water/toy/other) for the
        # newly registered user; a UserInventory.bulk_create draft existed
        # here but was disabled.

        return JsonResponse({
            "status": "success",
            "message": "Register success!",
            "user_id": user_id,
            "user_name": user_name,
            "mobile_number": mobile_number,
            "email": email,
        })
    except Exception as e:
        traceback.print_exc()
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


# Startup update of the dog's state and the user's coins: the client uploads
# user id and dog_id.
# 2024/12/31
# author Feng
# type==1: startup update; type==2: feeding/interaction update;
# type==3: voice-command update
@csrf_exempt
def toUpdatePuppyStatus(request):
    """Run the startup state update for one dog and return its new state.

    Reads ``user_id`` and ``dog_id`` from the GET parameters and delegates to
    ``puppyStateUpdate.update_dog_state``.

    Returns:
        ``JsonResponse`` with the dog's status, body attributes,
        relationship/personality and training data; a JSON 500 via
        ``handle_error`` on failure.
    """
    try:
        params = aidog_request_tools.handle_get_request_parameters(
            request, ["user_id", "dog_id"])
        user_id, dog_id = params["user_id"], params["dog_id"]
        # user_id='JTYNZP9O'
        # dog_id='043341045'

        # Current time as an aware datetime.
        time_now = timezone.make_aware(datetime.now())

        update_response = puppyStateUpdate.update_dog_state(user_id, dog_id, time_now)
        # if update_response['code'] != 200:
        #     raise ValueError(update_response['msg'])

        return JsonResponse({
            "status": "success",
            "date": time_now.strftime('%Y-%m-%d %H:%M:%S'),
            "dog_status": update_response['dog_status'],
            "dog_body_attrs": update_response['dog_body_attrs'],
            "dog_relationship_personality": update_response['dog_relationship_personality'],
            "dog_training": update_response['dog_training'],
        })
    except Exception as e:
        return handle_error(str(e))


# Initialise a dog: the client uploads user id, access_token, dog_name, sex,
# breed, skin.
# 2024/12/31
# author Feng
# 2025/6/13 added: the first dog is free, every further dog costs coins.
@csrf_exempt
def initialPuppy(request):
    """Create a dog for the authenticated user, charging for extra dogs.

    The first dog is free. If the user already owns at least one dog, 1000
    coins are required; creating the dog and deducting the coins happen in
    one database transaction.

    Returns:
        ``JsonResponse`` with the creation result, ``is_first_dog`` and
        ``coins_deducted``; an error payload for token problems, id mismatch,
        unknown user or insufficient coins; HTTP 500 for any other failure.
    """
    try:
        params = aidog_request_tools.handle_post_request_parameters(
            request, ["access_token", "user_id", "dog_name", "sex", "breed", "skin"])
        access_token = params["access_token"]
        uid = params["user_id"]
        dog_name = params["dog_name"]
        sex = params["sex"]
        breed = params["breed"]
        skin = params["skin"]

        # Current time as an aware datetime; server time is the birthday.
        time_now = timezone.make_aware(datetime.now())
        birthday = datetime.now()

        if not all((access_token, uid, dog_name, sex, breed, skin)):
            raise ValueError("Invalid input parameters: parameter is empty")

        # Validate the access token.
        token_validation = aidog_tools.validate_access_token(access_token)
        if isinstance(token_validation, JsonResponse):
            return token_validation
        user_id = token_validation['user_id']

        # The dog is created for ``uid`` while ownership and coins are checked
        # for the token's user; they must be the same account.
        if str(uid) != str(user_id):
            return JsonResponse({
                "status": "error",
                "code": 400,
                "message": "user_id does not match the access_token"
            })

        dog_price = 1000  # price of every dog after the first
        existing_dogs_count = Doginfo.objects.filter(owner_id=user_id).count()
        is_paid_dog = existing_dogs_count >= 1

        if is_paid_dog:
            try:
                user = User.objects.get(uid=user_id)
            except User.DoesNotExist:
                return JsonResponse({
                    "status": "error",
                    "code": 404,
                    "message": "User not found"
                })

            if user.coin < dog_price:
                return JsonResponse({
                    "status": "error",
                    "code": 402,
                    "message": f"Insufficient coins. You need {dog_price} coins but only have {user.coin} coins."
                })

            # Create the dog and charge the user atomically; raising inside
            # the block rolls both back.
            with transaction.atomic():
                initialPuppy_response = initialPuppyState.getAPuppy(
                    uid, dog_name, sex, breed, birthday, skin, time_now)
                if initialPuppy_response.get('code') != 200:
                    raise ValueError(initialPuppy_response.get('message', 'Failed to initialize puppy'))
                user.coin -= dog_price
                user.save()
                # Optional: record the coin change if a transaction table
                # exists.
                # CoinTransaction.objects.create(
                #     user_id=uid,
                #     amount=-dog_price,
                #     transaction_type='dog_purchase',
                #     description=f'Purchase second dog: {dog_name}'
                # )
        else:
            # The user owns no dog yet: the first one is free.
            initialPuppy_response = initialPuppyState.getAPuppy(
                uid, dog_name, sex, breed, birthday, skin, time_now)
            if initialPuppy_response.get('code') != 200:
                raise ValueError(initialPuppy_response.get('message', 'Failed to initialize puppy'))

        stamp = time_now.strftime('%Y-%m-%d %H:%M:%S')
        return JsonResponse({
            "status": "success",
            "date": stamp,
            "message_all": initialPuppy_response,
            "message": initialPuppy_response['message'],
            "is_first_dog": existing_dogs_count == 0,
            "coins_deducted": dog_price if is_paid_dog else 0,
            "end_time": stamp
        })
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


# NOTE: an earlier, commented-out implementation of initialPuppy (2024/12/31,
# without the coin charge) used to follow here; it was superseded by the
# function above and has been removed.


# Update the dog's state from a feeding/touch interaction: the client uploads
# user id, dog_id and interaction_type.
# 2024/12/31
# author Feng
# NOTE(review): decorated with @csrf_exempt like every other POST view in this
# module; confirm this endpoint is meant to be reachable without a CSRF token.
@csrf_exempt
def interaction(request):
    """Apply a feeding or touch interaction to a dog.

    ``interaction_type`` must be ``"feedInteraction"`` or
    ``"touchInteraction"``; both currently call
    ``puppyStateUpdate.voice_gain_state``.

    Returns:
        ``JsonResponse`` with the update result, or a JSON 500 via
        ``handle_error`` for an unknown type or a failed update.
    """
    try:
        params = aidog_request_tools.handle_post_request_parameters(
            request, ["user_id", "dog_id", "interaction_type"])
        user_id = params["user_id"]
        dog_id = params["dog_id"]
        interaction_type = params["interaction_type"]

        # Current time as an aware datetime.
        time_now = timezone.make_aware(datetime.now())

        # Feeding and touch interactions share the same state update for now.
        if interaction_type not in ("feedInteraction", "touchInteraction"):
            raise ValueError("Invalid type parameter")
        update_response = puppyStateUpdate.voice_gain_state(user_id, dog_id, time_now)

        if update_response['code'] != 200:
            # The success path reads 'message'; accept either key here so a
            # missing 'msg' does not mask the real error with a KeyError.
            raise ValueError(
                update_response.get('msg')
                or update_response.get('message', 'Failed to update dog state'))

        stamp = time_now.strftime('%Y-%m-%d %H:%M:%S')
        return JsonResponse({
            "status": "success",
            "date": stamp,
            "message_all": update_response,
            "message": update_response['message'],
            "end_time": stamp
        })
    except Exception as e:
        return handle_error(str(e))


# Use-item endpoint: the client uploads access_token, user id, item_id,
# dog_list, date_time.
# 2025/03/12
# author Feng
# After an item is used, the data of all the user's dogs is returned.
@csrf_exempt
def use_props(request):
    """Use one inventory item on the selected dogs and return fresh state.

    The acting user is taken from the ``access_token``. The item is applied
    through ``puppyStateUpdate.props_gain_state`` to each selected dog owned
    by the user; if at least one update succeeds, one unit of the item is
    consumed unless its id starts with ``toy_``.

    Returns:
        ``JsonResponse`` with ``user_info``, the combined data of all the
        user's dogs and the inventory grouped into food/toy/other; an error
        payload for token, ownership or inventory problems; a JSON 500 via
        ``handle_error`` otherwise.
    """
    try:
        params = aidog_request_tools.handle_post_request_parameters(
            request, ["access_token", "user_id", "item_id", "dog_list", "date_time"])
        access_token = params["access_token"]
        item_id = params["item_id"]
        time_now = timezone.make_aware(datetime.now())

        # Validate the access_token; the token's user id is authoritative.
        payload, error = _decode_jwt_or_error(
            access_token,
            "access_token expired, please refresh again",
            "Invalid access_token")
        if error is not None:
            return error
        user_id = payload['user_id']

        try:
            user = User.objects.get(uid=user_id)
        except User.DoesNotExist:
            return JsonResponse({
                "status": "error",
                "message": f"No user found with uid={user_id}"
            })

        # dog_list may arrive as a list, a JSON string or a Python-literal
        # string.
        dog_list = _parse_dog_list(params["dog_list"])

        # Dogs from dog_list that belong to this user.
        dog_info_list = Doginfo.objects.filter(owner_id=user_id, d_id__in=dog_list)
        if not dog_info_list.exists():
            # Distinguish "exists but owned by someone else" from "unknown".
            all_dogs = Doginfo.objects.filter(d_id__in=dog_list)
            if all_dogs.exists():
                wrong_owner_dogs = [dog.d_id for dog in all_dogs]
                return JsonResponse({
                    "status": "error",
                    "message": f"Dogs found but don't belong to user {user_id}: {wrong_owner_dogs}"
                })
            return JsonResponse({
                "status": "error",
                "message": f"No dogs found with the provided IDs: {dog_list}"
            })

        # The user must hold at least one unit of the item.
        try:
            user_inventory = UserInventory.objects.get(owner_id=user_id, item_id=item_id)
        except UserInventory.DoesNotExist:
            return JsonResponse({
                "status": "error",
                "message": f"User doesn't have this item in inventory: {item_id}"
            })
        if user_inventory.quantity < 1:
            return JsonResponse({
                "status": "error",
                "message": f"Insufficient item quantity. Required: {1}, Available: {user_inventory.quantity}"
            })

        target_ids = {dog.d_id for dog in dog_info_list}
        dogs = []
        successful_updates = 0

        # Walk all of the user's dogs: update the selected ones, report all.
        for dog in Doginfo.objects.filter(owner_id=user_id):
            dog_id = dog.d_id
            dog_data = json.loads(serializers.serialize('json', [dog]))[0]['fields']

            if dog_id in target_ids:
                update_response = puppyStateUpdate.props_gain_state(
                    user_id, dog_id, item_id, time_now)
                # A failed update is skipped so the other dogs still proceed.
                if update_response["status"] != "error":
                    successful_updates += 1

            # Latest related records, whether or not the dog was updated.
            new_dog_status = aidog_tools.model_to_dict_without_id(
                DogStatus.objects.filter(d_id=dog_id).first())
            new_dog_training = aidog_tools.model_to_dict_without_id(
                DogTraining.objects.filter(d_id=dog_id).first())
            new_dog_body_attributes = aidog_tools.model_to_dict_without_id(
                DogBodyAttributes.objects.filter(d_id=dog_id).first())
            new_dog_personality_relationship = aidog_tools.model_to_dict_without_id(
                DogPersonalityRelationship.objects.filter(d_id=dog_id).first())

            dog_info_combined = {
                **dog_data,
                **new_dog_status,
                **new_dog_training,
                **new_dog_body_attributes,
                **new_dog_personality_relationship,
            }
            # Drop internal metadata such as '_state'.
            dogs.append({
                key: value for key, value in dog_info_combined.items()
                if not key.startswith('_')
            })

        # Consume the item only if at least one dog was updated.
        if successful_updates > 0:
            # TODO: optionally deduct one unit per updated dog
            #       (quantity -= successful_updates).
            if not item_id.startswith('toy_'):
                # Currently exactly one unit is consumed regardless of how
                # many dogs were updated; toys are not consumed.
                user_inventory.quantity -= 1
            # if user_inventory.quantity <= 0:
            #     user_inventory.delete()  # remove empty inventory rows
            # else:
            user_inventory.save()

        # Group the user's full inventory by item-id prefix.
        props_dict = {"food": {}, "toy": {}, "other": {}}
        for prop in UserInventory.objects.filter(owner_id=user_id):
            prop_data = json.loads(serializers.serialize('json', [prop]))[0]['fields']
            prop_item_id = prop_data['item_id']
            if prop_item_id.startswith(('food_', 'water_')):
                category = 'food'
            elif prop_item_id.startswith('toy_'):
                category = 'toy'
            else:
                category = 'other'
            props_dict[category][prop_item_id] = prop_data['quantity']

        return JsonResponse({
            "status": "success",
            "message": f"item:{str(item_id)} is used",
            "user_info": {
                "user_name": user.user_name,
                "coin": user.coin,
                # Expose isRegUser as a boolean.
                "isRegUser": getattr(user, 'isRegUser', 0) == 1,
                "mobile": user.mobile,
                "email": user.email,
                "level": user.level,
                "uuid": user.uuid,
                "registration_time": user.registration_time
            },
            "dogs": dogs,
            "props": props_dict,
        })
    except Exception as e:
        return handle_error(str(e))