231 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			231 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""
 | 
						||
语音克隆功能路由 - 专门处理用户声音克隆
 | 
						||
"""
 | 
						||
import os
 | 
						||
import tempfile
 | 
						||
import uuid
 | 
						||
import re
 | 
						||
import subprocess
 | 
						||
import librosa
 | 
						||
import soundfile as sf
 | 
						||
from flask import Blueprint, request, jsonify, render_template, current_app
 | 
						||
from flask_login import login_required, current_user
 | 
						||
from app.services.cosyvoice_service import cosyvoice_service
 | 
						||
from app.models import VoiceSample, db
 | 
						||
from werkzeug.utils import secure_filename
 | 
						||
import logging
 | 
						||
 | 
						||
logger = logging.getLogger(__name__)
 | 
						||
voice_clone_bp = Blueprint('voice_clone', __name__)
 | 
						||
 | 
						||
def convert_audio_format(input_path, output_path, target_sr=16000):
 | 
						||
    """转换音频格式为标准WAV"""
 | 
						||
    try:
 | 
						||
        # 使用librosa读取音频(支持多种格式)
 | 
						||
        audio, sr = librosa.load(input_path, sr=target_sr, mono=True)
 | 
						||
        
 | 
						||
        # 保存为标准WAV格式
 | 
						||
        sf.write(output_path, audio, target_sr, format='WAV', subtype='PCM_16')
 | 
						||
        
 | 
						||
        logger.info(f"音频格式转换成功: {input_path} -> {output_path}")
 | 
						||
        return True
 | 
						||
    except Exception as e:
 | 
						||
        logger.error(f"音频格式转换失败: {str(e)}")
 | 
						||
        
 | 
						||
        # 备用方案:使用ffmpeg
 | 
						||
        try:
 | 
						||
            cmd = [
 | 
						||
                'ffmpeg', '-i', input_path, 
 | 
						||
                '-ar', '16000',  # 采样率
 | 
						||
                '-ac', '1',      # 单声道
 | 
						||
                '-sample_fmt', 's16',  # 16位
 | 
						||
                '-y', output_path
 | 
						||
            ]
 | 
						||
            result = subprocess.run(cmd, capture_output=True, text=True)
 | 
						||
            
 | 
						||
            if result.returncode == 0:
 | 
						||
                logger.info(f"使用ffmpeg转换成功: {input_path} -> {output_path}")
 | 
						||
                return True
 | 
						||
            else:
 | 
						||
                logger.error(f"ffmpeg转换失败: {result.stderr}")
 | 
						||
                return False
 | 
						||
        except Exception as fe:
 | 
						||
            logger.error(f"ffmpeg备用方案也失败: {str(fe)}")
 | 
						||
            return False
 | 
						||
 | 
						||
 | 
						||
 | 
						||
@voice_clone_bp.route('/voice-clone')
 | 
						||
@login_required
 | 
						||
def voice_clone_page():
 | 
						||
    """语音克隆专门页面"""
 | 
						||
    user_voice_sample = current_user.get_latest_voice_sample()
 | 
						||
    return render_template('voice_clone/index.html', voice_sample=user_voice_sample)
 | 
						||
 | 
						||
@voice_clone_bp.route('/api/voice-clone/upload', methods=['POST'])
 | 
						||
@login_required
 | 
						||
def upload_voice_sample():
 | 
						||
    """上传用户语音样本进行克隆"""
 | 
						||
    try:
 | 
						||
        if 'audio' not in request.files:
 | 
						||
            return jsonify({
 | 
						||
                "success": False,
 | 
						||
                "message": "请选择音频文件"
 | 
						||
            })
 | 
						||
        
 | 
						||
        file = request.files['audio']
 | 
						||
        if file.filename == '':
 | 
						||
            return jsonify({
 | 
						||
                "success": False,
 | 
						||
                "message": "请选择音频文件"
 | 
						||
            })
 | 
						||
        
 | 
						||
        # 生成唯一文件名
 | 
						||
        unique_id = str(uuid.uuid4())[:8]
 | 
						||
        filename = f"voice_sample_{current_user.id}_{unique_id}.wav"
 | 
						||
        
 | 
						||
        # 保存到临时目录
 | 
						||
        temp_dir = tempfile.gettempdir()
 | 
						||
        original_path = os.path.join(temp_dir, f"original_{filename}")
 | 
						||
        file.save(original_path)
 | 
						||
        
 | 
						||
        # 转换为标准WAV格式
 | 
						||
        file_path = os.path.join(temp_dir, filename)
 | 
						||
        if convert_audio_format(original_path, file_path):
 | 
						||
            # 转换成功,删除原始文件
 | 
						||
            try:
 | 
						||
                os.remove(original_path)
 | 
						||
            except:
 | 
						||
                pass
 | 
						||
        else:
 | 
						||
            # 转换失败,使用原始文件
 | 
						||
            logger.warning("音频格式转换失败,使用原始文件")
 | 
						||
            os.rename(original_path, file_path)
 | 
						||
        
 | 
						||
        # 识别语音内容
 | 
						||
        recognized_text = cosyvoice_service.recognize_audio(file_path)
 | 
						||
        
 | 
						||
        # 保存到数据库
 | 
						||
        voice_sample = VoiceSample(
 | 
						||
            user_id=current_user.id,
 | 
						||
            original_audio_url=file_path,
 | 
						||
            file_size=os.path.getsize(file_path),
 | 
						||
            recognized_text=recognized_text,
 | 
						||
            clone_model_status='ready'  # 简化流程,直接标记为ready
 | 
						||
        )
 | 
						||
        
 | 
						||
        db.session.add(voice_sample)
 | 
						||
        db.session.commit()
 | 
						||
        
 | 
						||
        logger.info(f"用户 {current_user.id} 上传语音样本成功: {filename}")
 | 
						||
        
 | 
						||
        return jsonify({
 | 
						||
            "success": True,
 | 
						||
            "message": "语音样本上传成功!AI已经学会你的声音了",
 | 
						||
            "sample_id": voice_sample.id,
 | 
						||
            "recognized_text": recognized_text,
 | 
						||
            "file_info": {
 | 
						||
                "size": voice_sample.file_size,
 | 
						||
                "duration": float(voice_sample.duration) if voice_sample.duration else None
 | 
						||
            }
 | 
						||
        })
 | 
						||
        
 | 
						||
    except Exception as e:
 | 
						||
        logger.error(f"语音样本上传失败: {str(e)}")
 | 
						||
        return jsonify({
 | 
						||
            "success": False,
 | 
						||
            "message": f"上传失败: {str(e)}"
 | 
						||
        })
 | 
						||
 | 
						||
@voice_clone_bp.route('/api/voice-clone/generate', methods=['POST'])
 | 
						||
@login_required
 | 
						||
def generate_cloned_speech():
 | 
						||
    """使用用户的声音克隆生成语音"""
 | 
						||
    try:
 | 
						||
        data = request.get_json()
 | 
						||
        text = data.get('text', '')
 | 
						||
        
 | 
						||
        if not text:
 | 
						||
            return jsonify({
 | 
						||
                "success": False,
 | 
						||
                "message": "请输入要合成的文本"
 | 
						||
            })
 | 
						||
        
 | 
						||
        # 获取用户最新的语音样本
 | 
						||
        voice_sample = current_user.get_latest_voice_sample()
 | 
						||
        if not voice_sample:
 | 
						||
            return jsonify({
 | 
						||
                "success": False,
 | 
						||
                "message": "请先录制语音样本"
 | 
						||
            })
 | 
						||
        
 | 
						||
        # 详细记录调用参数进行调试
 | 
						||
        logger.info(f"=== 语音克隆调用参数对比 ===")
 | 
						||
        logger.info(f"text: {text}")
 | 
						||
        logger.info(f"reference_audio_path: {voice_sample.original_audio_url}")
 | 
						||
        logger.info(f"reference_text: {voice_sample.recognized_text}")
 | 
						||
        logger.info(f"seed: 42")
 | 
						||
        logger.info(f"音频文件是否存在: {os.path.exists(voice_sample.original_audio_url)}")
 | 
						||
        if os.path.exists(voice_sample.original_audio_url):
 | 
						||
            logger.info(f"音频文件大小: {os.path.getsize(voice_sample.original_audio_url)} bytes")
 | 
						||
        
 | 
						||
        # 使用CosyVoice进行语音克隆
 | 
						||
        stream_audio, full_audio = cosyvoice_service.generate_speech_with_voice_cloning(
 | 
						||
            text=text,
 | 
						||
            reference_audio_path=voice_sample.original_audio_url,
 | 
						||
            reference_text=voice_sample.recognized_text or "",
 | 
						||
            seed=42
 | 
						||
        )
 | 
						||
        
 | 
						||
        if full_audio:
 | 
						||
            return jsonify({
 | 
						||
                "success": True,
 | 
						||
                "message": "用你的声音说话成功!",
 | 
						||
                "audio_url": full_audio,
 | 
						||
                "original_text": text
 | 
						||
            })
 | 
						||
        else:
 | 
						||
            return jsonify({
 | 
						||
                "success": False,
 | 
						||
                "message": "语音生成失败,请重试"
 | 
						||
            })
 | 
						||
            
 | 
						||
    except Exception as e:
 | 
						||
        logger.error(f"语音克隆生成失败: {str(e)}")
 | 
						||
        return jsonify({
 | 
						||
            "success": False,
 | 
						||
            "message": f"生成失败: {str(e)}"
 | 
						||
        })
 | 
						||
 | 
						||
@voice_clone_bp.route('/api/voice-clone/status', methods=['GET'])
 | 
						||
@login_required
 | 
						||
def get_voice_clone_status():
 | 
						||
    """获取用户语音克隆状态"""
 | 
						||
    try:
 | 
						||
        voice_sample = current_user.get_latest_voice_sample()
 | 
						||
        
 | 
						||
        if not voice_sample:
 | 
						||
            return jsonify({
 | 
						||
                "success": True,
 | 
						||
                "has_sample": False,
 | 
						||
                "status": "no_sample",
 | 
						||
                "message": "还没有录制语音样本哦!快来录制你的专属声音吧"
 | 
						||
            })
 | 
						||
        
 | 
						||
        return jsonify({
 | 
						||
            "success": True,
 | 
						||
            "has_sample": True,
 | 
						||
            "status": voice_sample.clone_model_status,
 | 
						||
            "sample_id": voice_sample.id,
 | 
						||
            "recognized_text": voice_sample.recognized_text,
 | 
						||
            "upload_time": voice_sample.upload_time.strftime("%Y-%m-%d %H:%M"),
 | 
						||
            "message": "你的专属声音已准备好!"
 | 
						||
        })
 | 
						||
        
 | 
						||
    except Exception as e:
 | 
						||
        logger.error(f"获取语音克隆状态失败: {str(e)}")
 | 
						||
        return jsonify({
 | 
						||
            "success": False,
 | 
						||
            "message": f"获取状态失败: {str(e)}"
 | 
						||
        })
 |