import shutil import time import zipfile from datetime import datetime from pydub import AudioSegment from pydub.exceptions import CouldntDecodeError import uuid import tempfile import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "aiDogProject.settings") import django django.setup() from aiDogApp.models import * from .generalRequest import req_url # Importing your generalRequest.py method # 转换上传的文件流为mp3 并存储在本地 def process_audio_file(file_stream, output_mp3_path): try: # 使用临时目录创建临时 WAV 文件路径----old version # temp_dir = tempfile.gettempdir() # 使用系统临时目录 # temp_wav_path = os.path.join(temp_dir, f"temp_{uuid.uuid4().hex}.wav") # 文件处理优化:使用内存处理(如io.BytesIO)替代文件 I / O,减少磁盘访问。---new version temp_wav_path = tempfile.mktemp(suffix='.wav') # 保存上传的文件流为临时 WAV 文件 with open(temp_wav_path, 'wb') as temp_file: temp_file.write(file_stream.read()) # 检查临时文件是否有效 if not os.path.exists(temp_wav_path) or os.path.getsize(temp_wav_path) == 0: # print("Error: Temporary WAV file is empty or not created.") return False # 转换 WAV 到 MP3 audio_segment = AudioSegment.from_wav(temp_wav_path) audio_segment.export(output_mp3_path, format="mp3") # 清理临时 WAV 文件 os.remove(temp_wav_path) return True # return "Success: Audio file converted and saved." except Exception as e: print(f"Error processing audio file: {e}") return False # return f"Error: {str(e)}" # 将zip内的wav文件解压出来 # 再转换上传的文件流为mp3 并存储在本地 # 2025/4/1 # author Feng def process_zip_audio_file(file_stream, output_mp3_path): try: # 创建临时目录用于解压文件 temp_dir = tempfile.mkdtemp() zip_name = output_mp3_path[0:8] temp_zip_path = os.path.join(temp_dir, f"{zip_name}.zip") # 保存上传的压缩文件流 with open(temp_zip_path, 'wb') as temp_file: temp_file.write(file_stream.read()) # 检查临时压缩文件是否有效 if not os.path.exists(temp_zip_path) or os.path.getsize(temp_zip_path) == 0: print("Error: Temporary ZIP file is empty or not created.") return False # return "Error: Temporary ZIP file is empty or not created." # 解压文件 with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref: zip_ref.extractall(temp_dir) # 查找voice.wav文件 wav_file_path = os.path.join(temp_dir, "voice.wav") if not os.path.exists(wav_file_path): print("Error: voice.wav not found in the zip archive.") return False # return "Error: voice.wav not found in the zip archive." # 转换WAV到MP3 audio_segment = AudioSegment.from_wav(wav_file_path) audio_segment.export(output_mp3_path, format="mp3") # 清理临时文件 shutil.rmtree(temp_dir) # 递归删除临时目录及其内容 return True except Exception as e: print(f"Error processing audio file: {e}") # 确保清理临时文件,即使发生错误 if 'temp_dir' in locals() and os.path.exists(temp_dir): shutil.rmtree(temp_dir) return False # return f"Error processing audio file: {e}" # Function to insert audio information into MySQL database def insert_voice_data(uid,mp3_path, feature_id,command_type,record_time): try: # 读取 WAV 文件 为二进制数据 # with open(mp3_path, 'rb') as audio_file: # wav_data = audio_file.read() wav_data='' # 插入到声纹信息表中 # 口令类型(0:叫名字 1:坐下,2:站起,3:摇尾巴,4:走过来,5 抬左手,6:抬右手) if command_type==0: # 口令类型(0:叫名字 # insert_query = """ # INSERT INTO voice_feature_info (uid, voice_feature_id, voice_name_command,command_type) # VALUES (%s, %s, %s, %s) # """ # # cursor.execute(insert_query, (uid, feature_id, wav_data, command_type)) updated_count =VoiceFeatureInfo.objects.filter(uid=uid,command_type=command_type).update(voice_feature_id=feature_id, voice_name_command=wav_data, record_time=record_time) if updated_count > 0: return True else: return False elif command_type==1: # 口令类型(1:坐下 updated_count =VoiceFeatureInfo.objects.filter(uid=uid,command_type=command_type).update(voice_feature_id=feature_id, voice_sit_command=wav_data, record_time=record_time) if updated_count > 0: return True else: return False elif command_type == 2: # 口令类型(2:站起 updated_count =VoiceFeatureInfo.objects.filter(uid=uid,command_type=command_type).update(voice_feature_id=feature_id, voice_stand_command=wav_data, record_time=record_time) if updated_count > 0: return True else: return False elif command_type == 3: # 口令类型(3:摇尾巴 updated_count =VoiceFeatureInfo.objects.filter(uid=uid,command_type=command_type).update(voice_feature_id=feature_id, voice_wagtail_command=wav_data, record_time=record_time) if updated_count > 0: return True else: return False else: # 口令类型(4:走过来 updated_count =VoiceFeatureInfo.objects.filter(uid=uid,command_type=command_type).update(voice_feature_id=feature_id, voice_come_command=wav_data, record_time=record_time) if updated_count > 0: return True else: return False # conn.commit() print(f"Data inserted into database for UID: {uid}") # return True # 抛出异常 except Exception as err: print(f"Error: {err}") return f"Error: {str(err)}" # return False # APPId = "527f831f" # APISecret = "YWE5MjI0MzA4NmM3MTNmNTNiMWJkYzE4" # APIKey = "be5c37f3db1569737934240a0e3ad02d" APPId = "ed8eb862" APISecret = "YzEyMDYyMzljMDViNWJlZDdlOWJhYjVi" APIKey = "b4c831160d8933221e95bda817547e99" # 接收声纹录音, 保存在本地, 并插入声纹特征到数据库, 然后调用科大接口识别 req_url() def process_voice_recording(uid, voice_feature_id, command_type, file_stream,mp3_path): # 定义本地缓存路径 # cache_directory = "audio_cache" # os.makedirs(cache_directory, exist_ok=True) # 创建一个唯一的文件名 timestamp=int(time.time()) dd = datetime.today() timestamp11 = datetime.strftime(dd, '%Y-%m-%d %H:%M:%S') record_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # # 按用户ID分目录 # user_directory = os.path.join(cache_directory, f"user_{uid}") # # 确保目录存在 # os.makedirs(user_directory, exist_ok=True) # # 文件命名包含必要的标识信息 # file_name = f"voice_{uid}_{voice_feature_id}_{timestamp}.mp3" # # mp3_path=os.path.join(user_directory, file_name) # 接收文件流转成mp3 并存储到cache # if process_audio_file(file_stream, mp3_path): if process_zip_audio_file(file_stream, mp3_path): # 如果转换成功,则插入数据库 # return insert_voice_data(uid,mp3_path,voice_feature_id,command_type,record_time) # TODO 后续替换为本地声纹识别算法后,map3_path更换为特征值,只存储特征值到数据库,每次比对特征值即可-----TODO # if insert_voice_data(uid,mp3_path,voice_feature_id,command_type,record_time): # 调用第三方声纹识别 API res1 = req_url( api_name='createFeature', APPId=APPId, APIKey=APIKey, APISecret=APISecret, file_path=mp3_path, featureId=voice_feature_id, groupId=uid ) print(f"Audio processing complete. API response: {res1}") yy = datetime.today() end_timestamp = datetime.strftime(yy, '%Y-%m-%d %H:%M:%S') # return ( # f"音频文件缓存在 {mp3_path} 并且传参到 req_url().返回内容为{res}.请求时间为:---{timestamp11}.返回时间为:---{end_timestamp}") # return f"Audio processed and sent to req_url(). API response: {res}" # res['status'] = tempResult.get('header').get('message') # res["msg"] = str(base64.b64decode(fb), 'UTF-8') # res['response_time'] =datetime.strftime(datetime.today(), '%Y-%m-%d %H:%M:%S') res = { "code": 200, "status": res1['status'], "msg": res1["msg"], "response_time": res1['response_time'], } return res # else: # res = { # "code": 400, # "status": "Fail", # "msg": "Failed to insert voice data into database.", # # } return res else: res = { "code": 400, "status": "Fail", "msg": "Failed to process the audio file.", } print("Failed to process the audio file.") return res