superlishunqin 50293df5f3 fix_logout
2025-05-07 22:11:41 +08:00

221 lines
7.2 KiB
Python

import logging
import random
import re
import secrets
import string
from datetime import datetime, timedelta

from flask import Blueprint, request, jsonify, session, redirect, url_for, render_template

from utils.db import get_db_connection, close_connection
from utils.email_sender import send_verification_email
from utils.password import hash_password, check_password  # duplicate hash_password import removed
# Blueprint on which the authentication routes in this module are registered.
auth = Blueprint('auth', __name__)
# In-memory store of pending verification codes, keyed by e-mail address.
# Each value holds the code, its expiry time and the submitted user data.
# Being a plain module-level dict, it lives only in this process.
verification_codes = {}
@auth.route('/register', methods=['POST'])
def register():
    """Start a registration by e-mailing a 6-digit verification code.

    Expects a JSON object with ``email``, ``password`` and ``name`` and,
    optionally, ``gender`` and ``birth_date``.  Nothing is written to the
    database here: the submitted data is parked in ``verification_codes``
    (keyed by e-mail, valid for 30 minutes) until the code is confirmed.

    Returns:
        A JSON response with ``success`` and ``message``.  Status 400 for
        missing/invalid input or an already registered e-mail, 500 when the
        database or the mail delivery fails, 200 otherwise.
    """
    # silent=True makes a missing/malformed JSON body come back as None
    # instead of raising, so it can be answered with the regular 400 payload.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not all(k in data for k in ('email', 'password', 'name')):
        return jsonify({'success': False, 'message': '缺少必要的注册信息'}), 400

    email = data['email']
    password = data['password']
    name = data['name']
    gender = data.get('gender')
    birth_date = data.get('birth_date')

    # fullmatch (rather than match) so that "$" cannot accept a trailing
    # newline; the isinstance guard keeps non-string JSON values from raising.
    email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    if not isinstance(email, str) or not re.fullmatch(email_pattern, email):
        return jsonify({'success': False, 'message': '邮箱格式不正确'}), 400

    # Minimum password length check.
    if not isinstance(password, str) or len(password) < 8:
        return jsonify({'success': False, 'message': '密码长度必须至少为8个字符'}), 400

    # Reject e-mail addresses that already have an account.
    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        if not conn:
            return jsonify({'success': False, 'message': '数据库连接失败'}), 500
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT id FROM users WHERE email = %s", (email,))
        if cursor.fetchone():
            return jsonify({'success': False, 'message': '该邮箱已被注册'}), 400
    except Exception:
        # Answer with JSON like the other handlers instead of letting the
        # error escape; details stay in the server log.
        logging.getLogger(__name__).exception("register: user lookup failed")
        return jsonify({'success': False, 'message': '注册失败'}), 500
    finally:
        # The connection is released before the (potentially slow) mail send.
        if cursor:
            cursor.close()
        if conn:
            close_connection(conn)

    now = datetime.now()

    # Drop expired entries so abandoned registrations do not pile up.
    for pending_email, pending in list(verification_codes.items()):
        if pending['expires'] < now:
            verification_codes.pop(pending_email, None)

    # The code gates account creation, so draw it from a CSPRNG.
    verification_code = ''.join(secrets.choice(string.digits) for _ in range(6))

    # NOTE(review): the raw password is kept in memory until verification;
    # it is not hashed here because the stored entry is consumed elsewhere.
    verification_codes[email] = {
        'code': verification_code,
        'expires': now + timedelta(minutes=30),
        'user_data': {
            'email': email,
            'password': password,
            'name': name,
            'gender': gender,
            'birth_date': birth_date,
        },
    }

    if not send_verification_email(email, verification_code):
        # Nobody received this code, so do not keep it around.
        verification_codes.pop(email, None)
        return jsonify({'success': False, 'message': '验证邮件发送失败'}), 500

    return jsonify({
        'success': True,
        'message': '验证码已发送到您的邮箱,请查收并完成注册'
    })
@auth.route('/verify', methods=['POST'])
def verify():
    """Confirm an e-mailed verification code and create the user row.

    Expects a JSON object with ``email`` and ``code``.  On a match the
    pending user data stored in ``verification_codes`` is inserted into the
    ``users`` table (with the password hashed via ``hash_password``) and the
    pending entry is removed.

    Returns:
        A JSON response with ``success`` and ``message``.  Status 400 for
        missing input or an unknown/expired/wrong code, 500 on database
        failure, 200 on success.
    """
    # silent=True makes a missing/malformed JSON body come back as None
    # instead of raising, so it can be answered with the regular 400 payload.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not all(k in data for k in ('email', 'code')):
        return jsonify({'success': False, 'message': '缺少必要的验证信息'}), 400

    email = data['email']
    # Stored codes are strings; normalise so a JSON number or stray
    # whitespace from the client can still match.
    code = str(data['code']).strip()

    # Non-string e-mails (e.g. a JSON list) are unhashable and can never be
    # a key, so treat them as "no pending code".
    stored_data = verification_codes.get(email) if isinstance(email, str) else None
    if stored_data is None:
        return jsonify({'success': False, 'message': '验证码不存在或已过期'}), 400

    if datetime.now() > stored_data['expires']:
        verification_codes.pop(email, None)
        return jsonify({'success': False, 'message': '验证码已过期'}), 400

    # Constant-time comparison; bytes are used because compare_digest
    # rejects non-ASCII str arguments.
    code_matches = secrets.compare_digest(
        code.encode('utf-8'), str(stored_data['code']).encode('utf-8')
    )
    if not code_matches:
        # A 6-digit code is guessable, so discard it after a few wrong
        # attempts; later requests then get the "missing or expired" answer.
        max_attempts = 5
        stored_data['attempts'] = stored_data.get('attempts', 0) + 1
        if stored_data['attempts'] >= max_attempts:
            verification_codes.pop(email, None)
        return jsonify({'success': False, 'message': '验证码不正确'}), 400

    user_data = stored_data['user_data']
    # An empty or missing birth date is passed to the INSERT as None.
    birth_date = user_data.get('birth_date') or None

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        if not conn:
            return jsonify({'success': False, 'message': '数据库连接失败'}), 500
        cursor = conn.cursor(dictionary=True)
        hashed_password = hash_password(user_data['password'])
        query = """
        INSERT INTO users (email, password, name, gender, birth_date)
        VALUES (%s, %s, %s, %s, %s)
        """
        cursor.execute(query, (
            user_data['email'],
            hashed_password,
            user_data['name'],
            user_data['gender'],
            birth_date
        ))
        conn.commit()
        # pop() rather than del: the entry may already be gone if another
        # request for the same e-mail got here first.
        verification_codes.pop(email, None)
        return jsonify({
            'success': True,
            'message': '注册成功,请登录系统'
        })
    except Exception:
        if conn:
            conn.rollback()
        # Keep exception details in the server log; do not send them to the client.
        logging.getLogger(__name__).exception("verify: failed to create user")
        return jsonify({'success': False, 'message': '注册失败'}), 500
    finally:
        if cursor:
            cursor.close()
        if conn:
            close_connection(conn)
@auth.route('/login', methods=['GET', 'POST'])  # serves both GET and POST
def login():
    """Handle the login page (GET) and the login API (POST).

    POST expects a JSON object with ``email`` and ``password``.  On success
    ``session['user_id']`` and ``session['user_info']`` are set and the user
    info is echoed back in the JSON response.

    GET renders ``login.html``, or redirects to ``classify.dashboard`` when
    the session already contains ``user_id``.

    Returns:
        For POST, a JSON response with ``success`` and ``message``: 400 for
        missing credentials, 404 for an unknown e-mail, 401 for a wrong
        password, 500 on failure.  For GET, a redirect or the rendered page.
    """
    if request.method == 'POST':
        # silent=True makes a missing/malformed JSON body come back as None
        # instead of raising, so it can be answered with the regular 400.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not all(k in data for k in ('email', 'password')):
            return jsonify({'success': False, 'message': '请输入邮箱和密码'}), 400

        email = data['email']
        password = data['password']
        # Non-string JSON values cannot be valid credentials; reject them
        # before they reach the query or the password check.
        if not isinstance(email, str) or not isinstance(password, str):
            return jsonify({'success': False, 'message': '请输入邮箱和密码'}), 400

        conn = None
        cursor = None
        try:
            conn = get_db_connection()
            if not conn:
                return jsonify({'success': False, 'message': '数据库连接失败'}), 500
            cursor = conn.cursor(dictionary=True)
            cursor.execute("SELECT id, email, password, name FROM users WHERE email = %s", (email,))
            user = cursor.fetchone()

            # NOTE(review): distinct 404/401 answers reveal whether an e-mail
            # is registered; kept as-is because clients may rely on them.
            if not user:
                return jsonify({'success': False, 'message': '用户不存在'}), 404
            if not check_password(user['password'], password):
                return jsonify({'success': False, 'message': '密码错误'}), 401

            session['user_id'] = user['id']
            session['user_info'] = {
                'id': user['id'],
                'email': user['email'],
                'name': user['name']
            }
            return jsonify({
                'success': True,
                'message': '登录成功',
                'user': session['user_info']  # handed back for the front-end JS
            })
        except Exception:
            # Keep exception details in the server log; do not send them to the client.
            logging.getLogger(__name__).exception("login: unexpected failure")
            return jsonify({'success': False, 'message': '登录失败'}), 500
        finally:
            if cursor:
                cursor.close()
            if conn:
                close_connection(conn)

    # GET request: show the login page.
    if 'user_id' in session:
        # Already logged in: send the user to the dashboard.
        # NOTE(review): this assumes the dashboard view is the function
        # ``dashboard`` on a blueprint named ``classify``; adjust the endpoint
        # (e.g. url_for('dashboard_function_name')) if it lives elsewhere.
        return redirect(url_for('classify.dashboard'))
    return render_template('login.html')
@auth.route('/logout')
def logout():
    """Log the user out and send them to the login page.

    Removes ``user_id`` and ``user_info`` from the session (absent keys are
    ignored) and redirects to the ``auth.login`` endpoint.
    """
    for session_key in ('user_id', 'user_info'):
        session.pop(session_key, None)
    # Back to the login page (GET /login).
    login_url = url_for('auth.login')
    return redirect(login_url)