2025-05-14 15:08:06 +08:00

758 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, render_template, request, redirect, url_for, flash, session, jsonify
from werkzeug.security import generate_password_hash, check_password_hash
from app.models.user import User, db
from app.models.log import Log # 导入日志模型
from app.utils.email import send_verification_email, generate_verification_code
import logging
from functools import wraps
import time
from datetime import datetime, timedelta
from app.services.user_service import UserService
from flask_login import login_user, logout_user, current_user, login_required
from app.models.user import User
# 创建蓝图
user_bp = Blueprint('user', __name__)
# 使用内存字典代替Redis存储验证码
class VerificationStore:
def __init__(self):
self.codes = {} # 存储格式: {email: {'code': code, 'expires': timestamp}}
def setex(self, email, seconds, code):
"""设置验证码并指定过期时间"""
expiry = datetime.now() + timedelta(seconds=seconds)
self.codes[email] = {'code': code, 'expires': expiry}
return True
def get(self, email):
"""获取验证码如果过期则返回None"""
if email not in self.codes:
return None
data = self.codes[email]
if datetime.now() > data['expires']:
# 验证码已过期,删除它
self.delete(email)
return None
return data['code']
def delete(self, email):
"""删除验证码"""
if email in self.codes:
del self.codes[email]
return True
# 使用内存存储验证码
verification_codes = VerificationStore()
# 添加管理员权限检查装饰器
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
print(
f"DEBUG: admin_required检查用户认证={current_user.is_authenticated}角色ID={current_user.role_id if current_user.is_authenticated else 'None'}")
if not current_user.is_authenticated:
print("DEBUG: 用户未登录,重定向到登录页面")
return redirect(url_for('user.login', next=request.url))
if current_user.role_id != 1:
print(f"DEBUG: 用户{current_user.username}不是管理员角色ID={current_user.role_id}")
flash('您没有管理员权限访问此页面', 'error')
return redirect(url_for('index'))
print(f"DEBUG: 用户{current_user.username}是管理员,允许访问")
return f(*args, **kwargs)
return decorated_function
@user_bp.route('/login', methods=['GET', 'POST'])
def login():
print(f"DEBUG: 登录函数被调用,认证状态={current_user.is_authenticated}")
print(f"DEBUG: 请求方法={request.method}next参数={request.args.get('next')}")
# 获取next参数
next_page = request.args.get('next')
# 如果用户已经登录,处理重定向
if current_user.is_authenticated:
if next_page:
from urllib.parse import urlparse
parsed = urlparse(next_page)
path = parsed.path
print(f"DEBUG: 提取的路径={path}")
# 删除特殊处理直接重定向到path
return redirect(path)
return redirect(url_for('index'))
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
remember_me = request.form.get('remember_me') == 'on'
if not username or not password:
return render_template('login.html', error='用户名和密码不能为空')
# 检查用户是否存在
user = User.query.filter((User.username == username) | (User.email == username)).first()
if not user or not user.check_password(password):
# 记录登录失败日志
Log.add_log(
action="登录失败",
ip_address=request.remote_addr,
description=f"尝试使用用户名/邮箱 {username} 登录失败"
)
return render_template('login.html', error='用户名或密码错误')
if user.status == 0:
# 记录禁用账号登录尝试
Log.add_log(
action="登录失败",
user_id=user.id,
ip_address=request.remote_addr,
description=f"禁用账号 {username} 尝试登录"
)
return render_template('login.html', error='账号已被禁用,请联系管理员')
# 使用 Flask-Login 的 login_user 函数
login_user(user, remember=remember_me)
# 记录登录成功日志
Log.add_log(
action="用户登录",
user_id=user.id,
ip_address=request.remote_addr,
description=f"用户 {user.username} 登录成功"
)
# 这些session信息仍然可以保留但不再用于认证
session['username'] = user.username
session['role_id'] = user.role_id
# 获取登录后要跳转的页面
next_page = request.args.get('next')
if not next_page or not next_page.startswith('/'):
next_page = url_for('index')
# 重定向到首页或其他请求的页面
return redirect(next_page)
return render_template('login.html')
@user_bp.route('/register', methods=['GET', 'POST'])
def register():
# 如果用户已登录,重定向到首页
if current_user.is_authenticated:
return redirect(url_for('index'))
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
confirm_password = request.form.get('confirm_password')
verification_code = request.form.get('verification_code')
# 验证表单数据
if not username or not email or not password or not confirm_password or not verification_code:
return render_template('register.html', error='所有字段都是必填项')
if password != confirm_password:
return render_template('register.html', error='两次输入的密码不匹配')
# 检查用户名和邮箱是否已存在
if User.query.filter_by(username=username).first():
return render_template('register.html', error='用户名已存在')
if User.query.filter_by(email=email).first():
return render_template('register.html', error='邮箱已被注册')
# 验证验证码
stored_code = verification_codes.get(email)
if not stored_code or stored_code != verification_code:
return render_template('register.html', error='验证码无效或已过期')
# 创建新用户
try:
new_user = User(
username=username,
password=password, # 密码会在模型中自动哈希
email=email,
nickname=username # 默认昵称与用户名相同
)
db.session.add(new_user)
db.session.commit()
# 记录用户注册日志
Log.add_log(
action="用户注册",
user_id=new_user.id,
ip_address=request.remote_addr,
description=f"新用户 {username} 注册成功"
)
# 清除验证码
verification_codes.delete(email)
flash('注册成功,请登录', 'success')
return redirect(url_for('user.login'))
except Exception as e:
db.session.rollback()
logging.error(f"User registration failed: {str(e)}")
return render_template('register.html', error='注册失败,请稍后重试')
return render_template('register.html')
@user_bp.route('/logout')
@login_required
def logout():
username = current_user.username
user_id = current_user.id
# 先记录日志,再登出
Log.add_log(
action="用户登出",
user_id=user_id,
ip_address=request.remote_addr,
description=f"用户 {username} 登出系统"
)
logout_user()
return redirect(url_for('user.login'))
@user_bp.route('/send_verification_code', methods=['POST'])
def send_verification_code():
data = request.get_json()
email = data.get('email')
if not email:
return jsonify({'success': False, 'message': '请提供邮箱地址'})
# 检查邮箱格式
import re
if not re.match(r"[^@]+@[^@]+\.[^@]+", email):
return jsonify({'success': False, 'message': '邮箱格式不正确'})
# 生成验证码
code = generate_verification_code()
# 存储验证码(10分钟有效)
verification_codes.setex(email, 600, code) # 10分钟过期
# 发送验证码邮件
if send_verification_email(email, code):
# 记录发送验证码日志
Log.add_log(
action="发送验证码",
ip_address=request.remote_addr,
description=f"向邮箱 {email} 发送验证码"
)
return jsonify({'success': True, 'message': '验证码已发送'})
else:
return jsonify({'success': False, 'message': '邮件发送失败,请稍后重试'})
# 用户管理列表
@user_bp.route('/manage')
@login_required
@admin_required
def user_list():
page = request.args.get('page', 1, type=int)
search = request.args.get('search', '')
status = request.args.get('status', type=int)
role_id = request.args.get('role_id', type=int)
pagination = UserService.get_users(
page=page,
per_page=10,
search_query=search,
status=status,
role_id=role_id
)
# 记录管理员访问用户列表日志
Log.add_log(
action="访问用户管理",
user_id=current_user.id,
ip_address=request.remote_addr,
description=f"管理员 {current_user.username} 访问用户管理列表"
)
roles = UserService.get_all_roles()
return render_template(
'user/list.html',
pagination=pagination,
search=search,
status=status,
role_id=role_id,
roles=roles
)
# 用户详情/编辑页面
@user_bp.route('/edit/<int:user_id>', methods=['GET', 'POST'])
@login_required
@admin_required
def user_edit(user_id):
user = UserService.get_user_by_id(user_id)
if not user:
flash('用户不存在', 'error')
return redirect(url_for('user.user_list'))
roles = UserService.get_all_roles()
if request.method == 'POST':
data = {
'email': request.form.get('email'),
'phone': request.form.get('phone'),
'nickname': request.form.get('nickname'),
'role_id': int(request.form.get('role_id')),
'status': int(request.form.get('status')),
}
password = request.form.get('password')
if password:
data['password'] = password
success, message = UserService.update_user(user_id, data)
if success:
# 记录管理员编辑用户信息日志
Log.add_log(
action="编辑用户",
user_id=current_user.id,
target_type="用户",
target_id=user_id,
ip_address=request.remote_addr,
description=f"管理员 {current_user.username} 编辑用户 {user.username} 的信息"
)
flash(message, 'success')
return redirect(url_for('user.user_list'))
else:
flash(message, 'error')
# 记录访问用户编辑页面日志
if request.method == 'GET':
Log.add_log(
action="访问用户编辑",
user_id=current_user.id,
target_type="用户",
target_id=user_id,
ip_address=request.remote_addr,
description=f"管理员 {current_user.username} 访问用户 {user.username} 的编辑页面"
)
return render_template('user/edit.html', user=user, roles=roles)
# 用户状态管理API
@user_bp.route('/status/<int:user_id>', methods=['POST'])
@login_required
@admin_required
def user_status(user_id):
data = request.get_json()
status = data.get('status')
if status is None or status not in [0, 1]:
return jsonify({'success': False, 'message': '无效的状态值'})
# 不能修改自己的状态
if user_id == current_user.id:
return jsonify({'success': False, 'message': '不能修改自己的状态'})
# 查询用户获取用户名(用于日志)
target_user = User.query.get(user_id)
if not target_user:
return jsonify({'success': False, 'message': '用户不存在'})
success, message = UserService.change_user_status(user_id, status)
if success:
# 记录修改用户状态日志
status_text = "启用" if status == 1 else "禁用"
Log.add_log(
action=f"用户{status_text}",
user_id=current_user.id,
target_type="用户",
target_id=user_id,
ip_address=request.remote_addr,
description=f"管理员 {current_user.username} {status_text}用户 {target_user.username}"
)
return jsonify({'success': success, 'message': message})
# 用户删除API
@user_bp.route('/delete/<int:user_id>', methods=['POST'])
@login_required
@admin_required
def user_delete(user_id):
# 不能删除自己
if user_id == current_user.id:
return jsonify({'success': False, 'message': '不能删除自己的账号'})
# 查询用户获取用户名(用于日志)
target_user = User.query.get(user_id)
if not target_user:
return jsonify({'success': False, 'message': '用户不存在'})
target_username = target_user.username # 保存用户名以便记录在日志中
success, message = UserService.delete_user(user_id)
if success:
# 记录删除用户日志
Log.add_log(
action="删除用户",
user_id=current_user.id,
target_type="用户",
ip_address=request.remote_addr,
description=f"管理员 {current_user.username} 删除用户 {target_username}"
)
return jsonify({'success': success, 'message': message})
# 个人中心页面
@user_bp.route('/profile', methods=['GET', 'POST'])
@login_required
def user_profile():
user = current_user
if request.method == 'POST':
data = {
'email': request.form.get('email'),
'phone': request.form.get('phone'),
'nickname': request.form.get('nickname')
}
current_password = request.form.get('current_password')
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
# 如果用户想要修改密码
if current_password and new_password:
if not user.check_password(current_password):
flash('当前密码不正确', 'error')
return render_template('user/profile.html', user=user)
if new_password != confirm_password:
flash('两次输入的新密码不匹配', 'error')
return render_template('user/profile.html', user=user)
data['password'] = new_password
password_changed = True
else:
password_changed = False
success, message = UserService.update_user(user.id, data)
if success:
# 记录用户修改个人信息日志
log_description = f"用户 {user.username} 修改了个人信息"
if password_changed:
log_description += ",包括密码修改"
Log.add_log(
action="修改个人信息",
user_id=user.id,
ip_address=request.remote_addr,
description=log_description
)
flash(message, 'success')
else:
flash(message, 'error')
return render_template('user/profile.html', user=user)
# 角色管理页面
@user_bp.route('/roles', methods=['GET'])
@login_required
@admin_required
def role_list():
roles = UserService.get_all_roles()
# 记录访问角色管理页面日志
Log.add_log(
action="访问角色管理",
user_id=current_user.id,
ip_address=request.remote_addr,
description=f"管理员 {current_user.username} 访问角色管理页面"
)
return render_template('user/roles.html', roles=roles)
# 获取所有系统权限的API
@user_bp.route('/permissions', methods=['GET'])
@login_required
@admin_required
def get_permissions():
"""获取所有可用的系统权限"""
from app.models.permission import Permission
try:
permissions = Permission.query.order_by(Permission.code).all()
# 转换为JSON格式
permissions_data = [{
'id': p.id,
'code': p.code,
'name': p.name,
'description': p.description
} for p in permissions]
return jsonify({
'success': True,
'permissions': permissions_data
})
except Exception as e:
logging.error(f"获取权限列表失败: {str(e)}")
return jsonify({
'success': False,
'message': f"获取权限列表失败: {str(e)}"
}), 500
# 获取特定角色的权限
@user_bp.route('/role/<int:role_id>/permissions', methods=['GET'])
@login_required
@admin_required
def get_role_permissions(role_id):
"""获取指定角色的权限ID列表"""
from app.models.user import Role
try:
role = Role.query.get(role_id)
if not role:
return jsonify({
'success': False,
'message': '角色不存在'
}), 404
# 获取角色的所有权限ID
permissions = [p.id for p in role.permissions]
return jsonify({
'success': True,
'permissions': permissions
})
except Exception as e:
logging.error(f"获取角色权限失败: {str(e)}")
return jsonify({
'success': False,
'message': f"获取角色权限失败: {str(e)}"
}), 500
# 修改角色保存路由,支持权限管理
@user_bp.route('/role/save', methods=['POST'])
@login_required
@admin_required
def role_save():
"""创建或更新角色,包括权限分配"""
data = request.get_json()
role_id = data.get('id')
role_name = data.get('role_name')
description = data.get('description')
permission_ids = data.get('permissions', []) # 获取权限ID列表
if not role_name:
return jsonify({'success': False, 'message': '角色名不能为空'})
# 处理系统内置角色的权限保护
if role_id and int(role_id) in [1, 2]:
permission_ids = None # 不修改内置角色的权限
if role_id: # 更新角色
success, message = UserService.update_role(role_id, role_name, description, permission_ids)
if success:
# 记录编辑角色日志
Log.add_log(
action="编辑角色",
user_id=current_user.id,
target_type="角色",
target_id=role_id,
ip_address=request.remote_addr,
description=f"管理员 {current_user.username} 编辑角色 {role_name},包含权限设置"
)
else: # 创建角色
success, message, new_role_id = UserService.create_role(role_name, description, permission_ids)
if success:
role_id = new_role_id
# 记录创建角色日志
Log.add_log(
action="创建角色",
user_id=current_user.id,
target_type="角色",
target_id=role_id,
ip_address=request.remote_addr,
description=f"管理员 {current_user.username} 创建新角色 {role_name},设置了 {len(permission_ids)} 个权限"
)
return jsonify({'success': success, 'message': message})
# 角色删除API
@user_bp.route('/role/delete/<int:role_id>', methods=['POST'])
@login_required
@admin_required
def role_delete(role_id):
"""删除角色"""
# 保护系统内置角色
if role_id in [1, 2]:
return jsonify({
'success': False,
'message': '不能删除系统内置角色'
})
from app.models.user import Role
try:
# 获取角色信息用于日志记录
role = Role.query.get(role_id)
if not role:
return jsonify({
'success': False,
'message': '角色不存在'
}), 404
role_name = role.role_name
# 检查是否有用户在使用该角色
user_count = User.query.filter_by(role_id=role_id).count()
if user_count > 0:
return jsonify({
'success': False,
'message': f'无法删除:该角色下存在 {user_count} 个用户'
})
# 删除角色
db.session.delete(role)
db.session.commit()
# 记录删除角色日志
Log.add_log(
action="删除角色",
user_id=current_user.id,
target_type="角色",
ip_address=request.remote_addr,
description=f"管理员 {current_user.username} 删除了角色 {role_name}"
)
return jsonify({
'success': True,
'message': '角色删除成功'
})
except Exception as e:
db.session.rollback()
logging.error(f"删除角色失败: {str(e)}")
return jsonify({
'success': False,
'message': f"删除角色失败: {str(e)}"
}), 500
@user_bp.route('/role/<int:role_id>/count', methods=['GET'])
@login_required
@admin_required
def get_role_user_count(role_id):
"""获取指定角色的用户数量"""
try:
count = User.query.filter_by(role_id=role_id).count()
return jsonify({
'success': True,
'count': count
})
except Exception as e:
return jsonify({
'success': False,
'message': f"查询失败: {str(e)}",
'count': 0
}), 500
@user_bp.route('/add', methods=['GET', 'POST'])
@login_required
@admin_required
def add_user():
roles = UserService.get_all_roles()
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
confirm_password = request.form.get('confirm_password')
verification_code = request.form.get('verification_code')
nickname = request.form.get('nickname')
phone = request.form.get('phone')
if phone == '':
phone = None
nickname = request.form.get('nickname')
role_id = request.form.get('role_id', 2, type=int) # 默认为普通用户
status = request.form.get('status', 1, type=int) # 默认为启用状态
# 验证表单数据
if not username or not email or not password or not confirm_password or not verification_code:
return render_template('user/add.html', error='所有必填字段不能为空', roles=roles)
if password != confirm_password:
return render_template('user/add.html', error='两次输入的密码不匹配', roles=roles)
# 检查用户名和邮箱是否已存在
if User.query.filter_by(username=username).first():
return render_template('user/add.html', error='用户名已存在', roles=roles)
if User.query.filter_by(email=email).first():
return render_template('user/add.html', error='邮箱已被注册', roles=roles)
# 验证验证码
stored_code = verification_codes.get(email)
if not stored_code or stored_code != verification_code:
return render_template('user/add.html', error='验证码无效或已过期', roles=roles)
# 创建新用户
try:
new_user = User(
username=username,
password=password, # 密码会在模型中自动哈希
email=email,
nickname=nickname or username, # 如果未提供昵称,使用用户名
phone=phone,
role_id=role_id,
status=status
)
db.session.add(new_user)
db.session.commit()
# 记录管理员添加用户日志
Log.add_log(
action="添加用户",
user_id=current_user.id,
target_type="用户",
target_id=new_user.id,
ip_address=request.remote_addr,
description=f"管理员 {current_user.username} 添加新用户 {username}"
)
# 清除验证码
verification_codes.delete(email)
flash('用户添加成功', 'success')
return redirect(url_for('user.user_list'))
except Exception as e:
db.session.rollback()
logging.error(f"用户添加失败: {str(e)}")
return render_template('user/add.html', error=f'添加用户失败: {str(e)}', roles=roles)
# GET请求显示添加用户表单
return render_template('user/add.html', roles=roles)