from flask import Blueprint, request, jsonify, session import random import string import re from datetime import datetime, timedelta from utils.db import get_db_connection, close_connection from utils.password import hash_password from utils.email_sender import send_verification_email from utils.password import hash_password, check_password # 创建蓝图 auth = Blueprint('auth', __name__) # 存储临时验证码 verification_codes = {} @auth.route('/register', methods=['POST']) def register(): """用户注册接口""" data = request.get_json() # 检查必要字段 if not all(k in data for k in ('email', 'password', 'name')): return jsonify({'success': False, 'message': '缺少必要的注册信息'}), 400 email = data['email'] password = data['password'] name = data['name'] gender = data.get('gender') birth_date = data.get('birth_date') # 验证邮箱格式 email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_pattern, email): return jsonify({'success': False, 'message': '邮箱格式不正确'}), 400 # 验证密码强度 if len(password) < 8: return jsonify({'success': False, 'message': '密码长度必须至少为8个字符'}), 400 # 检查邮箱是否已注册 conn = get_db_connection() if not conn: return jsonify({'success': False, 'message': '数据库连接失败'}), 500 cursor = conn.cursor(dictionary=True) cursor.execute("SELECT id FROM users WHERE email = %s", (email,)) existing_user = cursor.fetchone() if existing_user: cursor.close() close_connection(conn) return jsonify({'success': False, 'message': '该邮箱已被注册'}), 400 # 生成验证码 verification_code = ''.join(random.choices(string.digits, k=6)) # 存储验证码(实际项目中应使用Redis等带过期功能的存储) verification_codes[email] = { 'code': verification_code, 'expires': datetime.now() + timedelta(minutes=30), 'user_data': { 'email': email, 'password': password, 'name': name, 'gender': gender, 'birth_date': birth_date } } # 发送验证邮件 email_sent = send_verification_email(email, verification_code) cursor.close() close_connection(conn) if not email_sent: return jsonify({'success': False, 'message': '验证邮件发送失败'}), 500 return jsonify({ 'success': True, 'message': '验证码已发送到您的邮箱,请查收并完成注册' }) @auth.route('/verify', methods=['POST']) def verify(): """验证邮箱验证码""" data = request.get_json() if not all(k in data for k in ('email', 'code')): return jsonify({'success': False, 'message': '缺少必要的验证信息'}), 400 email = data['email'] code = data['code'] # 检查验证码 if email not in verification_codes: return jsonify({'success': False, 'message': '验证码不存在或已过期'}), 400 stored_data = verification_codes[email] # 检查验证码是否过期 if datetime.now() > stored_data['expires']: del verification_codes[email] return jsonify({'success': False, 'message': '验证码已过期'}), 400 # 检查验证码是否匹配 if code != stored_data['code']: return jsonify({'success': False, 'message': '验证码不正确'}), 400 # 获取用户数据 user_data = stored_data['user_data'] # 处理空日期值 birth_date = user_data.get('birth_date') if birth_date == '' or not birth_date: birth_date = None # 存储用户信息到数据库 conn = get_db_connection() if not conn: return jsonify({'success': False, 'message': '数据库连接失败'}), 500 cursor = conn.cursor(dictionary=True) try: # 哈希处理密码 hashed_password = hash_password(user_data['password']) # 插入用户数据 query = """ INSERT INTO users (email, password, name, gender, birth_date) VALUES (%s, %s, %s, %s, %s) """ cursor.execute(query, ( user_data['email'], hashed_password, user_data['name'], user_data['gender'], birth_date # 使用处理过的birth_date值 )) conn.commit() # 清除验证码 del verification_codes[email] return jsonify({ 'success': True, 'message': '注册成功,请登录系统' }) except Exception as e: conn.rollback() return jsonify({'success': False, 'message': f'注册失败: {str(e)}'}), 500 finally: cursor.close() close_connection(conn) # 在routes/auth.py现有代码的底部添加 @auth.route('/login', methods=['POST']) def login(): """用户登录接口""" data = request.get_json() if not all(k in data for k in ('email', 'password')): return jsonify({'success': False, 'message': '请输入邮箱和密码'}), 400 email = data['email'] password = data['password'] # 连接数据库 conn = get_db_connection() if not conn: return jsonify({'success': False, 'message': '数据库连接失败'}), 500 cursor = conn.cursor(dictionary=True) try: # 查询用户 cursor.execute("SELECT id, email, password, name FROM users WHERE email = %s", (email,)) user = cursor.fetchone() if not user: return jsonify({'success': False, 'message': '用户不存在'}), 404 # 验证密码 if not check_password(user['password'], password): return jsonify({'success': False, 'message': '密码错误'}), 401 # 创建会话 session['user_id'] = user['id'] session['user_info'] = { 'id': user['id'], 'email': user['email'], 'name': user['name'] } return jsonify({ 'success': True, 'message': '登录成功', 'user': { 'id': user['id'], 'email': user['email'], 'name': user['name'] } }) except Exception as e: print(f"登录错误: {str(e)}") # 添加错误日志 return jsonify({'success': False, 'message': f'登录失败: {str(e)}'}), 500 finally: cursor.close() close_connection(conn) @auth.route('/logout') def logout(): """用户登出接口""" from flask import session # 清除会话 session.pop('user_id', None) session.pop('user_info', None) return jsonify({'success': True, 'message': '已成功登出'})