123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- import zipfile
- import tempfile
- import shutil
- import random
- import os
- import jwt
- import traceback
- from django.http import HttpResponse,JsonResponse
- SECRET_KEY='#tdfnrcn1s610h4*csa2-p+=lfqz-ol+=uo$+n2sa'
- #构建封装用户信息字典
- #2025-5-27
- #Author Feng
- def _build_user_info(user):
- return {
- "user_name": user.user_name,
- "coin": user.coin,
- "isRegUser": bool(getattr(user, 'isRegUser', 0)),
- "mobile": user.mobile,
- "email": user.email,
- "level": user.level,
- "uuid": user.uuid,
- "registration_time": user.registration_time.isoformat() if hasattr(user.registration_time, 'isoformat') else str(user.registration_time)
- }
- #生成指定范围内的随机数
- #2025-5-27
- #Author Feng
- def generate_random_number(start, end):
- return random.randint(start, end)
- # 统一的错误响应处理
- def handle_error(error_message):
- # 打印错误堆栈信息到日志或控制台
- traceback.print_exc()
- return JsonResponse({"status": "error", "message": error_message}, status=500)
- def model_to_dict_without_id(model_instance):
- """将 Django 模型实例转换为字典,并移除 'id' 字段"""
- if model_instance:
- model_dict = model_instance.__dict__.copy()
- model_dict.pop("id", None) # 删除 id 字段
- model_dict.pop("_state", None) # 删除 Django 内部字段
- return model_dict
- return {}
- # # 生成access_token
- # def generate_access_token(user_id,expiration_time):
- #
- # payload = {
- # 'user_id': user_id,
- # 'exp': expiration_time
- # }
- # return jwt.encode(payload, SECRET_KEY, algorithm='HS256')
- #
- #
- # # 生成login_token
- # def generate_login_token(user_id,expiration_time):
- # payload = {
- # 'user_id': user_id,
- # 'exp': expiration_time
- # }
- #
- # return jwt.encode(payload, SECRET_KEY, algorithm='HS256')
- #解压zip文件中的wav音频到指定目录
- # 2025/4/01
- # author Feng
- def extract_wav_from_zip(zip_file, target_directory):
- temp_dir = tempfile.mkdtemp()
- try:
- # 解压zip到临时目录
- with zipfile.ZipFile(zip_file, 'r') as zip_ref:
- zip_ref.extractall(temp_dir)
- # Find wav file in the extracted directory
- wav_files = []
- for root, _, files in os.walk(temp_dir):
- for file in files:
- if file.lower().endswith('.wav'):
- wav_files.append(os.path.join(root, file))
- if not wav_files:
- return None
- # Use the first wav file found
- wav_file_path = wav_files[0]
- # Copy to target directory with a specific name
- target_path = os.path.join(target_directory, 'voice.wav')
- shutil.copy2(wav_file_path, target_path)
- return target_path
- finally:
- # Clean up temp directory
- shutil.rmtree(temp_dir)
- #验证访问令牌的有效性
- def validate_access_token(access_token):
- # 验证访问令牌的有效性
- try:
- decoded_token = jwt.decode(access_token, SECRET_KEY, algorithms=['HS256'])
- return decoded_token
- except jwt.ExpiredSignatureError:
- return JsonResponse({
- "status": "error",
- "code": 601,
- "message": "access_token expired, please refresh"
- })
- except jwt.InvalidTokenError:
- return JsonResponse({
- "status": "error",
- "code": 400,
- "message": "invalid access_token"
- })
|