222 lines
9.9 KiB
Python
222 lines
9.9 KiB
Python
from datetime import datetime, timedelta
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_login import UserMixin
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from app import db
|
|
import random
|
|
import string
|
|
|
|
class User(UserMixin, db.Model):
|
|
__tablename__ = 'users'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
email = db.Column(db.String(255), unique=True, nullable=False, index=True)
|
|
password_hash = db.Column(db.String(255), nullable=False)
|
|
name = db.Column(db.String(100), nullable=False)
|
|
age = db.Column(db.SmallInteger, nullable=False)
|
|
gender = db.Column(db.SmallInteger, nullable=False, comment='0-男, 1-女')
|
|
parent_contact = db.Column(db.String(255), nullable=True, comment='家长联系方式')
|
|
is_verified = db.Column(db.Boolean, default=False, comment='邮箱是否验证')
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow, index=True)
|
|
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
|
|
|
|
# 关联关系
|
|
voice_samples = db.relationship('VoiceSample', backref='user', lazy='dynamic', cascade='all, delete-orphan')
|
|
user_sessions = db.relationship('UserSession', backref='user', lazy='dynamic', cascade='all, delete-orphan')
|
|
user_progress = db.relationship('UserProgress', backref='user', lazy='dynamic', cascade='all, delete-orphan')
|
|
|
|
def set_password(self, password):
|
|
"""设置密码"""
|
|
self.password_hash = generate_password_hash(password)
|
|
|
|
def check_password(self, password):
|
|
"""验证密码"""
|
|
return check_password_hash(self.password_hash, password)
|
|
|
|
def get_latest_voice_sample(self):
|
|
"""获取最新的语音样本"""
|
|
return self.voice_samples.order_by(VoiceSample.upload_time.desc()).first()
|
|
|
|
def has_voice_clone_model(self):
|
|
"""检查是否有可用的声音克隆模型"""
|
|
latest_sample = self.get_latest_voice_sample()
|
|
return latest_sample and latest_sample.clone_model_status == 'ready'
|
|
|
|
def __repr__(self):
|
|
return f'<User {self.email}>'
|
|
|
|
class EmailVerification(db.Model):
|
|
__tablename__ = 'email_verifications'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
email = db.Column(db.String(255), nullable=False, index=True)
|
|
verification_code = db.Column(db.String(6), nullable=False, comment='6位数字验证码')
|
|
expires_at = db.Column(db.DateTime, nullable=False, index=True)
|
|
is_used = db.Column(db.Boolean, default=False, comment='是否已使用')
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
|
|
@classmethod
|
|
def generate_code(cls, email, expire_minutes=5):
|
|
"""生成验证码"""
|
|
# 清理过期的验证码
|
|
cls.query.filter(
|
|
cls.email == email,
|
|
cls.expires_at < datetime.utcnow()
|
|
).delete()
|
|
|
|
# 生成6位数字验证码
|
|
code = ''.join(random.choices(string.digits, k=6))
|
|
expires_at = datetime.utcnow() + timedelta(minutes=expire_minutes)
|
|
|
|
verification = cls(
|
|
email=email,
|
|
verification_code=code,
|
|
expires_at=expires_at
|
|
)
|
|
|
|
db.session.add(verification)
|
|
db.session.commit()
|
|
|
|
return code
|
|
|
|
@classmethod
|
|
def verify_code(cls, email, code):
|
|
"""验证验证码"""
|
|
verification = cls.query.filter(
|
|
cls.email == email,
|
|
cls.verification_code == code,
|
|
cls.expires_at > datetime.utcnow(),
|
|
cls.is_used == False
|
|
).first()
|
|
|
|
if verification:
|
|
verification.is_used = True
|
|
db.session.commit()
|
|
return True
|
|
return False
|
|
|
|
def __repr__(self):
|
|
return f'<EmailVerification {self.email}>'
|
|
|
|
class VoiceSample(db.Model):
|
|
__tablename__ = 'voice_samples'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
|
|
original_audio_url = db.Column(db.String(500), nullable=False, comment='原始语音文件URL')
|
|
upload_time = db.Column(db.DateTime, default=datetime.utcnow)
|
|
file_size = db.Column(db.Integer, nullable=True, comment='文件大小(字节)')
|
|
duration = db.Column(db.Numeric(5,2), nullable=True, comment='音频时长(秒)')
|
|
recognized_text = db.Column(db.Text, nullable=True, comment='语音识别文本')
|
|
clone_model_status = db.Column(db.String(20), default='pending', comment='克隆模型状态: pending/training/ready/failed')
|
|
clone_model_path = db.Column(db.String(500), nullable=True, comment='克隆模型文件路径')
|
|
clone_quality_score = db.Column(db.Numeric(3,2), nullable=True, comment='克隆质量评分(0-10)')
|
|
|
|
def __repr__(self):
|
|
return f'<VoiceSample {self.user_id}:{self.id}>'
|
|
|
|
class ScenarioCategory(db.Model):
|
|
__tablename__ = 'scenario_categories'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
name = db.Column(db.String(100), nullable=False, comment='分类名称')
|
|
description = db.Column(db.Text, nullable=True, comment='分类描述')
|
|
icon = db.Column(db.String(50), default='fas fa-folder', comment='图标类名')
|
|
color = db.Column(db.String(20), default='primary', comment='主题色')
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
|
|
# 关联关系
|
|
scenarios = db.relationship('Scenario', backref='category', lazy='dynamic')
|
|
|
|
def __repr__(self):
|
|
return f'<ScenarioCategory {self.name}>'
|
|
|
|
class Scenario(db.Model):
|
|
__tablename__ = 'scenarios'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
category_id = db.Column(db.Integer, db.ForeignKey('scenario_categories.id'), nullable=False)
|
|
title = db.Column(db.String(255), nullable=False, comment='场景标题')
|
|
description = db.Column(db.Text, nullable=True, comment='场景描述')
|
|
icon = db.Column(db.String(50), default='fas fa-comments', comment='场景图标')
|
|
min_questions = db.Column(db.Integer, default=5, comment='最少问题数')
|
|
max_questions = db.Column(db.Integer, default=15, comment='最多问题数')
|
|
difficulty_level = db.Column(db.SmallInteger, default=1, comment='难度等级1-5')
|
|
is_active = db.Column(db.Boolean, default=True, comment='是否激活')
|
|
play_count = db.Column(db.Integer, default=0, comment='游玩次数')
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
|
|
# 关联关系
|
|
questions = db.relationship('ScenarioQuestion', backref='scenario', lazy='dynamic')
|
|
user_sessions = db.relationship('UserSession', backref='scenario', lazy='dynamic')
|
|
user_progress = db.relationship('UserProgress', backref='scenario', lazy='dynamic')
|
|
|
|
def get_difficulty_badge(self):
|
|
"""获取难度徽章样式"""
|
|
if self.difficulty_level == 1:
|
|
return 'success'
|
|
elif self.difficulty_level == 2:
|
|
return 'warning'
|
|
elif self.difficulty_level == 3:
|
|
return 'orange'
|
|
elif self.difficulty_level == 4:
|
|
return 'danger'
|
|
else:
|
|
return 'dark'
|
|
|
|
def get_difficulty_text(self):
|
|
"""获取难度文本"""
|
|
levels = {1: '简单', 2: '中等', 3: '有趣', 4: '挑战', 5: '专家'}
|
|
return levels.get(self.difficulty_level, '未知')
|
|
|
|
def __repr__(self):
|
|
return f'<Scenario {self.title}>'
|
|
|
|
class ScenarioQuestion(db.Model):
|
|
__tablename__ = 'scenario_questions'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
scenario_id = db.Column(db.Integer, db.ForeignKey('scenarios.id'), nullable=False)
|
|
question_order = db.Column(db.Integer, nullable=False, comment='问题顺序')
|
|
question_text = db.Column(db.Text, nullable=False, comment='问题文本')
|
|
expected_response_type = db.Column(db.String(50), default='open', comment='期望回答类型')
|
|
is_required = db.Column(db.Boolean, default=True, comment='是否必答')
|
|
|
|
def __repr__(self):
|
|
return f'<ScenarioQuestion {self.scenario_id}:{self.question_order}>'
|
|
|
|
class UserSession(db.Model):
|
|
__tablename__ = 'user_sessions'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
|
|
scenario_id = db.Column(db.Integer, db.ForeignKey('scenarios.id'), nullable=False)
|
|
start_time = db.Column(db.DateTime, default=datetime.utcnow)
|
|
end_time = db.Column(db.DateTime, nullable=True)
|
|
duration = db.Column(db.Integer, nullable=True, comment='学习时长(秒)')
|
|
completion_rate = db.Column(db.Numeric(5,2), default=0.00, comment='完成度百分比')
|
|
total_questions = db.Column(db.Integer, default=0, comment='总问题数')
|
|
answered_questions = db.Column(db.Integer, default=0, comment='已回答问题数')
|
|
status = db.Column(db.Enum('ongoing', 'completed', 'interrupted', name='session_status'), default='ongoing')
|
|
|
|
def __repr__(self):
|
|
return f'<UserSession {self.user_id}:{self.scenario_id}>'
|
|
|
|
class UserProgress(db.Model):
|
|
__tablename__ = 'user_progress'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
|
|
scenario_id = db.Column(db.Integer, db.ForeignKey('scenarios.id'), nullable=False)
|
|
completion_count = db.Column(db.Integer, default=0, comment='完成次数')
|
|
best_score = db.Column(db.Numeric(4,2), nullable=True, comment='最佳得分')
|
|
average_score = db.Column(db.Numeric(4,2), nullable=True, comment='平均得分')
|
|
total_duration = db.Column(db.Integer, default=0, comment='总学习时长(秒)')
|
|
last_completed_at = db.Column(db.DateTime, nullable=True, comment='最后完成时间')
|
|
first_completed_at = db.Column(db.DateTime, nullable=True, comment='首次完成时间')
|
|
|
|
__table_args__ = (db.UniqueConstraint('user_id', 'scenario_id', name='uk_user_scenario'),)
|
|
|
|
def __repr__(self):
|
|
return f'<UserProgress {self.user_id}:{self.scenario_id}>'
|