Happy_language/app/routes/voice_clone.py
superlishunqin 403fdeafa4 debug1
2025-09-22 06:39:34 +08:00

231 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
语音克隆功能路由 - 专门处理用户声音克隆
"""
import os
import tempfile
import uuid
import re
import subprocess
import librosa
import soundfile as sf
from flask import Blueprint, request, jsonify, render_template, current_app
from flask_login import login_required, current_user
from app.services.cosyvoice_service import cosyvoice_service
from app.models import VoiceSample, db
from werkzeug.utils import secure_filename
import logging
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Blueprint on which the voice-clone routes in this file are registered.
voice_clone_bp = Blueprint('voice_clone', __name__)
def convert_audio_format(input_path, output_path, target_sr=16000):
    """Convert an audio file to a mono, 16-bit PCM WAV file.

    librosa/soundfile are tried first; if that path raises for any reason,
    the ``ffmpeg`` command-line tool is used as a fallback. Both backends
    honour ``target_sr``.

    Args:
        input_path: Path of the audio file to read.
        output_path: Path the converted WAV file is written to.
        target_sr: Sample rate, in Hz, of the converted file.

    Returns:
        True if either backend reported success, False otherwise. Failures
        are logged instead of being raised.
    """
    try:
        # librosa resamples to target_sr and downmixes to mono while loading.
        audio, _ = librosa.load(input_path, sr=target_sr, mono=True)
        sf.write(output_path, audio, target_sr, format='WAV', subtype='PCM_16')
        logger.info(f"音频格式转换成功: {input_path} -> {output_path}")
        return True
    except Exception as e:
        logger.error(f"音频格式转换失败: {str(e)}")

    # Fallback: let ffmpeg do the conversion. The sample rate comes from
    # target_sr so both backends produce the same output format.
    cmd = [
        'ffmpeg', '-i', input_path,
        '-ar', str(target_sr),   # sample rate
        '-ac', '1',              # mono
        '-sample_fmt', 's16',    # 16-bit samples
        '-y', output_path,       # overwrite the output without prompting
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except Exception as fe:
        # e.g. the ffmpeg executable is not installed / not on PATH.
        logger.error(f"ffmpeg备用方案也失败: {str(fe)}")
        return False

    if result.returncode != 0:
        logger.error(f"ffmpeg转换失败: {result.stderr}")
        return False

    logger.info(f"使用ffmpeg转换成功: {input_path} -> {output_path}")
    return True
@voice_clone_bp.route('/voice-clone')
@login_required
def voice_clone_page():
    """Render the dedicated voice-cloning page.

    The template receives the current user's latest voice sample (as
    returned by ``current_user.get_latest_voice_sample()``) under the
    name ``voice_sample``.
    """
    return render_template(
        'voice_clone/index.html',
        voice_sample=current_user.get_latest_voice_sample(),
    )
@voice_clone_bp.route('/api/voice-clone/upload', methods=['POST'])
@login_required
def upload_voice_sample():
    """Receive a voice sample upload and store it for the current user.

    Steps: read the ``audio`` file from the request, save it in the system
    temp directory, try to convert it to a standard WAV (falling back to
    the raw upload when conversion fails), run speech recognition on the
    resulting file and insert a ``VoiceSample`` row.

    Returns:
        A JSON response that always carries ``success`` and ``message``;
        on success it also carries ``sample_id``, ``recognized_text`` and
        ``file_info``. Failures are reported in the JSON body rather than
        raised.
    """
    original_path = None
    file_path = None
    persisted = False  # True once the DB row referencing file_path is committed
    try:
        upload = request.files.get('audio')
        # A missing part, an empty filename and a None filename all mean
        # "no file was chosen".
        if upload is None or not upload.filename:
            return jsonify({
                "success": False,
                "message": "请选择音频文件"
            })

        # Build a unique file name for this user's sample.
        unique_id = str(uuid.uuid4())[:8]
        filename = f"voice_sample_{current_user.id}_{unique_id}.wav"

        # Store the raw upload in the temp directory first.
        temp_dir = tempfile.gettempdir()
        original_path = os.path.join(temp_dir, f"original_{filename}")
        file_path = os.path.join(temp_dir, filename)
        upload.save(original_path)

        if convert_audio_format(original_path, file_path):
            # Conversion worked; the raw upload is no longer needed.
            try:
                os.remove(original_path)
            except OSError as cleanup_error:
                logger.warning(f"删除原始临时文件失败: {cleanup_error}")
        else:
            # Conversion failed: keep going with the raw upload.
            # os.replace overwrites a partial output file left behind by a
            # failed conversion, where os.rename could raise on Windows.
            logger.warning("音频格式转换失败,使用原始文件")
            os.replace(original_path, file_path)

        # Transcribe the sample.
        recognized_text = cosyvoice_service.recognize_audio(file_path)

        # Persist the sample metadata.
        voice_sample = VoiceSample(
            user_id=current_user.id,
            original_audio_url=file_path,
            file_size=os.path.getsize(file_path),
            recognized_text=recognized_text,
            clone_model_status='ready'  # simplified flow: mark ready right away
        )
        db.session.add(voice_sample)
        db.session.commit()
        persisted = True

        logger.info(f"用户 {current_user.id} 上传语音样本成功: {filename}")
        return jsonify({
            "success": True,
            "message": "语音样本上传成功AI已经学会你的声音了",
            "sample_id": voice_sample.id,
            "recognized_text": recognized_text,
            "file_info": {
                "size": voice_sample.file_size,
                "duration": float(voice_sample.duration) if voice_sample.duration else None
            }
        })
    except Exception as e:
        logger.error(f"语音样本上传失败: {str(e)}")
        if not persisted:
            # Nothing references the files yet: undo the pending DB work and
            # remove whatever was written so failed uploads do not pile up.
            db.session.rollback()
            for leftover in (original_path, file_path):
                if leftover and os.path.exists(leftover):
                    try:
                        os.remove(leftover)
                    except OSError as cleanup_error:
                        logger.warning(f"清理临时文件失败: {cleanup_error}")
        return jsonify({
            "success": False,
            "message": f"上传失败: {str(e)}"
        })
@voice_clone_bp.route('/api/voice-clone/generate', methods=['POST'])
@login_required
def generate_cloned_speech():
    """Synthesize speech for the posted text using the user's cloned voice.

    Expects a JSON object with a ``text`` field. The current user's latest
    voice sample supplies the reference audio path and reference text that
    are passed to ``cosyvoice_service.generate_speech_with_voice_cloning``.

    Returns:
        A JSON response with ``success`` and ``message``; on success also
        ``audio_url`` and ``original_text``. Failures are reported in the
        JSON body rather than raised.
    """
    seed = 42  # fixed seed passed to the synthesis call
    try:
        # silent=True returns None for a missing or unparsable JSON body
        # instead of raising, so the caller gets the "missing text" reply
        # rather than an internal error string.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        text = data.get('text', '')
        if not isinstance(text, str) or not text.strip():
            return jsonify({
                "success": False,
                "message": "请输入要合成的文本"
            })

        # Latest voice sample recorded by this user.
        voice_sample = current_user.get_latest_voice_sample()
        if not voice_sample:
            return jsonify({
                "success": False,
                "message": "请先录制语音样本"
            })

        audio_path = voice_sample.original_audio_url
        # Guard against a missing path before touching the filesystem.
        audio_exists = bool(audio_path) and os.path.exists(audio_path)

        # Log the call parameters in detail for debugging.
        logger.info("=== 语音克隆调用参数对比 ===")
        logger.info(f"text: {text}")
        logger.info(f"reference_audio_path: {audio_path}")
        logger.info(f"reference_text: {voice_sample.recognized_text}")
        logger.info(f"seed: {seed}")
        logger.info(f"音频文件是否存在: {audio_exists}")
        if audio_exists:
            logger.info(f"音频文件大小: {os.path.getsize(audio_path)} bytes")

        # Only the second element of the returned pair is used here.
        _, full_audio = cosyvoice_service.generate_speech_with_voice_cloning(
            text=text,
            reference_audio_path=audio_path,
            reference_text=voice_sample.recognized_text or "",
            seed=seed
        )

        if not full_audio:
            return jsonify({
                "success": False,
                "message": "语音生成失败,请重试"
            })

        return jsonify({
            "success": True,
            "message": "用你的声音说话成功!",
            "audio_url": full_audio,
            "original_text": text
        })
    except Exception as e:
        logger.error(f"语音克隆生成失败: {str(e)}")
        return jsonify({
            "success": False,
            "message": f"生成失败: {str(e)}"
        })
@voice_clone_bp.route('/api/voice-clone/status', methods=['GET'])
@login_required
def get_voice_clone_status():
    """Report whether the current user has a voice sample and its status.

    Returns:
        A JSON response. Without a sample: ``has_sample`` is False and
        ``status`` is ``"no_sample"``. With a sample: the sample's id,
        clone status, recognized text and upload time formatted as
        ``YYYY-MM-DD HH:MM``. Any exception is reported as
        ``success: False`` with a message.
    """
    try:
        latest = current_user.get_latest_voice_sample()
        if latest:
            payload = {
                "success": True,
                "has_sample": True,
                "status": latest.clone_model_status,
                "sample_id": latest.id,
                "recognized_text": latest.recognized_text,
                "upload_time": latest.upload_time.strftime("%Y-%m-%d %H:%M"),
                "message": "你的专属声音已准备好!"
            }
        else:
            # No sample recorded yet for this user.
            payload = {
                "success": True,
                "has_sample": False,
                "status": "no_sample",
                "message": "还没有录制语音样本哦!快来录制你的专属声音吧"
            }
        return jsonify(payload)
    except Exception as e:
        logger.error(f"获取语音克隆状态失败: {str(e)}")
        failure = {
            "success": False,
            "message": f"获取状态失败: {str(e)}"
        }
        return jsonify(failure)