import pickle from multiprocessing.dummy import current_process from datetime import datetime from django.db.models.expressions import result from django.http import HttpResponse,JsonResponse from django.utils.text import normalize_newlines from django.views.decorators.csrf import csrf_exempt from datetime import datetime, timedelta from django.core import serializers import time import json import os import jwt from .models import DogTrainingNew os.environ.setdefault("DJANGO_SETTINGS_MODULE", "aiDogProject.settings") import django django.setup() from aiDogApp.models import * from . import aidog_tools,aidog_request_tools,wave_compare,generalRequest,dbOperate_upload_Uvoice # from .wave_compare_algorithm import wave_compare # APPId = "527f831f" # APISecret = "YWE5MjI0MzA4NmM3MTNmNTNiMWJkYzE4" # APIKey = "be5c37f3db1569737934240a0e3ad02d" APPId = "ed8eb862" APISecret = "YzEyMDYyMzljMDViNWJlZDdlOWJhYjVi" APIKey = "b4c831160d8933221e95bda817547e99" SECRET_KEY='#tdfnrcn1s610h4*csa2-p+=lfqz-ol+=uo$+n2sa' #科大讯飞校验声纹接口:客户端上传用户id、wav格式文件流,转换文件流为MP3缓存在本地固定路径下 # 提交用户id到数据库遍历全部声纹id # 每个声纹id+用户id+mp3路径上传到科大校验接口,筛选出得分最高的声纹id,并返回其口令类型,删除此mp3文件 # 2024/12/02 无token # author Feng @csrf_exempt def check_Uvoice(request): user_uid=request.POST.get("user_id",'')# 用户ID file_stream = request.FILES.get("file_stream") # Get the uploaded file best_res = None # 存储符合条件的最大 score 结果 max_score = 0.3 # 初始为0.3, 只记录比0.3大的score # file_path = './audio_cache/voice_JTYNZP9O_1729843681.mp3' # 当前时间 date_now=datetime.now() # 校验上传的文件流 if not file_stream: return JsonResponse({ "status": "error", "message": "File not uploaded.", "date": datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) # 确保用户id不为空 if not user_uid : return JsonResponse({ "status": "error", "date": datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "message": "Please ensure all required fields are provided!", "end_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) # 定义本地缓存路径 # TODO----------迁移到登录/注册逻辑内 确保缓存目录只在应用启动时创建 cache_directory = "audio_cache" os.makedirs(cache_directory, exist_ok=True) # 创建一个唯一的文件名 timestamp = int(time.time()) mp3_path = os.path.join(cache_directory, f"voice_check_{user_uid}_{timestamp}.mp3") # 将上传的文件流转换为 MP3 并保存 if not dbOperate_upload_Uvoice.process_audio_file(file_stream, mp3_path): return JsonResponse({ "status": "error", "message": "Failed to process the audio file.", "date": datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) voices = VoiceFeatureInfo.objects.filter(uid=user_uid) for voice in voices: message_response = generalRequest.req_url( api_name='searchScoreFea', APPId=APPId, APIKey=APIKey, APISecret=APISecret, file_path=mp3_path, featureId=voice.voice_feature_id, groupId=user_uid ) if message_response.get('code') == 0: data_dict = json.loads(message_response.get('msg')) score = data_dict.get('score') # print(score) # 更新最大 score 和对应的结果 if score and score > max_score: max_score = score best_res = { "status": "success", "date": date_now.strftime('%Y-%m-%d %H:%M:%S'), "message": message_response, "score": score, "command_type": voice.command_type, "end_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S') } # 清理临时 mp3 文件 os.remove(mp3_path) # 检查是否找到符合条件的结果 if best_res: return JsonResponse(best_res) else: return JsonResponse({"message": "请重新发出口令."}) #上传声纹接口:客户端上传用户id、command_type、文件流 # 2024/12/02 无token # author Feng @csrf_exempt def upload_Uvoice(request): # if request.method != "POST": # return JsonResponse({ # "status": "error", # "message": "Only POST method is allowed.", # "date": datetime.strftime(datetime.today(), '%Y-%m-%d %H:%M:%S') # }) user_uid = request.POST.get("user", '') # 用户ID command_type = request.POST.get("command_type", '') file_stream = request.FILES.get("file_stream") # Get the uploaded file feature_id = str(time.time())[:10] # 随机 voice feature ID res = None # user_uid = 'iFLYTEK_examples_groupId' # command_type = 3 if not file_stream: return JsonResponse({ "status": "error", "message": "File not uploaded.", "date": datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) # 确保用户id和声纹id不为空 if not user_uid or not command_type: return JsonResponse({ "status": "error", "date": datetime.strftime(datetime.today(), '%Y-%m-%d %H:%M:%S'), "message": "Please ensure all required fields are provided!", "end_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) try: # 尝试将 command_type 转换为整数 # 使用INT 类型 查询和索引也会更高效 command_type = int(command_type) except ValueError: # 如果转换失败,返回错误信息 return JsonResponse({ "status": "error", "message": "Invalid value for command_type. It must be an integer.", "date": datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "end_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) # 成功的情况下处理请求 res = { "status": "success", "date": datetime.strftime(datetime.today(), '%Y-%m-%d %H:%M:%S'), "message": dbOperate_upload_Uvoice.process_voice_recording(user_uid, feature_id, command_type,file_stream), "end_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S') } return JsonResponse(res) #上传声纹接口:客户端上传用户id、command_type、wav.zip文件流,access_token # 2024/12/02 无token # author Feng # 2025/3/31更新 # 处理用户声纹上传和比对 # 处理流程: # 1. 首次上传:保存为源文件,提取特征并存储 # 2. 二次上传:与源文件进行比对,决定是否通过验证 @csrf_exempt def upload_Uvoice_latest1(request): # 记录请求时间 current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') try: # 获取并验证请求参数 params = _validate_request_params(request) if isinstance(params, JsonResponse): return params access_token = params['access_token'] user_id = params['user_id'] dog_id = params['dog_id'] file_stream = params['file_stream'] voice_type = params['voice_type'] timestamp = params['timestamp'] current_times = params['current_times'] total_times = params['total_times'] feature_id = str(time.time())[:10] # 生成声纹特征ID # 验证访问令牌 token_validation = aidog_tools.validate_access_token(access_token) if isinstance(token_validation, JsonResponse): return token_validation user_id = token_validation['user_id'] # 取出数据库的uid: try: dog_info = Doginfo.objects.get(owner_id=user_id,d_id=dog_id) except Doginfo.DoesNotExist: return {"status": "error", "message": f"No dog found with uid={user_id}"} # 根据上传次数处理请求 if int(current_times) < int(total_times): # 首次上传:处理并保存源文件 return _handle_first_upload(user_id, dog_id,feature_id, voice_type, file_stream, current_time) else: # 二次上传:与源文件比对 return _handle_second_upload(user_id, dog_id,feature_id, voice_type, file_stream, current_time) except Exception as e: # 全局异常处理 import traceback traceback.print_exc() return JsonResponse({ "status": "error", "message": f"Server processing exception: {str(e)}", "date": current_time }) def _handle_first_upload(user_id, dog_id,feature_id, command_type, file_stream, current_time): """处理第一次声音上传,保存文件并提取特征""" try: # 生成唯一的文件路径 output_mp3_path = _get_file_path(user_id, feature_id,True) # 处理声音文件,转换为MP3并存储 process_result = dbOperate_upload_Uvoice.process_voice_recording( user_id, feature_id, command_type, file_stream,output_mp3_path ) print(f"process_voice_recording complete. API response: {process_result}") if process_result['status'] != 'success': return JsonResponse({ "status": "error", "message": process_result['msg']+"audio file failed to be processed", "date": current_time }) # 提取声音特征 mfcc_origin = wave_compare.extract_mfcc(output_mp3_path) character_value_origin = pickle.dumps(mfcc_origin) # 保存声纹信息到数据库 try: # 检查声纹信息是否存在 voice_feature_info, created = VoiceFeatureInfo.objects.get_or_create( uid=user_id, d_id=dog_id, command_type=command_type, defaults={ 'mfcc': character_value_origin, 'voice_path': output_mp3_path, 'voice_feature_id': feature_id, 'voice_group_id': user_id, } ) if not created: # 更新声纹信息 voice_feature_info.voice_path = output_mp3_path voice_feature_info.voice_feature_id = feature_id voice_feature_info.mfcc = character_value_origin voice_feature_info.save() # print(f"voice_feature_info11. API response: {voice_feature_info1.__dict__}") return JsonResponse({ "status": "success", "date": current_time, "message": "in progress", }) except Exception as db_error: return JsonResponse({ "status": "error", "message": f"Database operation error: {str(db_error)}", "date": current_time }) except Exception as e: return JsonResponse({ "status": "error", "message": f"error occurred while processing the first upload: {str(e)}", "date": current_time }) # 封装---更新狗的训练状态 # 参数 dog_id: 狗的ID,command_type: 命令类型,mapping: 命令类型到训练参数的映射 # 2025/6/11 更新狗的训练状态,包含上限检查和优化的业务逻辑 def _update_dog_training(dog_id, command_type): # 定义上限常量 MAX_VOICE_CALL = 100 MAX_VOICE_COMMAND = 100 # 获取或创建训练记录 dog_training, created = DogTrainingNew.objects.get_or_create( d_id=dog_id, defaults={ 'voicecall': 0, 'voicecallenable': 0, 'voicecommand': 0, 'voicecommandenable': 0, 'commandsit': 0, 'commandstand': 0, 'commandbark': 0, 'commandliedown': 0, 'commandshake': 0, 'commandtouch': 0, 'commanddeath': 0, 'commandturnl': 0, 'commandturnr': 0, } ) # 处理 voiceCall 训练逻辑 if command_type == 'voiceCall': current_voice_call = dog_training.voicecall current_voice_call_enable = dog_training.voicecallenable # 首次为全新d_id录制音频 if current_voice_call_enable == 0 and current_voice_call == 0: dog_training.voicecall = min(10, MAX_VOICE_CALL) dog_training.voicecallenable = 1 # voiceCallEnable==1的情况下,每次录音+4 elif current_voice_call_enable == 1: new_value = current_voice_call + 4 dog_training.voicecall = min(new_value, MAX_VOICE_CALL) # 重置了录音数据情况下(voiceCallEnable==0但voiceCall>0) elif current_voice_call_enable == 0 and current_voice_call > 0: new_value = current_voice_call + 4 dog_training.voicecall = min(new_value, MAX_VOICE_CALL) dog_training.voicecallenable = 1 # 重新启用 # 处理各种命令类型的训练逻辑 elif command_type in ['commandSit', 'commandStand', 'commandBark', 'commandLieDown', 'commandShake', 'commandTouch', 'commandDeath', 'commandTurnL', 'commandTurnR']: current_voice_command = dog_training.voicecommand current_voice_command_enable = dog_training.voicecommandenable # 获取对应的命令字段名(转换为小写) command_field_map = { 'commandSit': 'commandsit', 'commandStand': 'commandstand', 'commandBark': 'commandbark', 'commandLieDown': 'commandliedown', 'commandShake': 'commandshake', 'commandTouch': 'commandtouch', 'commandDeath': 'commanddeath', 'commandTurnL': 'commandturnl', 'commandTurnR': 'commandturnr', } command_field = command_field_map.get(command_type) if command_field: # 设置对应命令字段为1 setattr(dog_training, command_field, 1) # 首次为全新d_id录制口令音频 if current_voice_command_enable == 0 and current_voice_command == 0: dog_training.voicecommand = min(10, MAX_VOICE_COMMAND) dog_training.voicecommandenable = 1 # voiceCallEnable==1的情况下,每次录音+4 elif current_voice_command_enable == 1: new_value = current_voice_command + 4 dog_training.voicecall = min(new_value, MAX_VOICE_CALL) # 重置了录音数据情况下(voiceCommandEnable==0但voiceCommand>0) elif current_voice_command_enable == 0 and current_voice_command > 0: new_value = current_voice_command + 4 dog_training.voicecommand = min(new_value, MAX_VOICE_COMMAND) dog_training.voicecommandenable = 1 # 重新启用 # 保存更新 dog_training.save() # 处理第二次声音上传,与源文件进行比对 def _handle_second_upload(user_id, dog_id,feature_id,command_type, file_stream, current_time): # 定义常量 MATCH_THRESHOLD = 0.6 # 声纹匹配阈值 output_mp3_path = None try: # 生成唯一的文件路径 output_mp3_path = _get_file_path(user_id, feature_id) # 处理上传的音频文件 wav_to_mp3_result = dbOperate_upload_Uvoice.process_zip_audio_file(file_stream, output_mp3_path) if not wav_to_mp3_result: return JsonResponse({ "status": "error", "message": "audio file failed to be processed", "date": current_time }) score=0 best_res = False try: # 获取用户原始声纹信息 voice = VoiceFeatureInfo.objects.get(uid=user_id, d_id=dog_id,command_type=command_type) dog_info = Doginfo.objects.filter(owner_id=user_id) second_fail_details = { 'voice_files_deleted': 0, 'iflytek_features_deleted': 0, } # 调用科大讯飞声纹比对接口 message_response = generalRequest.req_url( api_name='searchScoreFea', APPId=APPId, APIKey=APIKey, APISecret=APISecret, file_path=output_mp3_path, featureId=voice.voice_feature_id, groupId=user_id ) # 处理第三方比对结果 if message_response.get('code') == 0: data_dict = json.loads(message_response.get('msg')) score = data_dict.get('score') # 更新最大 score 和对应的结果 if score and score > MATCH_THRESHOLD: best_res = True else: return JsonResponse({ "status": "error", "message": f" error occurred during call KEDA API", "date": current_time }) # print(f"KEDA score. API response: {score}")todo--------存储得分到数据库 # 使用本地比对算法 sound_wave_1 = voice.voice_path sound_wave_2 = output_mp3_path mfcc_1 = wave_compare.extract_mfcc(sound_wave_1) # mfcc_1=voice.mfcc mfcc_2 = wave_compare.extract_mfcc(sound_wave_2) # similarity = wave_compare.compute_similarity(mfcc_2, mfcc_1) if similarity['result'] and similarity['result'] > MATCH_THRESHOLD: best_res = True # 更新狗的训练状态 _update_dog_training(dog_id, command_type) else: #如果没有比对结果,或者比对得分太低,则删除数据库中路径信息,mfcc,feature_id,删除路径信息对应的本地mp3文件;并且调用科大讯飞删除声纹api, # 处理VoiceFeatureInfo表中的所有记录 # 删除本地音频文件 if voice.voice_path and delete_local_audio_file(voice.voice_path): second_fail_details['voice_files_deleted'] += 1 # 调用科大讯飞API删除音频特征 if voice.voice_feature_id: if delete_iflytek_feature(user_id, voice): second_fail_details['iflytek_features_deleted'] += 1 # 清空相关字段 voice.mfcc = None voice.voice_path = None voice.voice_feature_id = None voice.save() print(f"Audio similarity. API response: {similarity}") except VoiceFeatureInfo.DoesNotExist: return JsonResponse({ "status": "error", "message": "No matching voice was found", "date": current_time }) except Exception as e: return JsonResponse({ "status": "error", "message": f" error occurred during voice comparison: {str(e)}", "date": current_time }) # 返回比对结果 res_msg = 'pass' if best_res else 'fail' if best_res: res_msg = 'pass' dog_list = [] for dog in dog_info: dog_serialized = serializers.serialize('json', [dog]) dog_data = json.loads(dog_serialized)[0]['fields'] dog_id = dog_data['d_id'] # 获取狗的信息并转换为字典 # 获取狗的信息并转换为字典 new_dog_status = aidog_tools.model_to_dict_without_id(DogStatus.objects.filter(d_id=dog_id).first()) new_dog_training = aidog_tools.model_to_dict_without_id(DogTrainingNew.objects.filter(d_id=dog_id).first()) new_dog_body_attributes = aidog_tools.model_to_dict_without_id(DogBodyAttributes.objects.filter(d_id=dog_id).first()) new_dog_personality_relationship = aidog_tools.model_to_dict_without_id(DogPersonalityRelationship.objects.filter(d_id=dog_id).first()) # 合并所有相关信息 dog_info_combined = { **dog_data, **new_dog_status, **new_dog_training, **new_dog_body_attributes, **new_dog_personality_relationship, } # 只保留字段而不是数据库内部的元数据(如 '_state') dog_info_clean = {key: value for key, value in dog_info_combined.items() if not key.startswith('_')} dog_list.append(dog_info_clean) # 保存比对结果 VoiceCompareScores.objects.create( uid=user_id, voice_feature_id=feature_id, keda_score=score, mfcc_score=similarity['result'], kedacompare_spend_time=message_response['time_diff'], mfcccompare_spend_time=similarity['time_diff'], createtime=current_time ) return JsonResponse({ "status": "success", "date": current_time, "message": res_msg, "mfcc_score": similarity['result'], "keda_score": score, "user_id": user_id, "voice_type": command_type, "date_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "dogs": dog_list }) else: res_msg='fail' return JsonResponse({ "status": "success", "date": current_time, "message": res_msg, "second_fail_details":second_fail_details, "date_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'), }) except Exception as e: return JsonResponse({ "status": "error", "message": f"error occurred while processing the secondary upload: {str(e)}", "date": current_time }) finally: # 清理临时文件 if output_mp3_path and os.path.exists(output_mp3_path): try: os.remove(output_mp3_path) except Exception: # 记录但不中断流程 pass # 生成并发安全的文件路径 # 参数: # user_id: 用户ID # feature_id: 声纹ID # is_temp: 是否为临时文件 # # 返回: # 文件存储路径 # 2025-4-8 Author Feng def _get_file_path(user_id, feature_id, is_temp=False): # 基础目录 base_directory = "audio_cache" # 按用户ID分目录 user_directory = os.path.join(base_directory, f"user_{user_id}") # 确保目录存在 os.makedirs(user_directory, exist_ok=True) # 确保文件名全局唯一,防止多用户并发冲突 timestamp = int(time.time()) # 文件命名包含必要的标识信息 file_name = f"{'voice_' if is_temp else 'tempvoice_'}{user_id}_{feature_id}_{timestamp}.mp3" return os.path.join(user_directory, file_name) def _validate_request_params(request): # 验证请求参数的完整性 # 获取请求参数 access_token = request.POST.get("access_token", '') user_id = request.POST.get("user_id", '') dog_id = request.POST.get("dog_id", '') # file_stream = request.FILES.get("voice") # 从form-data到binary切换:修改这里的文件获取方式,从'voice'改为'file' # 尝试从FILES获取文件(multipart/form-data方式) file_stream = request.FILES.get("voice") or request.FILES.get("file") # 如果FILES中没有找到文件,并且Content-Type是application/octet-stream, # 则从请求体中读取二进制数据 if not file_stream and request.content_type == 'application/octet-stream': from django.core.files.uploadedfile import SimpleUploadedFile file_stream = SimpleUploadedFile("voice.zip", request.body, content_type="application/octet-stream") voice_type = request.POST.get("voice_type", '') timestamp = request.POST.get("datetime", '') current_times = request.POST.get("current_times", '') # 当前第几次上传音频源文件 total_times = request.POST.get("total_times", '') # 总共会上传几次 current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 验证文件是否存在 if not file_stream: return JsonResponse({ "status": "error", "message": "voice file was not uploaded", "date": current_time }) # 验证必要参数是否存在 if not all([user_id, dog_id,voice_type, access_token, current_times, total_times]): return JsonResponse({ "status": "error", "date": current_time, "message": "Please ensure all params are provided", "end_time": current_time }) return { 'access_token': access_token, 'user_id': user_id, 'dog_id': dog_id, 'file_stream': file_stream, 'voice_type': voice_type, 'timestamp': timestamp, 'current_times': current_times, 'total_times': total_times } #语音呼唤发送接口:客户端上传用户id、wav.zip文件流,access_token # 2025/4/21 # author Feng # 将语音发送给服务器,返回所有狗的得分,得分最高的狗进入交互状态 # 得分类型float保留2位小数(减少得分一样概率) @csrf_exempt def voice_call_training(request): try: # 获取请求参数 access_token = request.POST.get("access_token", '') user_id = request.POST.get("user_id", '') # 尝试从FILES获取文件(multipart/form-data方式) file_stream = request.FILES.get("voice") or request.FILES.get("file") # 如果FILES中没有找到文件,并且Content-Type是application/octet-stream, # 则从请求体中读取二进制数据 if not file_stream and request.content_type == 'application/octet-stream': from django.core.files.uploadedfile import SimpleUploadedFile file_stream = SimpleUploadedFile("voice.zip", request.body, content_type="application/octet-stream") feature_id = str(time.time())[:12] # 生成声纹特征ID max_score = 0.6 # 初始为0.3, 只记录比0.3大的score score=0 max_MFCC_score=0.6 best_res = None # 存储符合条件的最大 score 结果 res_msg='' # 记录请求时间 current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 验证请求参数 # 验证文件是否存在 if not file_stream: return JsonResponse({ "status": "error", "message": "voice file was not uploaded", "date": current_time }) if not all([user_id, access_token]): return JsonResponse({ "status": "error", "date": current_time, "message": "Please ensure all required fields are provided!", "end_time": current_time }) # 验证访问令牌 token_validation = aidog_tools.validate_access_token(access_token) if isinstance(token_validation, JsonResponse): return token_validation user_id = token_validation['user_id'] # 当前时间 date_now=datetime.now() # 生成唯一的文件路径 output_mp3_path = _get_file_path(user_id, feature_id) # 处理上传的音频文件 wav_to_mp3_result = dbOperate_upload_Uvoice.process_zip_audio_file(file_stream, output_mp3_path) if not wav_to_mp3_result: return JsonResponse({ "status": "error", "message": "audio file failed to be processed", "date": current_time }) try: # 获取用户原始声纹信息 voices = VoiceFeatureInfo.objects.filter(uid=user_id, command_type='voiceCall') dog_info = Doginfo.objects.filter(owner_id=user_id) # 创建一个列表来存储所有voice的得分结果 voice_MFCC_score = { } voice_KEDA_score = { } for voice in voices: # 使用本地比对算法 sound_wave_1 = voice.voice_path sound_wave_2 = output_mp3_path mfcc_1 = wave_compare.extract_mfcc(sound_wave_1) mfcc_2 = wave_compare.extract_mfcc(sound_wave_2) similarity = wave_compare.compute_similarity(mfcc_2, mfcc_1) voice_MFCC_score[voice.d_id] = similarity['result'] print(f"Audio similarity. API response: {similarity}") # 使用科大算法 message_response = generalRequest.req_url( api_name='searchScoreFea', APPId=APPId, APIKey=APIKey, APISecret=APISecret, file_path=voice.voice_path, featureId=voice.voice_feature_id, groupId=user_id ) if message_response.get('code') == 0: data_dict = json.loads(message_response.get('msg')) score = data_dict.get('score') voice_KEDA_score[voice.d_id] = score else: return JsonResponse({ "status": "error", "code":400, "message": message_response.get('msg'), "date": current_time }) # 找出MFCC得分最高的狗ID highest_mfcc_score = 0 highest_mfcc_dog_id = None for dog_id, mfcc_score in voice_MFCC_score.items(): if mfcc_score and float(mfcc_score) > highest_mfcc_score: highest_mfcc_score = float(mfcc_score) highest_mfcc_dog_id = dog_id # 如果找到了最高分数的狗ID,更新其voicecall值 # if highest_mfcc_dog_id: # try: # dog_training = DogTrainingNew.objects.get(d_id=highest_mfcc_dog_id) # dog_training.voicecall += 4 # dog_training.save() # print(f"Updated voicecall for dog ID {highest_mfcc_dog_id}, new value: {dog_training.voicecall}") # except DogTrainingNew.DoesNotExist: # # 如果找不到对应的训练记录 # return JsonResponse({ # "status": "error", # "message": "No matching dog was found", # "date": current_time # }) # 如果找到了最高分数的狗ID,更新其voicecall值和personality属性 if highest_mfcc_dog_id: try: # 更新voicecall,每日最多+12 dog_training = DogTrainingNew.objects.get(d_id=highest_mfcc_dog_id) # 获取今天的日期(年-月-日) today = date_now.date() # 检查是否有今天的训练记录 training_log = DogTrainingLog.objects.filter( d_id=highest_mfcc_dog_id, owner_id=user_id, command_type='voiceCall', training_date=today ).first() # 如果没有今天的记录,创建一个新的 if not training_log: training_log = DogTrainingLog.objects.create( d_id=highest_mfcc_dog_id, owner_id=user_id, command_type='voiceCall', training_date=today, training_count=0 ) # 检查是否达到每日上限 if training_log.training_count < 3: # 每次+4,最多+12,所以最多3次 # voicecall是TINYINT类型,最大值限制为100 max_value = 100 # 检查更新后的值是否超过最大值 new_value = dog_training.voicecall + 4 if new_value > max_value: # 如果超过最大值,设置为最大值 dog_training.voicecall = max_value else: dog_training.voicecall = new_value dog_training.save() # 更新训练记录 training_log.training_count += 1 training_log.save() print( f"Updated voicecall for dog ID {highest_mfcc_dog_id}, new value: {dog_training.voicecall}") # 更新亲密度、友好度和服从度 try: dog_relationship = DogPersonalityRelationship.objects.get(d_id=highest_mfcc_dog_id) # 检查是否有今天的亲密度、友好度和服从度记录 relationship_log = DogPersonalityRelationshipLog.objects.filter( d_id=highest_mfcc_dog_id, log_date=today ).first() # 如果没有今天的记录,创建一个新的 if not relationship_log: relationship_log = DogPersonalityRelationshipLog.objects.create( d_id=highest_mfcc_dog_id, log_date=today, intimate_count=0, friendly_count=0, obedience_count=0 ) # 检查并更新亲密度 if relationship_log.intimate_count < 2: dog_relationship.intimate += 1 relationship_log.intimate_count += 1 # 检查并更新友好度 if relationship_log.friendly_count < 2: dog_relationship.friendly += 1 relationship_log.friendly_count += 1 # 检查并更新服从度 if relationship_log.obedience_count < 2: dog_relationship.obedience += 1 relationship_log.obedience_count += 1 # 保存更新 dog_relationship.save() relationship_log.save() print(f"Updated relationship attributes for dog ID {highest_mfcc_dog_id}") except DogPersonalityRelationship.DoesNotExist: print(f"No personality relationship found for dog ID {highest_mfcc_dog_id}") except DogTrainingNew.DoesNotExist: # 如果找不到对应的训练记录 return JsonResponse({ "status": "error", "message": "No matching dog was found", "date": current_time }) except VoiceFeatureInfo.DoesNotExist: return JsonResponse({ "status": "error", "message": "No matching voice was found", "date": current_time }) except Exception as e: return JsonResponse({ "status": "error", "message": f" error occurred during voice comparison: {str(e)}", "date": current_time }) finally: # 清理临时文件 if os.path.exists(output_mp3_path): os.remove(output_mp3_path) # 保存比对结果 voice_compare = VoiceCompareScores.objects.create( uid=user_id, voice_feature_id=feature_id, keda_score=score, mfcc_score=similarity['result'], kedacompare_spend_time=message_response['time_diff'], mfcccompare_spend_time=similarity['time_diff'], createtime=current_time ) dog_list = [] for dog in dog_info: dog_serialized = serializers.serialize('json', [dog]) dog_data = json.loads(dog_serialized)[0]['fields'] dog_id = dog_data['d_id'] # 获取狗的信息并转换为字典 # 获取狗的信息并转换为字典 new_dog_status = aidog_tools.model_to_dict_without_id(DogStatus.objects.filter(d_id=dog_id).first()) new_dog_training = aidog_tools.model_to_dict_without_id(DogTrainingNew.objects.filter(d_id=dog_id).first()) new_dog_body_attributes = aidog_tools.model_to_dict_without_id( DogBodyAttributes.objects.filter(d_id=dog_id).first()) new_dog_personality_relationship = aidog_tools.model_to_dict_without_id( DogPersonalityRelationship.objects.filter(d_id=dog_id).first()) # 合并所有相关信息 dog_info_combined = { **dog_data, **new_dog_status, **new_dog_training, **new_dog_body_attributes, **new_dog_personality_relationship, } # 只保留字段而不是数据库内部的元数据(如 '_state') dog_info_clean = {key: value for key, value in dog_info_combined.items() if not key.startswith('_')} dog_list.append(dog_info_clean) # 返回包含所有voice得分的结果 return JsonResponse({ "status": "success", "code":200, "date": current_time, "message": 'voice call result', "call Score MFCC":voice_MFCC_score, "call Score Xunfei":voice_KEDA_score, "MFCC response time": similarity['time_diff'], "XunFei response time": message_response.get('time_diff') , "dogs": dog_list }) except Exception as e: # 全局异常处理 import traceback traceback.print_exc() return JsonResponse({ "status": "error", "message": f"Server processing exception: {str(e)}", "date": current_time }) #语音指令发送接口:客户端上传用户id、wav.zip文件流,access_token,dog_id # 2025/4/28 # author Feng # 将语音发送给服务器,并附上狗的id,返回狗的所有动作中得分最高的 # commandScore 对应每一个命令得分。如果这个指令没有开启,得分0。如果开启最低得分1,最高得分99。得分类型float保留2位小数(减少得分一样概率) # 这个接口返回的是语音识别得分。得分+属性,由客户端判断结果。 @csrf_exempt def voice_command_training(request): try: # 初始化全局变量 output_mp3_path = None # 记录请求时间 current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') similarity_time_diff = None xunfei_time_diff = None # 获取请求参数 access_token = request.POST.get("access_token", '') user_id = request.POST.get("user_id", '') d_id = request.POST.get("dog_id", '') # 尝试从FILES获取文件(multipart/form-data方式) file_stream = request.FILES.get("voice") or request.FILES.get("file") # 如果FILES中没有找到文件,并且Content-Type是application/octet-stream, # 则从请求体中读取二进制数据 if not file_stream and request.content_type == 'application/octet-stream': from django.core.files.uploadedfile import SimpleUploadedFile file_stream = SimpleUploadedFile("voice.zip", request.body, content_type="application/octet-stream") feature_id = str(time.time())[:12] # 生成声纹特征ID # 验证请求参数 # 验证文件是否存在 if not file_stream: return JsonResponse({ "status": "error", "message": "voice file was not uploaded", "date": current_time }) if not all([user_id,d_id, access_token]): return JsonResponse({ "status": "error", "date": current_time, "message": "Please ensure all required fields are provided!", "end_time": current_time }) # 验证访问令牌 token_validation = aidog_tools.validate_access_token(access_token) if isinstance(token_validation, JsonResponse): return token_validation user_id = token_validation['user_id'] # 当前时间 date_now=datetime.now() # 生成唯一的文件路径 output_mp3_path = _get_file_path(user_id, feature_id) # 处理上传的音频文件 wav_to_mp3_result = dbOperate_upload_Uvoice.process_zip_audio_file(file_stream, output_mp3_path) if not wav_to_mp3_result: return JsonResponse({ "status": "error", "message": "audio file failed to be processed", "date": current_time }) try: # 获取用户原始声纹信息 voices = VoiceFeatureInfo.objects.filter(uid=user_id,d_id=d_id) dog_info = Doginfo.objects.filter(owner_id=user_id) dog_training=DogTrainingNew.objects.filter(d_id=d_id) # 创建一个列表来存储所有voice的得分结果 commandScoreMFCC = { } commandScoreXunFei = { } # 默认初始化这些变量,以防在下面的代码中没有被赋值 similarity_time_diff = 0 xunfei_time_diff = 0 for voice in voices: # 使用本地比对算法,只比对非voiceCall类型 if voice.command_type!= 'voiceCall' : sound_wave_1 = voice.voice_path sound_wave_2 = output_mp3_path mfcc_1 = wave_compare.extract_mfcc(sound_wave_1) mfcc_2 = wave_compare.extract_mfcc(sound_wave_2) similarity = wave_compare.compute_similarity(mfcc_2, mfcc_1) commandScoreMFCC[voice.command_type] = similarity['result'] similarity_time_diff = similarity['time_diff'] # 保存最后一次比对的时间差 print(f"Audio similarity. API response: {similarity}") # 使用科大算法 message_response = generalRequest.req_url( api_name='searchScoreFea', APPId=APPId, APIKey=APIKey, APISecret=APISecret, file_path=voice.voice_path, featureId=voice.voice_feature_id, groupId=user_id ) if message_response.get('code') == 0: data_dict = json.loads(message_response.get('msg')) score = data_dict.get('score') xunfei_time_diff = message_response.get('time_diff') # 保存科大API的时间差 commandScoreXunFei[voice.command_type] = float(data_dict.get('score', 0)) else: return JsonResponse({ "status": "error", "code":400, "message": message_response.get('msg'), "date": current_time }) # 保存比对结果 voice_compare = VoiceCompareScores.objects.create( uid=user_id, voice_feature_id=voice.voice_feature_id, keda_score=float(data_dict.get('score', 0)), mfcc_score=similarity['result'], kedacompare_spend_time=message_response['time_diff'], mfcccompare_spend_time=similarity['time_diff'], createtime=current_time ) # 找出MFCC得分最高的狗ID highest_mfcc_score = 0 highest_mfcc_command_type= None for command_type, mfcc_score in commandScoreMFCC.items(): if mfcc_score and float(mfcc_score) > highest_mfcc_score: highest_mfcc_score = float(mfcc_score) highest_mfcc_command_type = command_type # 如果找到了最高分数的狗ID,更新其voicecommand值和personality属性 if highest_mfcc_command_type: try: # 更新voicecall,每日最多+12 dog_training = DogTrainingNew.objects.get(d_id=d_id) # 获取今天的日期(年-月-日) today = date_now.date() # 检查是否有今天的训练记录 training_log = DogTrainingLog.objects.filter( d_id=d_id, owner_id=user_id, command_type='voiceCommand', training_date=today ).first() # 如果没有今天的记录,创建一个新的 if not training_log: training_log = DogTrainingLog.objects.create( d_id=d_id, owner_id=user_id, command_type='voiceCommand', training_date=today, training_count=0 ) # 检查是否达到每日上限 if training_log.training_count < 3: # 每次+4,最多+12,所以最多3次 # voicecommand是TINYINT类型,最大值限制为100 max_value = 100 # 检查更新后的值是否超过最大值 new_value = dog_training.voicecommand + 4 if new_value > max_value: # 如果超过最大值,设置为最大值 dog_training.voicecommand = max_value else: dog_training.voicecommand = new_value dog_training.save() # 更新训练记录 training_log.training_count += 1 training_log.save() print( f"Updated voicecommand for dog ID {d_id}, new value: {dog_training.voicecommand}") # 更新亲密度、友好度和服从度 try: dog_relationship = DogPersonalityRelationship.objects.get(d_id=d_id) # 检查是否有今天的亲密度、友好度和服从度记录 relationship_log = DogPersonalityRelationshipLog.objects.filter( d_id=d_id, log_date=today ).first() # 如果没有今天的记录,创建一个新的 if not relationship_log: relationship_log = DogPersonalityRelationshipLog.objects.create( d_id=d_id, log_date=today, intimate_count=0, friendly_count=0, obedience_count=0 ) # 检查并更新亲密度 if relationship_log.intimate_count < 2: dog_relationship.intimate += 1 relationship_log.intimate_count += 1 # 检查并更新友好度 if relationship_log.friendly_count < 2: dog_relationship.friendly += 1 relationship_log.friendly_count += 1 # 检查并更新服从度 if relationship_log.obedience_count < 2: dog_relationship.obedience += 1 relationship_log.obedience_count += 1 # 保存更新 dog_relationship.save() relationship_log.save() print(f"Updated relationship attributes for dog ID {d_id}") except DogPersonalityRelationship.DoesNotExist: print(f"No personality relationship found for dog ID {d_id}") except DogTrainingNew.DoesNotExist: # 如果找不到对应的训练记录 return JsonResponse({ "status": "error", "message": "No matching dog was found", "date": current_time }) except VoiceFeatureInfo.DoesNotExist: return JsonResponse({ "status": "error", "message": "No matching voice was found", "date": current_time }) except Exception as e: return JsonResponse({ "status": "error", "message": f" error occurred during voice comparison: {str(e)}", "date": current_time }) finally: # 清理临时文件 if os.path.exists(output_mp3_path): os.remove(output_mp3_path) dog_list = [] for dog in dog_info: dog_serialized = serializers.serialize('json', [dog]) dog_data = json.loads(dog_serialized)[0]['fields'] dog_id = dog_data['d_id'] # 获取狗的信息并转换为字典 # 获取狗的信息并转换为字典 new_dog_status = aidog_tools.model_to_dict_without_id(DogStatus.objects.filter(d_id=dog_id).first()) new_dog_training = aidog_tools.model_to_dict_without_id(DogTrainingNew.objects.filter(d_id=dog_id).first()) new_dog_body_attributes = aidog_tools.model_to_dict_without_id( DogBodyAttributes.objects.filter(d_id=dog_id).first()) new_dog_personality_relationship = aidog_tools.model_to_dict_without_id( DogPersonalityRelationship.objects.filter(d_id=dog_id).first()) # 合并所有相关信息 dog_info_combined = { **dog_data, **new_dog_status, **new_dog_training, **new_dog_body_attributes, **new_dog_personality_relationship, } # 只保留字段而不是数据库内部的元数据(如 '_state') dog_info_clean = {key: value for key, value in dog_info_combined.items() if not key.startswith('_')} dog_list.append(dog_info_clean) # 返回包含所有voice得分的结果 return JsonResponse({ "status": "success", "code":200, "date": current_time, "message": 'voice result', "commandScoreMFCC":commandScoreMFCC, "commandScoreXunFei":commandScoreXunFei, "MFCC response time": similarity_time_diff, "XunFei response time": xunfei_time_diff, "dogs": dog_list }) except Exception as e: # 全局异常处理 import traceback traceback.print_exc() return JsonResponse({ "status": "error", "message": f"Server processing exception: {str(e)}", "date": current_time }) #重制语音训练接口:客户端上传user_id、dog_id,access_token,date_time,type # POST /api/voice/reset/ # 2025/5/30 # author Feng # type==call 重制voiceCall # type==command 重制voiceCommand,以及下面所有指令 # type==all 重制所有 # 这个接口返回的是重置结果。 @csrf_exempt def voice_reset(request): # 记录请求时间 current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') try: # 获取请求参数 params = aidog_request_tools.handle_post_request_parameters( request, ["access_token", "user_id", "dog_id", "type"] ) date_time = request.POST.get("date_time", '') access_token = params["access_token"] dog_id = params["dog_id"] type = params["type"] # 验证访问令牌 token_validation = aidog_tools.validate_access_token(access_token) if isinstance(token_validation, JsonResponse): return token_validation user_id = token_validation['user_id'] # try: # 根据type类型,删除VoiceFeatureInfo表中该type类型下的mfcc字段清空,path字段和path对应的本地mp3音频文件, # voice_feature_id清空并根据此voice_feature_id通过调用科大讯飞API删除对应id下的音频文件message_response = generalRequest.req_url( # api_name='deleteFeature', APPId=APPId, APIKey=APIKey, APISecret=APISecret, # file_path=voice.voice_path, featureId=voice.voice_feature_id, groupId=user_id # ) # 根据dog_id,查找DogTrainingNew表中该type类型下的数值并清零 # 如果type=all,则清除DogTrainingNew、VoiceFeatureInfo表中该dog_id下的所有type数值以及本地音频,科大讯飞音频 try: # voices = VoiceFeatureInfo.objects.filter(uid=user_id,d_id=dog_id) # dog_training=DogTrainingNew.objects.filter(d_id=dog_id) dog_info = Doginfo.objects.filter(owner_id=user_id) reset_details = { 'voice_files_deleted': 0, 'iflytek_features_deleted': 0, 'training_fields_reset': [] } if type == "all": # 重置所有类型的数据 result = reset_all_data(user_id, dog_id, reset_details) else: # 重置特定类型的数据 result = reset_specific_type_data(user_id, dog_id, type, reset_details) if not result: return { 'error': result, 'message': 'Reset failed', 'details': reset_details } except Exception as e: return { 'success': False, 'message': f'Error during reset operation: {str(e)}', 'details': {} } dog_list = [] for dog in dog_info: dog_serialized = serializers.serialize('json', [dog]) dog_data = json.loads(dog_serialized)[0]['fields'] dog_id = dog_data['d_id'] # 获取狗的信息并转换为字典 new_dog_status = aidog_tools.model_to_dict_without_id(DogStatus.objects.filter(d_id=dog_id).first()) new_dog_training = aidog_tools.model_to_dict_without_id(DogTrainingNew.objects.filter(d_id=dog_id).first()) new_dog_body_attributes = aidog_tools.model_to_dict_without_id( DogBodyAttributes.objects.filter(d_id=dog_id).first()) new_dog_personality_relationship = aidog_tools.model_to_dict_without_id( DogPersonalityRelationship.objects.filter(d_id=dog_id).first()) # 合并所有相关信息 dog_info_combined = { **dog_data, **new_dog_status, **new_dog_training, **new_dog_body_attributes, **new_dog_personality_relationship, } # 只保留字段而不是数据库内部的元数据(如 '_state') dog_info_clean = {key: value for key, value in dog_info_combined.items() if not key.startswith('_')} dog_list.append(dog_info_clean) # 返回包含所有voice得分的结果 return JsonResponse({ "status": "success", "code":200, "date": current_time, "message": 'reset success', "dogs": dog_list, "reset_details": reset_details }) except Exception as e: # 全局异常处理 import traceback traceback.print_exc() return JsonResponse({ "status": "error", "message": f"Server processing exception: {str(e)}", "date": current_time }) # 重置所有类型的数据 def reset_all_data(user_id, dog_id, reset_details): try: # 1. 处理VoiceFeatureInfo表中的所有记录 voices = VoiceFeatureInfo.objects.filter(uid=user_id, d_id=dog_id) for voice in voices: # 删除本地音频文件 if voice.voice_path and delete_local_audio_file(voice.voice_path): reset_details['voice_files_deleted'] += 1 # 调用科大讯飞API删除音频特征 if voice.voice_feature_id: if delete_iflytek_feature(user_id, voice): reset_details['iflytek_features_deleted'] += 1 # 清空相关字段 voice.mfcc = None voice.voice_path = None voice.voice_feature_id = None voice.save() # 2. 重置DogTrainingNew表中的所有训练数据 dog_training = DogTrainingNew.objects.filter(d_id=dog_id).first() if dog_training: training_fields = get_all_training_fields() for field in training_fields: if hasattr(dog_training, field): setattr(dog_training, field, 0) reset_details['training_fields_reset'].append(field) dog_training.voicecallenable=0 dog_training.voicecommandenable=0 dog_training.save() return True except Exception as e: print(f"Error in reset_all_data: {str(e)}") return False # 重置特定类型的数据 def reset_specific_type_data(user_id, dog_id, reset_type, reset_details): try: # 1. 处理VoiceFeatureInfo表中特定type的记录 voices = VoiceFeatureInfo.objects.filter(uid=user_id, d_id=dog_id, command_type=reset_type) for voice in voices: # 删除本地音频文件 if voice.voice_path and delete_local_audio_file(voice.voice_path): reset_details['voice_files_deleted'] += 1 # 调用科大讯飞API删除音频特征 if voice.voice_feature_id: if delete_iflytek_feature(user_id, voice): reset_details['iflytek_features_deleted'] += 1 # 清空相关字段 voice.mfcc = None voice.voice_path = None voice.voice_feature_id = None voice.save() # 2. 重置DogTrainingNew表中特定type的数值 dog_training = DogTrainingNew.objects.filter(d_id=dog_id).first() if dog_training and hasattr(dog_training, reset_type): setattr(dog_training, reset_type, 0) dog_training.save() reset_details['training_fields_reset'].append(reset_type) return True except Exception as e: print(f"Error in reset_specific_type_data: {str(e)}") return False # 获取所有训练相关的字段名 def get_all_training_fields(): return [ 'commandsit', 'commandstand', 'commandliedown', 'commandbark', 'commandturnl', 'commandtouch', 'commandturnr', 'commanddeath','commandshake', ] # 删除本地音频文件 def delete_local_audio_file(file_path): try: if os.path.exists(file_path): os.remove(file_path) print(f"Successfully deleted local file: {file_path}") return True else: print(f"File not found: {file_path}") return False except Exception as e: print(f"Error deleting file {file_path}: {str(e)}") return False # 调用科大讯飞API删除音频特征 def delete_iflytek_feature(user_id, voice): try: # 科大讯飞配置 message_response = generalRequest.req_url( api_name='deleteFeature', APPId=APPId, # 需要定义 APIKey=APIKey, # 需要定义 APISecret=APISecret, # 需要定义 file_path=voice.voice_path, featureId=voice.voice_feature_id, groupId=user_id ) # 根据API响应判断是否成功 if message_response and message_response.get('status') == 'success': print(f"Successfully deleted Iflytek feature: {voice.voice_feature_id}") return True else: print(f"Failed to delete Iflytek feature: {voice.voice_feature_id}") return False except Exception as e: print(f"Error calling Iflytek API: {str(e)}") return False #科大讯飞校验声纹接口:客户端上传用户id、wav格式文件流,转换文件流为MP3缓存在本地固定路径下 # 提交用户id到数据库遍历全部声纹id # 每个声纹id+用户id+mp3路径上传到科大校验接口,筛选出得分最高的声纹id,并返回其口令类型,删除此mp3文件 # 2024/12/02 无token # author Feng # 2025/3/31更新 def check_Uvoice_latest(request): user_uid = request.POST.get("user_id", '') file_stream = request.FILES.get("file_stream") best_res = None max_score = 0.3 # Threshold score date_now = datetime.now() current_time = date_now.strftime('%Y-%m-%d %H:%M:%S') # Validate inputs if not file_stream: return JsonResponse({ "status": "error", "message": "File not uploaded.", "date": current_time }) if not user_uid: return JsonResponse({ "status": "error", "date": current_time, "message": "Please ensure all required fields are provided!", "end_time": current_time }) # Set up cache directory cache_directory = "audio_cache" os.makedirs(cache_directory, exist_ok=True) # Create temporary file timestamp = int(time.time()) mp3_path = os.path.join(cache_directory, f"voice_check_{user_uid}_{timestamp}.mp3") # Process audio file if not dbOperate_upload_Uvoice.process_audio_file(file_stream, mp3_path): return JsonResponse({ "status": "error", "message": "Failed to process the audio file.", "date": current_time }) # Get all voice features for the user voices = VoiceFeatureInfo.objects.filter(uid=user_uid) # Check the audio against each stored feature for voice in voices: message_response = generalRequest.req_url( api_name='searchScoreFea', APPId=APPId, APIKey=APIKey, APISecret=APISecret, file_path=mp3_path, featureId=voice.voice_feature_id, groupId=user_uid ) if message_response.get('code') == 0: data_dict = json.loads(message_response.get('msg')) score = data_dict.get('score') # Update best match if score and score > max_score: max_score = score best_res = { "status": "success", "date": current_time, "message": message_response, "score": score, "command_type": voice.command_type, "end_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S') } # Award points if match is found try: # Get or create points tracker audio_points, created = AudioPointsTracker.objects.get_or_create( uid=user_uid, command_type=voice.command_type, defaults={'points': 0} ) # Add 10 points audio_points.points += 10 audio_points.save() best_res["points"] = audio_points.points best_res["points_added"] = 10 except Exception as db_error: best_res["points_error"] = str(db_error) # Clean up temporary file os.remove(mp3_path) # Return results if best_res: return JsonResponse(best_res) else: return JsonResponse({ "status": "error", "message": "请重新发出口令.", "date": current_time })