2025-07-04 19:07:35 +08:00

250 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
管理员视图
"""
from flask import Blueprint, render_template, request, redirect, url_for, flash, session, jsonify, g
from werkzeug.security import generate_password_hash
from app.models.admin import AdminUser
from app.models.user import User
from app.models.operation_log import OperationLog
from app.utils.decorators import admin_required, log_operation
from config.database import db
from datetime import datetime, timedelta
from sqlalchemy import func
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
@admin_bp.route('/login', methods=['GET', 'POST'])
def login():
"""管理员登录"""
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '').strip()
if not username or not password:
flash('请输入用户名和密码', 'error')
return render_template('admin/login.html')
# 查找管理员
admin = AdminUser.query.filter_by(username=username).first()
if not admin or not admin.check_password(password):
flash('用户名或密码错误', 'error')
return render_template('admin/login.html')
if admin.status != 1:
flash('账号已被禁用,请联系系统管理员', 'error')
return render_template('admin/login.html')
# 登录成功
session['admin_id'] = admin.id
session['admin_username'] = admin.username
# 更新最后登录时间
admin.update_last_login()
# 记录登录日志
try:
OperationLog.create_log(
user_id=admin.id,
user_type=2,
action='管理员登录',
ip_address=request.remote_addr,
user_agent=request.headers.get('User-Agent')
)
except Exception as e:
print(f"记录登录日志失败: {str(e)}")
flash('登录成功', 'success')
return redirect(url_for('admin.dashboard'))
return render_template('admin/login.html')
@admin_bp.route('/logout')
@admin_required
@log_operation('管理员登出')
def logout():
"""管理员登出"""
session.pop('admin_id', None)
session.pop('admin_username', None)
flash('已安全退出', 'info')
return redirect(url_for('admin.login'))
@admin_bp.route('/dashboard')
@admin_required
def dashboard():
"""管理员仪表板"""
try:
# 获取统计数据
stats = {
'total_users': User.query.count(),
'active_users': User.query.filter_by(status=1).count(),
'total_admins': AdminUser.query.count(),
'recent_logs_count': OperationLog.query.filter(
OperationLog.created_at >= datetime.now() - timedelta(days=7)
).count()
}
# 获取最近的操作日志
recent_logs = OperationLog.query.order_by(
OperationLog.created_at.desc()
).limit(10).all()
# 用户注册趋势最近7天
user_trend = []
for i in range(6, -1, -1):
date = datetime.now() - timedelta(days=i)
date_start = date.replace(hour=0, minute=0, second=0, microsecond=0)
date_end = date_start + timedelta(days=1)
count = User.query.filter(
User.created_at >= date_start,
User.created_at < date_end
).count()
user_trend.append({
'date': date.strftime('%m-%d'),
'count': count
})
return render_template('admin/dashboard.html',
stats=stats,
recent_logs=recent_logs,
user_trend=user_trend)
except Exception as e:
flash(f'加载仪表板数据失败: {str(e)}', 'error')
return render_template('admin/dashboard.html',
stats={},
recent_logs=[],
user_trend=[])
@admin_bp.route('/profile')
@admin_required
def profile():
"""管理员个人资料"""
return render_template('admin/profile.html', admin=g.current_admin)
@admin_bp.route('/profile/edit', methods=['POST'])
@admin_required
@log_operation('修改管理员资料')
def edit_profile():
"""编辑管理员个人资料"""
try:
real_name = request.form.get('real_name', '').strip()
email = request.form.get('email', '').strip()
phone = request.form.get('phone', '').strip()
# 更新信息
if real_name:
g.current_admin.real_name = real_name
if email:
g.current_admin.email = email
if phone:
g.current_admin.phone = phone
db.session.commit()
flash('个人资料更新成功', 'success')
except Exception as e:
db.session.rollback()
flash(f'更新失败: {str(e)}', 'error')
return redirect(url_for('admin.profile'))
@admin_bp.route('/change-password', methods=['POST'])
@admin_required
@log_operation('修改管理员密码')
def change_password():
"""修改管理员密码"""
try:
current_password = request.form.get('current_password', '').strip()
new_password = request.form.get('new_password', '').strip()
confirm_password = request.form.get('confirm_password', '').strip()
# 验证当前密码
if not g.current_admin.check_password(current_password):
flash('当前密码错误', 'error')
return redirect(url_for('admin.profile'))
# 验证新密码
if len(new_password) < 6:
flash('新密码长度至少6位', 'error')
return redirect(url_for('admin.profile'))
if new_password != confirm_password:
flash('新密码和确认密码不一致', 'error')
return redirect(url_for('admin.profile'))
# 更新密码
g.current_admin.set_password(new_password)
db.session.commit()
flash('密码修改成功', 'success')
except Exception as e:
db.session.rollback()
flash(f'密码修改失败: {str(e)}', 'error')
return redirect(url_for('admin.profile'))
@admin_bp.route('/users')
@admin_required
def users():
"""用户管理"""
page = request.args.get('page', 1, type=int)
per_page = 20
query = User.query.order_by(User.created_at.desc())
# 搜索功能
search = request.args.get('search', '').strip()
if search:
query = query.filter(
db.or_(
User.username.like(f'%{search}%'),
User.email.like(f'%{search}%'),
User.phone.like(f'%{search}%'),
User.nickname.like(f'%{search}%')
)
)
# 状态筛选
status = request.args.get('status', '', type=str)
if status:
query = query.filter(User.status == int(status))
users = query.paginate(page=page, per_page=per_page, error_out=False)
return render_template('admin/users.html', users=users, search=search, status=status)
@admin_bp.route('/logs')
@admin_required
def logs():
"""操作日志"""
page = request.args.get('page', 1, type=int)
per_page = 50
query = OperationLog.query.order_by(OperationLog.created_at.desc())
# 用户类型筛选
user_type = request.args.get('user_type', '', type=str)
if user_type:
query = query.filter(OperationLog.user_type == int(user_type))
# 操作类型筛选
action = request.args.get('action', '').strip()
if action:
query = query.filter(OperationLog.action.like(f'%{action}%'))
logs = query.paginate(page=page, per_page=per_page, error_out=False)
return render_template('admin/logs.html', logs=logs, user_type=user_type, action=action)