superlishunqin 29009ef7de user
2025-05-01 04:52:53 +08:00

382 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, render_template, request, redirect, url_for, flash, session, jsonify
from werkzeug.security import generate_password_hash, check_password_hash
from app.models.user import User, db
from app.utils.email import send_verification_email, generate_verification_code
import logging
from functools import wraps
import time
from datetime import datetime, timedelta
from app.services.user_service import UserService
from flask_login import login_user, logout_user, current_user, login_required
from app.models.user import User
# Blueprint on which every route in this module is registered.
user_bp = Blueprint('user', __name__)
# In-memory dictionary used in place of Redis for verification codes.
class VerificationStore:
    """Process-local store of verification codes with an expiry time.

    Exposes ``setex`` / ``get`` / ``delete`` so it can stand in for the
    Redis-style calls made by the routes. Entries live only in this
    process's memory.
    """

    def __init__(self):
        # Layout: {email: {'code': code, 'expires': datetime}}
        self.codes = {}

    def _purge_expired(self):
        """Remove every entry whose expiry time has already passed."""
        now = datetime.now()
        # Snapshot the items first so the dict is not resized while iterating.
        for email, entry in list(self.codes.items()):
            if now > entry['expires']:
                self.codes.pop(email, None)

    def setex(self, email, seconds, code):
        """Store ``code`` for ``email``, valid for ``seconds`` seconds.

        Returns:
            True, always.
        """
        # Without this sweep, codes for addresses that are never read back
        # would stay in memory forever.
        self._purge_expired()
        expiry = datetime.now() + timedelta(seconds=seconds)
        self.codes[email] = {'code': code, 'expires': expiry}
        return True

    def get(self, email):
        """Return the stored code for ``email``, or None if absent or expired."""
        data = self.codes.get(email)
        if data is None:
            return None
        if datetime.now() > data['expires']:
            # The code has expired: drop it so it cannot be used later.
            self.delete(email)
            return None
        return data['code']

    def delete(self, email):
        """Remove the code for ``email`` if present.

        Returns:
            True, whether or not an entry existed.
        """
        self.codes.pop(email, None)
        return True
# Module-level in-memory store instance used by the routes below to save and check verification codes.
verification_codes = VerificationStore()
# Decorator that restricts a view to administrators.
def admin_required(f):
    """Wrap view ``f`` so that only an authenticated user with role_id 1 reaches it.

    Any other caller gets an error flash message and a redirect to the
    'index' endpoint instead of the view's response.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        is_admin = current_user.is_authenticated and current_user.role_id == 1
        if is_admin:
            return f(*args, **kwargs)
        flash('您没有管理员权限', 'error')
        return redirect(url_for('index'))

    return guarded_view
def _is_local_redirect_target(target):
    """Return True if ``target`` is a same-site absolute path that is safe to redirect to."""
    if not target or not target.startswith('/'):
        return False
    # Browsers resolve "//host" and "/\host" as protocol-relative URLs, i.e.
    # as a different site, so a leading "/" alone is not enough.
    if target.startswith('//') or '\\' in target:
        return False
    # Browsers drop tab/CR/LF before parsing, so "/\t/host" would become
    # "//host"; reject any control character outright.
    return not any(ord(ch) < 0x20 for ch in target)


@user_bp.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the submitted credentials (POST).

    The ``username`` form field may hold either a username or an e-mail
    address. On success the user is logged in through Flask-Login and
    redirected to the ``next`` query parameter when it is a safe local
    path, otherwise to the 'index' endpoint. On failure the login template
    is re-rendered with an error message.
    """
    # An already logged-in user goes straight to the home page.
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        remember_me = request.form.get('remember_me') == 'on'

        if not username or not password:
            return render_template('login.html', error='用户名和密码不能为空')

        # Look the account up by username or by e-mail.
        user = User.query.filter((User.username == username) | (User.email == username)).first()
        if not user or not user.check_password(password):
            return render_template('login.html', error='用户名或密码错误')
        if user.status == 0:
            return render_template('login.html', error='账号已被禁用,请联系管理员')

        # Authentication state is managed by Flask-Login.
        login_user(user, remember=remember_me)

        # Kept in the session for convenience; no longer used for authentication.
        session['username'] = user.username
        session['role_id'] = user.role_id

        # Follow ``next`` only when it cannot point to another site
        # (prevents open-redirect abuse of the login link).
        next_page = request.args.get('next')
        if not _is_local_redirect_target(next_page):
            next_page = url_for('index')
        return redirect(next_page)

    return render_template('login.html')
@user_bp.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create an account (POST).

    A POST must supply username, email, password, confirm_password and
    verification_code. The code is checked against the one stored for the
    e-mail address in ``verification_codes``. Every validation failure
    re-renders the form with an error message; success flashes a message
    and redirects to the login page.
    """
    # Logged-in users have no business here.
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('register.html')

    def reject(reason):
        # Re-render the form carrying the given error text.
        return render_template('register.html', error=reason)

    form = request.form
    username = form.get('username')
    email = form.get('email')
    password = form.get('password')
    confirm_password = form.get('confirm_password')
    verification_code = form.get('verification_code')

    # Form validation.
    required_fields = (username, email, password, confirm_password, verification_code)
    if not all(required_fields):
        return reject('所有字段都是必填项')
    if password != confirm_password:
        return reject('两次输入的密码不匹配')

    # Username and e-mail must both be unused.
    if User.query.filter_by(username=username).first():
        return reject('用户名已存在')
    if User.query.filter_by(email=email).first():
        return reject('邮箱已被注册')

    # The submitted code must match a stored, unexpired one.
    expected_code = verification_codes.get(email)
    if not (expected_code and expected_code == verification_code):
        return reject('验证码无效或已过期')

    # Create the account.
    try:
        account = User(
            username=username,
            password=password,  # original note: the model hashes the password (not visible here)
            email=email,
            nickname=username,  # nickname defaults to the username
        )
        db.session.add(account)
        db.session.commit()
        # The code has served its purpose.
        verification_codes.delete(email)
        flash('注册成功,请登录', 'success')
        return redirect(url_for('user.login'))
    except Exception as e:
        db.session.rollback()
        logging.error(f"User registration failed: {str(e)}")
        return reject('注册失败,请稍后重试')
@user_bp.route('/logout')
@login_required
def logout():
    """Log the current user out via Flask-Login and redirect to the login page."""
    logout_user()
    login_page = url_for('user.login')
    return redirect(login_page)
@user_bp.route('/send_verification_code', methods=['POST'])
def send_verification_code():
    """Generate a verification code, store it for 10 minutes and e-mail it.

    Expects a JSON object with an ``email`` key. Always answers with a JSON
    object ``{'success': bool, 'message': str}``.
    """
    import re

    # A missing, malformed or non-object JSON body is treated the same as
    # "no email supplied" instead of crashing on ``None.get``.
    payload = request.get_json(silent=True)
    email = payload.get('email') if isinstance(payload, dict) else None
    if not email:
        return jsonify({'success': False, 'message': '请提供邮箱地址'})

    # Validate the address shape. The whole string must match (no trailing
    # garbage such as a second "@"), and whitespace is excluded so that
    # line breaks cannot reach the mail sender.
    if not isinstance(email, str) or not re.fullmatch(r"[^@\s]+@[^@\s]+\.[^@\s]+", email):
        return jsonify({'success': False, 'message': '邮箱格式不正确'})

    code = generate_verification_code()

    # Keep the code for 600 seconds (10 minutes).
    verification_codes.setex(email, 600, code)

    if send_verification_email(email, code):
        return jsonify({'success': True, 'message': '验证码已发送'})
    return jsonify({'success': False, 'message': '邮件发送失败,请稍后重试'})
# User management list
@user_bp.route('/manage')
@login_required
@admin_required
def user_list():
    """Render the paginated user list for administrators.

    Reads ``page``, ``search``, ``status`` and ``role_id`` from the query
    string, passes them to ``UserService.get_users`` (10 per page) and hands
    the result, the filters and the role list to the template.
    """
    query_args = request.args
    filters = {
        'search': query_args.get('search', ''),
        'status': query_args.get('status', type=int),
        'role_id': query_args.get('role_id', type=int),
    }

    pagination = UserService.get_users(
        page=query_args.get('page', 1, type=int),
        per_page=10,
        search_query=filters['search'],
        status=filters['status'],
        role_id=filters['role_id'],
    )
    roles = UserService.get_all_roles()

    # The filters are echoed back to the template under their own names.
    return render_template('user/list.html', pagination=pagination, roles=roles, **filters)
# User detail / edit page
@user_bp.route('/edit/<int:user_id>', methods=['GET', 'POST'])
@login_required
@admin_required
def user_edit(user_id):
    """Show (GET) or apply (POST) an administrator's edit of a user.

    An unknown ``user_id`` flashes an error and redirects to the user list.
    A successful POST redirects to the user list; a failed one re-renders
    the edit form with an error flash.
    """
    user = UserService.get_user_by_id(user_id)
    if not user:
        flash('用户不存在', 'error')
        return redirect(url_for('user.user_list'))

    roles = UserService.get_all_roles()

    if request.method == 'POST':
        # ``type=int`` yields None for a missing or non-numeric value instead
        # of raising, so a bad form submission no longer produces a 500.
        role_id = request.form.get('role_id', type=int)
        status = request.form.get('status', type=int)
        if role_id is None or status is None:
            flash('无效的角色或状态值', 'error')
            return render_template('user/edit.html', user=user, roles=roles)

        data = {
            'email': request.form.get('email'),
            'phone': request.form.get('phone'),
            'nickname': request.form.get('nickname'),
            'role_id': role_id,
            'status': status,
        }

        # The password is only changed when a non-empty value is submitted.
        password = request.form.get('password')
        if password:
            data['password'] = password

        success, message = UserService.update_user(user_id, data)
        if success:
            flash(message, 'success')
            return redirect(url_for('user.user_list'))
        flash(message, 'error')

    return render_template('user/edit.html', user=user, roles=roles)
# User status management API
@user_bp.route('/status/<int:user_id>', methods=['POST'])
@login_required
@admin_required
def user_status(user_id):
    """Set a user's status to 0 or 1 from a JSON body ``{"status": 0|1}``.

    Always answers with a JSON object ``{'success': bool, 'message': str}``.
    Administrators cannot change their own status.
    """
    # A missing, malformed or non-object body yields status=None and is
    # reported as an invalid value rather than raising on ``None.get``.
    payload = request.get_json(silent=True)
    status = payload.get('status') if isinstance(payload, dict) else None

    if status not in (0, 1):
        return jsonify({'success': False, 'message': '无效的状态值'})

    # Administrators must not change their own status.
    if user_id == current_user.id:
        return jsonify({'success': False, 'message': '不能修改自己的状态'})

    success, message = UserService.change_user_status(user_id, status)
    return jsonify({'success': success, 'message': message})
# User deletion API
@user_bp.route('/delete/<int:user_id>', methods=['POST'])
@login_required
@admin_required
def user_delete(user_id):
    """Delete the given user and report the outcome as JSON.

    Responds with ``{'success': bool, 'message': str}``. A request that
    targets the caller's own account is refused.
    """
    targets_self = user_id == current_user.id
    if targets_self:
        # Self-deletion is not allowed.
        refusal = {'success': False, 'message': '不能删除自己的账号'}
        return jsonify(refusal)

    deleted, detail = UserService.delete_user(user_id)
    outcome = {'success': deleted, 'message': detail}
    return jsonify(outcome)
# Personal profile page
@user_bp.route('/profile', methods=['GET', 'POST'])
@login_required
def user_profile():
    """Show (GET) or update (POST) the logged-in user's own profile.

    A POST updates email, phone and nickname. The password is changed only
    when ``new_password`` is submitted together with the correct
    ``current_password`` and a matching ``confirm_password``. The profile
    template is rendered in every case, with a flash message describing
    the result of a POST.
    """
    user = current_user

    if request.method == 'POST':
        data = {
            'email': request.form.get('email'),
            'phone': request.form.get('phone'),
            'nickname': request.form.get('nickname'),
        }
        current_password = request.form.get('current_password')
        new_password = request.form.get('new_password')
        confirm_password = request.form.get('confirm_password')

        # A requested password change must be backed by the current password.
        # Previously a new password sent without the current one was silently
        # dropped while the update still reported success.
        if new_password:
            if not current_password or not user.check_password(current_password):
                flash('当前密码不正确', 'error')
                return render_template('user/profile.html', user=user)
            if new_password != confirm_password:
                flash('两次输入的新密码不匹配', 'error')
                return render_template('user/profile.html', user=user)
            data['password'] = new_password

        success, message = UserService.update_user(user.id, data)
        flash(message, 'success' if success else 'error')

    return render_template('user/profile.html', user=user)
# Role management page
@user_bp.route('/roles', methods=['GET'])
@login_required
@admin_required
def role_list():
    """Render the role management page with the roles returned by UserService.get_all_roles()."""
    return render_template('user/roles.html', roles=UserService.get_all_roles())
# Create / edit role API
@user_bp.route('/role/save', methods=['POST'])
@login_required
@admin_required
def role_save():
    """Create a role, or update one when the JSON body carries a truthy ``id``.

    Expects a JSON object with ``role_name`` (required), and optionally
    ``id`` and ``description``. Always answers with a JSON object
    ``{'success': bool, 'message': str}``.
    """
    # A missing, malformed or non-object body is handled as an empty object,
    # which then fails the role-name check instead of raising on ``None.get``.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    role_id = payload.get('id')
    role_name = payload.get('role_name')
    description = payload.get('description')

    if not role_name:
        return jsonify({'success': False, 'message': '角色名不能为空'})

    if role_id:  # update
        success, message = UserService.update_role(role_id, role_name, description)
    else:  # create
        success, message = UserService.create_role(role_name, description)
    return jsonify({'success': success, 'message': message})


# NOTE(review): the string literal below is a disabled earlier version of the
# role user-count endpoint. As a bare expression statement it has no runtime
# effect; the live route is get_role_user_count.
"""
@user_bp.route('/api/role/<int:role_id>/user-count')
@login_required
@admin_required
def get_role_user_count(role_id):
count = User.query.filter_by(role_id=role_id).count()
return jsonify({'count': count})
"""
@user_bp.route('/user/role/<int:role_id>/count', methods=['GET'])
@login_required
@admin_required
def get_role_user_count(role_id):
    """Return, as JSON, the number of users whose role_id equals ``role_id``.

    On success: ``{'success': True, 'count': n}``. If the query raises, the
    response is ``{'success': False, 'message': ..., 'count': 0}`` with
    HTTP status 500.
    """
    try:
        users_with_role = User.query.filter_by(role_id=role_id)
        result = {'success': True, 'count': users_with_role.count()}
        return jsonify(result)
    except Exception as e:
        failure = {
            'success': False,
            'message': f"查询失败: {str(e)}",
            'count': 0,
        }
        return jsonify(failure), 500