2025-09-22 06:06:19 +08:00

222 lines
9.9 KiB
Python

from datetime import datetime, timedelta
from flask_sqlalchemy import SQLAlchemy
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
from app import db
import random
import string
class User(UserMixin, db.Model):
    """Registered account, stored in the ``users`` table.

    A user owns voice samples, practice sessions and per-scenario progress
    rows. All three relationships use ``cascade='all, delete-orphan'``, so
    those child rows are removed together with the user.
    """

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(255), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(255), nullable=False)
    name = db.Column(db.String(100), nullable=False)
    age = db.Column(db.SmallInteger, nullable=False)
    # 0 = male, 1 = female
    gender = db.Column(db.SmallInteger, nullable=False, comment='0-男, 1-女')
    # Contact details of the user's parent
    parent_contact = db.Column(db.String(255), nullable=True, comment='家长联系方式')
    # Whether the e-mail address has been verified
    is_verified = db.Column(db.Boolean, default=False, comment='邮箱是否验证')
    # Timestamps are naive UTC values produced by ``datetime.utcnow``.
    created_at = db.Column(db.DateTime, default=datetime.utcnow, index=True)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships (dynamic: each attribute is a query, not a loaded list)
    voice_samples = db.relationship(
        'VoiceSample', backref='user', lazy='dynamic', cascade='all, delete-orphan'
    )
    user_sessions = db.relationship(
        'UserSession', backref='user', lazy='dynamic', cascade='all, delete-orphan'
    )
    user_progress = db.relationship(
        'UserProgress', backref='user', lazy='dynamic', cascade='all, delete-orphan'
    )

    def set_password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True if ``password`` matches the stored password hash."""
        return check_password_hash(self.password_hash, password)

    def get_latest_voice_sample(self):
        """Return the most recently uploaded voice sample, or None if there is none."""
        return self.voice_samples.order_by(VoiceSample.upload_time.desc()).first()

    def has_voice_clone_model(self):
        """Return True if the latest voice sample has a clone model in the 'ready' state.

        Always returns a real ``bool``: a user without any voice sample
        yields ``False`` rather than ``None``.
        """
        latest_sample = self.get_latest_voice_sample()
        return latest_sample is not None and latest_sample.clone_model_status == 'ready'

    def __repr__(self):
        return f'<User {self.email}>'
class EmailVerification(db.Model):
    """One-time e-mail verification code, stored in ``email_verifications``."""

    __tablename__ = 'email_verifications'

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(255), nullable=False, index=True)
    # 6-digit numeric verification code
    verification_code = db.Column(db.String(6), nullable=False, comment='6位数字验证码')
    expires_at = db.Column(db.DateTime, nullable=False, index=True)
    # Whether the code has already been used
    is_used = db.Column(db.Boolean, default=False, comment='是否已使用')
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    @classmethod
    def generate_code(cls, email, expire_minutes=5):
        """Create, persist and return a new 6-digit verification code.

        Expired codes for the same address are deleted first. The new row
        is committed before returning.

        Args:
            email: Address the code is issued for.
            expire_minutes: Validity period of the code, in minutes.

        Returns:
            The generated code as a 6-character string of digits.
        """
        # Function-scope import keeps this block self-contained; the code is
        # a security credential, so it must come from a CSPRNG rather than
        # the predictable ``random`` module.
        import secrets

        # One timestamp for both the cleanup and the new expiry.
        now = datetime.utcnow()

        # Purge this address's expired codes.
        cls.query.filter(
            cls.email == email,
            cls.expires_at < now,
        ).delete()

        code = ''.join(secrets.choice(string.digits) for _ in range(6))
        verification = cls(
            email=email,
            verification_code=code,
            expires_at=now + timedelta(minutes=expire_minutes),
        )
        db.session.add(verification)
        db.session.commit()
        return code

    @classmethod
    def verify_code(cls, email, code):
        """Check ``code`` for ``email`` and consume it on success.

        A code is accepted only if it matches, has not expired and has not
        been used. The matching row is marked as used and committed.

        Returns:
            True if the code was valid (and is now consumed), else False.
        """
        verification = cls.query.filter(
            cls.email == email,
            cls.verification_code == code,
            cls.expires_at > datetime.utcnow(),
            cls.is_used == False,  # noqa: E712 - builds a SQL expression
        ).first()
        if verification is None:
            return False

        verification.is_used = True
        db.session.commit()
        return True

    def __repr__(self):
        return f'<EmailVerification {self.email}>'
class VoiceSample(db.Model):
    """Voice recording uploaded by a user, plus the state of its clone model.

    Stored in the ``voice_samples`` table; ``user_id`` references ``users.id``.
    """

    __tablename__ = 'voice_samples'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    # URL of the original audio file
    original_audio_url = db.Column(db.String(500), nullable=False, comment='原始语音文件URL')
    upload_time = db.Column(db.DateTime, default=datetime.utcnow)
    # File size in bytes
    file_size = db.Column(db.Integer, nullable=True, comment='文件大小(字节)')
    # Audio duration in seconds
    duration = db.Column(db.Numeric(5, 2), nullable=True, comment='音频时长(秒)')
    # Text produced by speech recognition
    recognized_text = db.Column(db.Text, nullable=True, comment='语音识别文本')
    # Clone model status: pending / training / ready / failed
    clone_model_status = db.Column(
        db.String(20),
        default='pending',
        comment='克隆模型状态: pending/training/ready/failed',
    )
    # Path of the clone model file
    clone_model_path = db.Column(db.String(500), nullable=True, comment='克隆模型文件路径')
    # Clone quality score, documented range 0-10. Numeric(3, 2) tops out at
    # 9.99 and cannot hold a perfect 10.00, so the precision is 4 here.
    # NOTE: databases created with the old definition need a migration to
    # widen the column.
    clone_quality_score = db.Column(
        db.Numeric(4, 2), nullable=True, comment='克隆质量评分(0-10)'
    )

    def __repr__(self):
        return f'<VoiceSample {self.user_id}:{self.id}>'
class ScenarioCategory(db.Model):
    """Grouping of scenarios, stored in the ``scenario_categories`` table."""

    __tablename__ = 'scenario_categories'

    id = db.Column(db.Integer, primary_key=True)
    # Category name
    name = db.Column(
        db.String(100),
        nullable=False,
        comment='分类名称',
    )
    # Category description
    description = db.Column(
        db.Text,
        nullable=True,
        comment='分类描述',
    )
    # Icon class name
    icon = db.Column(
        db.String(50),
        default='fas fa-folder',
        comment='图标类名',
    )
    # Theme colour
    color = db.Column(
        db.String(20),
        default='primary',
        comment='主题色',
    )
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Relationships: scenarios in this category (dynamic query); each
    # Scenario gets a ``category`` backref.
    scenarios = db.relationship(
        'Scenario',
        backref='category',
        lazy='dynamic',
    )

    def __repr__(self):
        return '<ScenarioCategory {}>'.format(self.name)
class Scenario(db.Model):
    """Practice scenario belonging to a category, stored in ``scenarios``."""

    __tablename__ = 'scenarios'

    id = db.Column(db.Integer, primary_key=True)
    category_id = db.Column(
        db.Integer, db.ForeignKey('scenario_categories.id'), nullable=False
    )
    # Scenario title
    title = db.Column(db.String(255), nullable=False, comment='场景标题')
    # Scenario description
    description = db.Column(db.Text, nullable=True, comment='场景描述')
    # Scenario icon
    icon = db.Column(db.String(50), default='fas fa-comments', comment='场景图标')
    # Minimum number of questions
    min_questions = db.Column(db.Integer, default=5, comment='最少问题数')
    # Maximum number of questions
    max_questions = db.Column(db.Integer, default=15, comment='最多问题数')
    # Difficulty level, 1-5
    difficulty_level = db.Column(db.SmallInteger, default=1, comment='难度等级1-5')
    # Whether the scenario is active
    is_active = db.Column(db.Boolean, default=True, comment='是否激活')
    # Number of times the scenario has been played
    play_count = db.Column(db.Integer, default=0, comment='游玩次数')
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Relationships (all dynamic queries; each child gets a ``scenario`` backref)
    questions = db.relationship('ScenarioQuestion', backref='scenario', lazy='dynamic')
    user_sessions = db.relationship('UserSession', backref='scenario', lazy='dynamic')
    user_progress = db.relationship('UserProgress', backref='scenario', lazy='dynamic')

    def get_difficulty_badge(self):
        """Return the badge style name for this scenario's difficulty level.

        Levels 1-4 have dedicated styles; any other value yields 'dark'.
        """
        level = self.difficulty_level
        badge_table = (
            (1, 'success'),
            (2, 'warning'),
            (3, 'orange'),
            (4, 'danger'),
        )
        for known_level, badge in badge_table:
            if level == known_level:
                return badge
        return 'dark'

    def get_difficulty_text(self):
        """Return the display label for the difficulty level, or '未知' if unmapped."""
        labels = {
            1: '简单',
            2: '中等',
            3: '有趣',
            4: '挑战',
            5: '专家',
        }
        level = self.difficulty_level
        return labels[level] if level in labels else '未知'

    def __repr__(self):
        return '<Scenario {}>'.format(self.title)
class ScenarioQuestion(db.Model):
    """Single question of a scenario, stored in ``scenario_questions``."""

    __tablename__ = 'scenario_questions'

    id = db.Column(db.Integer, primary_key=True)
    scenario_id = db.Column(
        db.Integer,
        db.ForeignKey('scenarios.id'),
        nullable=False,
    )
    # Position of the question within its scenario
    question_order = db.Column(
        db.Integer,
        nullable=False,
        comment='问题顺序',
    )
    # Question text
    question_text = db.Column(
        db.Text,
        nullable=False,
        comment='问题文本',
    )
    # Expected answer type
    expected_response_type = db.Column(
        db.String(50),
        default='open',
        comment='期望回答类型',
    )
    # Whether answering is mandatory
    is_required = db.Column(
        db.Boolean,
        default=True,
        comment='是否必答',
    )

    def __repr__(self):
        return '<ScenarioQuestion {}:{}>'.format(self.scenario_id, self.question_order)
class UserSession(db.Model):
    """One run of a user through a scenario, stored in ``user_sessions``."""

    __tablename__ = 'user_sessions'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('users.id'),
        nullable=False,
    )
    scenario_id = db.Column(
        db.Integer,
        db.ForeignKey('scenarios.id'),
        nullable=False,
    )
    start_time = db.Column(db.DateTime, default=datetime.utcnow)
    end_time = db.Column(db.DateTime, nullable=True)
    # Learning time in seconds
    duration = db.Column(
        db.Integer,
        nullable=True,
        comment='学习时长(秒)',
    )
    # Completion percentage
    completion_rate = db.Column(
        db.Numeric(5, 2),
        default=0.00,
        comment='完成度百分比',
    )
    # Total number of questions
    total_questions = db.Column(
        db.Integer,
        default=0,
        comment='总问题数',
    )
    # Number of questions answered so far
    answered_questions = db.Column(
        db.Integer,
        default=0,
        comment='已回答问题数',
    )
    # Session state; new sessions start as 'ongoing'
    status = db.Column(
        db.Enum('ongoing', 'completed', 'interrupted', name='session_status'),
        default='ongoing',
    )

    def __repr__(self):
        return '<UserSession {}:{}>'.format(self.user_id, self.scenario_id)
class UserProgress(db.Model):
    """Aggregated progress of one user on one scenario (``user_progress``).

    The ``uk_user_scenario`` unique constraint allows at most one row per
    (user_id, scenario_id) pair.
    """

    __tablename__ = 'user_progress'
    __table_args__ = (
        db.UniqueConstraint('user_id', 'scenario_id', name='uk_user_scenario'),
    )

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('users.id'),
        nullable=False,
    )
    scenario_id = db.Column(
        db.Integer,
        db.ForeignKey('scenarios.id'),
        nullable=False,
    )
    # Number of completions
    completion_count = db.Column(
        db.Integer,
        default=0,
        comment='完成次数',
    )
    # Best score
    best_score = db.Column(
        db.Numeric(4, 2),
        nullable=True,
        comment='最佳得分',
    )
    # Average score
    average_score = db.Column(
        db.Numeric(4, 2),
        nullable=True,
        comment='平均得分',
    )
    # Total learning time in seconds
    total_duration = db.Column(
        db.Integer,
        default=0,
        comment='总学习时长(秒)',
    )
    # Time of the most recent completion
    last_completed_at = db.Column(
        db.DateTime,
        nullable=True,
        comment='最后完成时间',
    )
    # Time of the first completion
    first_completed_at = db.Column(
        db.DateTime,
        nullable=True,
        comment='首次完成时间',
    )

    def __repr__(self):
        return '<UserProgress {}:{}>'.format(self.user_id, self.scenario_id)