superlishunqin f434b83090 first commit
2025-03-17 22:43:53 +08:00

232 lines
6.6 KiB
Python

from flask import Blueprint, request, jsonify, session
import random
import string
import re
from datetime import datetime, timedelta
from utils.db import get_db_connection, close_connection
from utils.password import hash_password
from utils.email_sender import send_verification_email
from utils.password import hash_password, check_password
# 创建蓝图
auth = Blueprint('auth', __name__)
# 存储临时验证码
verification_codes = {}
@auth.route('/register', methods=['POST'])
def register():
"""用户注册接口"""
data = request.get_json()
# 检查必要字段
if not all(k in data for k in ('email', 'password', 'name')):
return jsonify({'success': False, 'message': '缺少必要的注册信息'}), 400
email = data['email']
password = data['password']
name = data['name']
gender = data.get('gender')
birth_date = data.get('birth_date')
# 验证邮箱格式
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, email):
return jsonify({'success': False, 'message': '邮箱格式不正确'}), 400
# 验证密码强度
if len(password) < 8:
return jsonify({'success': False, 'message': '密码长度必须至少为8个字符'}), 400
# 检查邮箱是否已注册
conn = get_db_connection()
if not conn:
return jsonify({'success': False, 'message': '数据库连接失败'}), 500
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT id FROM users WHERE email = %s", (email,))
existing_user = cursor.fetchone()
if existing_user:
cursor.close()
close_connection(conn)
return jsonify({'success': False, 'message': '该邮箱已被注册'}), 400
# 生成验证码
verification_code = ''.join(random.choices(string.digits, k=6))
# 存储验证码(实际项目中应使用Redis等带过期功能的存储)
verification_codes[email] = {
'code': verification_code,
'expires': datetime.now() + timedelta(minutes=30),
'user_data': {
'email': email,
'password': password,
'name': name,
'gender': gender,
'birth_date': birth_date
}
}
# 发送验证邮件
email_sent = send_verification_email(email, verification_code)
cursor.close()
close_connection(conn)
if not email_sent:
return jsonify({'success': False, 'message': '验证邮件发送失败'}), 500
return jsonify({
'success': True,
'message': '验证码已发送到您的邮箱,请查收并完成注册'
})
@auth.route('/verify', methods=['POST'])
def verify():
"""验证邮箱验证码"""
data = request.get_json()
if not all(k in data for k in ('email', 'code')):
return jsonify({'success': False, 'message': '缺少必要的验证信息'}), 400
email = data['email']
code = data['code']
# 检查验证码
if email not in verification_codes:
return jsonify({'success': False, 'message': '验证码不存在或已过期'}), 400
stored_data = verification_codes[email]
# 检查验证码是否过期
if datetime.now() > stored_data['expires']:
del verification_codes[email]
return jsonify({'success': False, 'message': '验证码已过期'}), 400
# 检查验证码是否匹配
if code != stored_data['code']:
return jsonify({'success': False, 'message': '验证码不正确'}), 400
# 获取用户数据
user_data = stored_data['user_data']
# 处理空日期值
birth_date = user_data.get('birth_date')
if birth_date == '' or not birth_date:
birth_date = None
# 存储用户信息到数据库
conn = get_db_connection()
if not conn:
return jsonify({'success': False, 'message': '数据库连接失败'}), 500
cursor = conn.cursor(dictionary=True)
try:
# 哈希处理密码
hashed_password = hash_password(user_data['password'])
# 插入用户数据
query = """
INSERT INTO users (email, password, name, gender, birth_date)
VALUES (%s, %s, %s, %s, %s)
"""
cursor.execute(query, (
user_data['email'],
hashed_password,
user_data['name'],
user_data['gender'],
birth_date # 使用处理过的birth_date值
))
conn.commit()
# 清除验证码
del verification_codes[email]
return jsonify({
'success': True,
'message': '注册成功,请登录系统'
})
except Exception as e:
conn.rollback()
return jsonify({'success': False, 'message': f'注册失败: {str(e)}'}), 500
finally:
cursor.close()
close_connection(conn)
# 在routes/auth.py现有代码的底部添加
@auth.route('/login', methods=['POST'])
def login():
"""用户登录接口"""
data = request.get_json()
if not all(k in data for k in ('email', 'password')):
return jsonify({'success': False, 'message': '请输入邮箱和密码'}), 400
email = data['email']
password = data['password']
# 连接数据库
conn = get_db_connection()
if not conn:
return jsonify({'success': False, 'message': '数据库连接失败'}), 500
cursor = conn.cursor(dictionary=True)
try:
# 查询用户
cursor.execute("SELECT id, email, password, name FROM users WHERE email = %s", (email,))
user = cursor.fetchone()
if not user:
return jsonify({'success': False, 'message': '用户不存在'}), 404
# 验证密码
if not check_password(user['password'], password):
return jsonify({'success': False, 'message': '密码错误'}), 401
# 创建会话
session['user_id'] = user['id']
session['user_info'] = {
'id': user['id'],
'email': user['email'],
'name': user['name']
}
return jsonify({
'success': True,
'message': '登录成功',
'user': {
'id': user['id'],
'email': user['email'],
'name': user['name']
}
})
except Exception as e:
print(f"登录错误: {str(e)}") # 添加错误日志
return jsonify({'success': False, 'message': f'登录失败: {str(e)}'}), 500
finally:
cursor.close()
close_connection(conn)
@auth.route('/logout')
def logout():
"""用户登出接口"""
from flask import session
# 清除会话
session.pop('user_id', None)
session.pop('user_info', None)
return jsonify({'success': True, 'message': '已成功登出'})