123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253 |
- import shutil
- import time
- import zipfile
- from datetime import datetime
- from pydub import AudioSegment
- from pydub.exceptions import CouldntDecodeError
- import uuid
- import tempfile
- import os
- os.environ.setdefault("DJANGO_SETTINGS_MODULE", "aiDogProject.settings")
- import django
- django.setup()
- from aiDogApp.models import *
- from .generalRequest import req_url # Importing your generalRequest.py method
- # 转换上传的文件流为mp3 并存储在本地
- def process_audio_file(file_stream, output_mp3_path):
- try:
- # 使用临时目录创建临时 WAV 文件路径----old version
- # temp_dir = tempfile.gettempdir() # 使用系统临时目录
- # temp_wav_path = os.path.join(temp_dir, f"temp_{uuid.uuid4().hex}.wav")
- # 文件处理优化:使用内存处理(如io.BytesIO)替代文件 I / O,减少磁盘访问。---new version
- temp_wav_path = tempfile.mktemp(suffix='.wav')
- # 保存上传的文件流为临时 WAV 文件
- with open(temp_wav_path, 'wb') as temp_file:
- temp_file.write(file_stream.read())
- # 检查临时文件是否有效
- if not os.path.exists(temp_wav_path) or os.path.getsize(temp_wav_path) == 0:
- # print("Error: Temporary WAV file is empty or not created.")
- return False
- # 转换 WAV 到 MP3
- audio_segment = AudioSegment.from_wav(temp_wav_path)
- audio_segment.export(output_mp3_path, format="mp3")
- # 清理临时 WAV 文件
- os.remove(temp_wav_path)
- return True
- # return "Success: Audio file converted and saved."
- except Exception as e:
- print(f"Error processing audio file: {e}")
- return False
- # return f"Error: {str(e)}"
- # 将zip内的wav文件解压出来
- # 再转换上传的文件流为mp3 并存储在本地
- # 2025/4/1
- # author Feng
- def process_zip_audio_file(file_stream, output_mp3_path):
- try:
- # 创建临时目录用于解压文件
- temp_dir = tempfile.mkdtemp()
- zip_name = output_mp3_path[0:8]
- temp_zip_path = os.path.join(temp_dir, f"{zip_name}.zip")
- # 保存上传的压缩文件流
- with open(temp_zip_path, 'wb') as temp_file:
- temp_file.write(file_stream.read())
- # 检查临时压缩文件是否有效
- if not os.path.exists(temp_zip_path) or os.path.getsize(temp_zip_path) == 0:
- print("Error: Temporary ZIP file is empty or not created.")
- return False
- # return "Error: Temporary ZIP file is empty or not created."
- # 解压文件
- with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref:
- zip_ref.extractall(temp_dir)
- # 查找voice.wav文件
- wav_file_path = os.path.join(temp_dir, "voice.wav")
- if not os.path.exists(wav_file_path):
- print("Error: voice.wav not found in the zip archive.")
- return False
- # return "Error: voice.wav not found in the zip archive."
- # 转换WAV到MP3
- audio_segment = AudioSegment.from_wav(wav_file_path)
- audio_segment.export(output_mp3_path, format="mp3")
- # 清理临时文件
- shutil.rmtree(temp_dir) # 递归删除临时目录及其内容
- return True
- except Exception as e:
- print(f"Error processing audio file: {e}")
- # 确保清理临时文件,即使发生错误
- if 'temp_dir' in locals() and os.path.exists(temp_dir):
- shutil.rmtree(temp_dir)
- return False
- # return f"Error processing audio file: {e}"
- # Function to insert audio information into MySQL database
- def insert_voice_data(uid,mp3_path, feature_id,command_type,record_time):
- try:
- # 读取 WAV 文件 为二进制数据
- # with open(mp3_path, 'rb') as audio_file:
- # wav_data = audio_file.read()
- wav_data=''
- # 插入到声纹信息表中
- # 口令类型(0:叫名字 1:坐下,2:站起,3:摇尾巴,4:走过来,5 抬左手,6:抬右手)
- if command_type==0:
- # 口令类型(0:叫名字
- # insert_query = """
- # INSERT INTO voice_feature_info (uid, voice_feature_id, voice_name_command,command_type)
- # VALUES (%s, %s, %s, %s)
- # """
- # # cursor.execute(insert_query, (uid, feature_id, wav_data, command_type))
- updated_count =VoiceFeatureInfo.objects.filter(uid=uid,command_type=command_type).update(voice_feature_id=feature_id, voice_name_command=wav_data,
- record_time=record_time)
- if updated_count > 0:
- return True
- else:
- return False
- elif command_type==1:
- # 口令类型(1:坐下
- updated_count =VoiceFeatureInfo.objects.filter(uid=uid,command_type=command_type).update(voice_feature_id=feature_id, voice_sit_command=wav_data,
- record_time=record_time)
- if updated_count > 0:
- return True
- else:
- return False
- elif command_type == 2:
- # 口令类型(2:站起
- updated_count =VoiceFeatureInfo.objects.filter(uid=uid,command_type=command_type).update(voice_feature_id=feature_id, voice_stand_command=wav_data,
- record_time=record_time)
- if updated_count > 0:
- return True
- else:
- return False
- elif command_type == 3:
- # 口令类型(3:摇尾巴
- updated_count =VoiceFeatureInfo.objects.filter(uid=uid,command_type=command_type).update(voice_feature_id=feature_id, voice_wagtail_command=wav_data,
- record_time=record_time)
- if updated_count > 0:
- return True
- else:
- return False
- else:
- # 口令类型(4:走过来
- updated_count =VoiceFeatureInfo.objects.filter(uid=uid,command_type=command_type).update(voice_feature_id=feature_id, voice_come_command=wav_data,
- record_time=record_time)
- if updated_count > 0:
- return True
- else:
- return False
- # conn.commit()
- print(f"Data inserted into database for UID: {uid}")
- # return True
- # 抛出异常
- except Exception as err:
- print(f"Error: {err}")
- return f"Error: {str(err)}"
- # return False
- # APPId = "527f831f"
- # APISecret = "YWE5MjI0MzA4NmM3MTNmNTNiMWJkYzE4"
- # APIKey = "be5c37f3db1569737934240a0e3ad02d"
- APPId = "ed8eb862"
- APISecret = "YzEyMDYyMzljMDViNWJlZDdlOWJhYjVi"
- APIKey = "b4c831160d8933221e95bda817547e99"
- # 接收声纹录音, 保存在本地, 并插入声纹特征到数据库, 然后调用科大接口识别 req_url()
- def process_voice_recording(uid, voice_feature_id, command_type, file_stream,mp3_path):
- # 定义本地缓存路径
- # cache_directory = "audio_cache"
- # os.makedirs(cache_directory, exist_ok=True)
- # 创建一个唯一的文件名
- timestamp=int(time.time())
- dd = datetime.today()
- timestamp11 = datetime.strftime(dd, '%Y-%m-%d %H:%M:%S')
- record_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- # # 按用户ID分目录
- # user_directory = os.path.join(cache_directory, f"user_{uid}")
- # # 确保目录存在
- # os.makedirs(user_directory, exist_ok=True)
- # # 文件命名包含必要的标识信息
- # file_name = f"voice_{uid}_{voice_feature_id}_{timestamp}.mp3"
- #
- # mp3_path=os.path.join(user_directory, file_name)
- # 接收文件流转成mp3 并存储到cache
- # if process_audio_file(file_stream, mp3_path):
- if process_zip_audio_file(file_stream, mp3_path):
- # 如果转换成功,则插入数据库
- # return insert_voice_data(uid,mp3_path,voice_feature_id,command_type,record_time)
- # TODO 后续替换为本地声纹识别算法后,map3_path更换为特征值,只存储特征值到数据库,每次比对特征值即可-----TODO
- # if insert_voice_data(uid,mp3_path,voice_feature_id,command_type,record_time):
- # 调用第三方声纹识别 API
- res1 = req_url(
- api_name='createFeature', APPId=APPId, APIKey=APIKey, APISecret=APISecret,
- file_path=mp3_path, featureId=voice_feature_id, groupId=uid
- )
- print(f"Audio processing complete. API response: {res1}")
- yy = datetime.today()
- end_timestamp = datetime.strftime(yy, '%Y-%m-%d %H:%M:%S')
- # return (
- # f"音频文件缓存在 {mp3_path} 并且传参到 req_url().返回内容为{res}.请求时间为:---{timestamp11}.返回时间为:---{end_timestamp}")
- # return f"Audio processed and sent to req_url(). API response: {res}"
- # res['status'] = tempResult.get('header').get('message')
- # res["msg"] = str(base64.b64decode(fb), 'UTF-8')
- # res['response_time'] =datetime.strftime(datetime.today(), '%Y-%m-%d %H:%M:%S')
- res = {
- "code": 200,
- "status": res1['status'],
- "msg": res1["msg"],
- "response_time": res1['response_time'],
- }
- return res
- # else:
- # res = {
- # "code": 400,
- # "status": "Fail",
- # "msg": "Failed to insert voice data into database.",
- #
- # }
- return res
- else:
- res = {
- "code": 400,
- "status": "Fail",
- "msg": "Failed to process the audio file.",
- }
- print("Failed to process the audio file.")
- return res
|