""" 管理员视图 """ from flask import Blueprint, render_template, request, redirect, url_for, flash, session, jsonify, g from werkzeug.security import generate_password_hash from app.models.admin import AdminUser from app.models.user import User from app.models.order import Order, OrderItem, ShippingInfo from app.models.payment import Payment from app.models.operation_log import OperationLog from app.utils.decorators import admin_required, log_operation from config.database import db from datetime import datetime, timedelta from sqlalchemy import func, or_ import json admin_bp = Blueprint('admin', __name__, url_prefix='/admin') @admin_bp.route('/login', methods=['GET', 'POST']) def login(): """管理员登录""" if request.method == 'POST': username = request.form.get('username', '').strip() password = request.form.get('password', '').strip() if not username or not password: flash('请输入用户名和密码', 'error') return render_template('admin/login.html') # 查找管理员 admin = AdminUser.query.filter_by(username=username).first() if not admin or not admin.check_password(password): flash('用户名或密码错误', 'error') return render_template('admin/login.html') if admin.status != 1: flash('账号已被禁用,请联系系统管理员', 'error') return render_template('admin/login.html') # 登录成功 session['admin_id'] = admin.id session['admin_username'] = admin.username # 更新最后登录时间 admin.update_last_login() # 记录登录日志 try: OperationLog.create_log( user_id=admin.id, user_type=2, action='管理员登录', ip_address=request.remote_addr, user_agent=request.headers.get('User-Agent') ) except Exception as e: print(f"记录登录日志失败: {str(e)}") flash('登录成功', 'success') return redirect(url_for('admin.dashboard')) return render_template('admin/login.html') @admin_bp.route('/logout') @admin_required @log_operation('管理员登出') def logout(): """管理员登出""" session.pop('admin_id', None) session.pop('admin_username', None) flash('已安全退出', 'info') return redirect(url_for('admin.login')) @admin_bp.route('/dashboard') @admin_required def dashboard(): """管理员仪表板""" try: # 获取统计数据 stats = { 'total_users': User.query.count(), 'active_users': User.query.filter_by(status=1).count(), 'total_admins': AdminUser.query.count(), 'recent_logs_count': OperationLog.query.filter( OperationLog.created_at >= datetime.now() - timedelta(days=7) ).count() } # 获取最近的操作日志 recent_logs = OperationLog.query.order_by( OperationLog.created_at.desc() ).limit(10).all() # 最近7天用户注册趋势 user_trend = [] for i in range(6, -1, -1): date = datetime.now() - timedelta(days=i) date_start = date.replace(hour=0, minute=0, second=0, microsecond=0) date_end = date_start + timedelta(days=1) count = User.query.filter( User.created_at >= date_start, User.created_at < date_end ).count() user_trend.append({ 'date': date.strftime('%m-%d'), 'count': count }) return render_template('admin/dashboard.html', stats=stats, recent_logs=recent_logs, user_trend=user_trend) except Exception as e: flash(f'加载仪表板数据失败: {str(e)}', 'error') return render_template('admin/dashboard.html', stats={}, recent_logs=[], user_trend=[]) @admin_bp.route('/profile') @admin_required def profile(): """管理员个人资料""" return render_template('admin/profile.html', admin=g.current_admin) @admin_bp.route('/profile/edit', methods=['POST']) @admin_required @log_operation('修改管理员资料') def edit_profile(): """编辑管理员个人资料""" try: real_name = request.form.get('real_name', '').strip() email = request.form.get('email', '').strip() phone = request.form.get('phone', '').strip() # 更新信息 if real_name: g.current_admin.real_name = real_name if email: g.current_admin.email = email if phone: g.current_admin.phone = phone db.session.commit() flash('个人资料更新成功', 'success') except Exception as e: db.session.rollback() flash(f'更新失败: {str(e)}', 'error') return redirect(url_for('admin.profile')) @admin_bp.route('/change-password', methods=['POST']) @admin_required @log_operation('修改管理员密码') def change_password(): """修改管理员密码""" try: current_password = request.form.get('current_password', '').strip() new_password = request.form.get('new_password', '').strip() confirm_password = request.form.get('confirm_password', '').strip() # 验证当前密码 if not g.current_admin.check_password(current_password): flash('当前密码错误', 'error') return redirect(url_for('admin.profile')) # 验证新密码 if len(new_password) < 6: flash('新密码长度至少6位', 'error') return redirect(url_for('admin.profile')) if new_password != confirm_password: flash('新密码和确认密码不一致', 'error') return redirect(url_for('admin.profile')) # 更新密码 g.current_admin.set_password(new_password) db.session.commit() flash('密码修改成功', 'success') except Exception as e: db.session.rollback() flash(f'密码修改失败: {str(e)}', 'error') return redirect(url_for('admin.profile')) @admin_bp.route('/users') @admin_required def users(): """用户管理""" page = request.args.get('page', 1, type=int) per_page = 20 query = User.query.order_by(User.created_at.desc()) # 搜索功能 search = request.args.get('search', '').strip() if search: query = query.filter( or_( User.username.like(f'%{search}%'), User.email.like(f'%{search}%'), User.phone.like(f'%{search}%'), User.nickname.like(f'%{search}%') ) ) # 状态筛选 status = request.args.get('status', '', type=str) if status: query = query.filter(User.status == int(status)) users = query.paginate(page=page, per_page=per_page, error_out=False) # 计算本周新增用户数 week_start = datetime.now() - timedelta(days=7) week_new_users = User.query.filter(User.created_at >= week_start).count() return render_template('admin/users.html', users=users, search=search, status=status, week_new_users=week_new_users) @admin_bp.route('/users//detail') @admin_required def user_detail(user_id): """获取用户详情""" try: user = User.query.get_or_404(user_id) return jsonify({ 'success': True, 'user': user.to_dict() }) except Exception as e: return jsonify({ 'success': False, 'message': str(e) }) @admin_bp.route('/users//toggle-status', methods=['POST']) @admin_required @log_operation('切换用户状态') def toggle_user_status(user_id): """切换用户状态""" try: user = User.query.get_or_404(user_id) data = request.get_json() new_status = data.get('status') if new_status not in [0, 1]: return jsonify({ 'success': False, 'message': '无效的状态值' }) user.status = new_status db.session.commit() action_text = '启用' if new_status == 1 else '禁用' return jsonify({ 'success': True, 'message': f'用户已{action_text}' }) except Exception as e: db.session.rollback() return jsonify({ 'success': False, 'message': str(e) }) @admin_bp.route('/orders') @admin_required def orders(): """订单管理""" page = request.args.get('page', 1, type=int) per_page = 20 query = Order.query.order_by(Order.created_at.desc()) # 搜索功能 search = request.args.get('search', '').strip() if search: query = query.filter( or_( Order.order_sn.like(f'%{search}%'), Order.user.has(User.username.like(f'%{search}%')), Order.user.has(User.phone.like(f'%{search}%')) ) ) # 状态筛选 status = request.args.get('status', '', type=str) if status: query = query.filter(Order.status == int(status)) # 日期筛选 start_date = request.args.get('start_date', '').strip() end_date = request.args.get('end_date', '').strip() if start_date: try: start_date_obj = datetime.strptime(start_date, '%Y-%m-%d') query = query.filter(Order.created_at >= start_date_obj) except ValueError: pass if end_date: try: end_date_obj = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=1) query = query.filter(Order.created_at < end_date_obj) except ValueError: pass orders = query.paginate(page=page, per_page=per_page, error_out=False) # 获取订单统计 order_stats = {} for status_code, status_name in Order.STATUS_CHOICES.items(): count = Order.query.filter_by(status=status_code).count() order_stats[status_code] = {'name': status_name, 'count': count} return render_template('admin/orders.html', orders=orders, search=search, status=status, start_date=start_date, end_date=end_date, order_stats=order_stats, ORDER_STATUS=Order.STATUS_CHOICES) @admin_bp.route('/orders/') @admin_required def order_detail(order_id): """订单详情""" order = Order.query.get_or_404(order_id) # 获取支付记录 payment = Payment.query.filter_by(order_id=order_id).first() # 获取物流信息 shipping_info = ShippingInfo.query.filter_by(order_id=order_id).first() return render_template('admin/order_detail.html', order=order, payment=payment, shipping_info=shipping_info) @admin_bp.route('/orders//ship', methods=['POST']) @admin_required @log_operation('订单发货') def ship_order(order_id): """订单发货""" try: order = Order.query.get_or_404(order_id) if order.status != Order.STATUS_PENDING_SHIPMENT: return jsonify({'success': False, 'message': '订单状态不允许发货'}) # 获取发货信息 shipping_company = request.form.get('shipping_company', '').strip() tracking_number = request.form.get('tracking_number', '').strip() if not shipping_company or not tracking_number: return jsonify({'success': False, 'message': '请填写完整的物流信息'}) # 更新订单状态 order.status = Order.STATUS_SHIPPED order.shipped_at = datetime.utcnow() # 创建或更新物流信息 shipping_info = ShippingInfo.query.filter_by(order_id=order_id).first() if not shipping_info: shipping_info = ShippingInfo(order_id=order_id) db.session.add(shipping_info) shipping_info.shipping_company = shipping_company shipping_info.tracking_number = tracking_number shipping_info.shipping_status = 1 # 已发货 db.session.commit() return jsonify({'success': True, 'message': '发货成功'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'发货失败: {str(e)}'}) @admin_bp.route('/orders//refund', methods=['POST']) @admin_required @log_operation('订单退款') def refund_order(order_id): """订单退款""" try: order = Order.query.get_or_404(order_id) if order.status not in [Order.STATUS_PENDING_SHIPMENT, Order.STATUS_SHIPPED, Order.STATUS_REFUNDING]: return jsonify({'success': False, 'message': '订单状态不允许退款'}) # 获取退款信息 refund_reason = request.form.get('refund_reason', '').strip() if not refund_reason: return jsonify({'success': False, 'message': '请填写退款原因'}) # 更新订单状态 order.status = Order.STATUS_REFUNDING # 更新支付记录状态 payment = Payment.query.filter_by(order_id=order_id).first() if payment: payment.status = Payment.STATUS_REFUNDED # 恢复库存 from app.models.product import ProductInventory for item in order.order_items: if item.sku_code: sku_info = ProductInventory.query.filter_by(sku_code=item.sku_code).first() if sku_info: sku_info.stock += item.quantity # 减少销量 if item.product: item.product.sales_count = max(0, item.product.sales_count - item.quantity) db.session.commit() return jsonify({'success': True, 'message': '退款处理成功'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'退款失败: {str(e)}'}) @admin_bp.route('/orders//cancel', methods=['POST']) @admin_required @log_operation('取消订单') def cancel_order(order_id): """取消订单""" try: order = Order.query.get_or_404(order_id) if not order.can_cancel(): return jsonify({'success': False, 'message': '订单状态不允许取消'}) # 获取取消原因 cancel_reason = request.form.get('cancel_reason', '').strip() # 更新订单状态 order.status = Order.STATUS_CANCELLED # 恢复库存 from app.models.product import ProductInventory for item in order.order_items: if item.sku_code: sku_info = ProductInventory.query.filter_by(sku_code=item.sku_code).first() if sku_info: sku_info.stock += item.quantity # 减少销量 if item.product: item.product.sales_count = max(0, item.product.sales_count - item.quantity) db.session.commit() return jsonify({'success': True, 'message': '订单已取消'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'取消失败: {str(e)}'}) @admin_bp.route('/logs') @admin_required def logs(): """操作日志""" page = request.args.get('page', 1, type=int) per_page = 50 query = OperationLog.query.order_by(OperationLog.created_at.desc()) # 用户类型筛选 user_type = request.args.get('user_type', '', type=str) if user_type: query = query.filter(OperationLog.user_type == int(user_type)) # 操作类型筛选 action = request.args.get('action', '').strip() if action: query = query.filter(OperationLog.action.like(f'%{action}%')) logs = query.paginate(page=page, per_page=per_page, error_out=False) # 计算今日操作数 today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) today_logs_count = OperationLog.query.filter( OperationLog.created_at >= today_start ).count() return render_template('admin/logs.html', logs=logs, user_type=user_type, action=action, today_logs_count=today_logs_count) @admin_bp.route('/logs//detail') @admin_required def log_detail(log_id): """获取日志详情""" try: log = OperationLog.query.get_or_404(log_id) return jsonify({ 'success': True, 'log': log.to_dict() }) except Exception as e: return jsonify({ 'success': False, 'message': str(e) }) @admin_bp.route('/logs/clear', methods=['POST']) @admin_required @log_operation('清理操作日志') def clear_logs(): """清理操作日志""" try: data = request.get_json() days_to_keep = data.get('days_to_keep', 30) # 计算删除日期 delete_before = datetime.now() - timedelta(days=days_to_keep) # 删除旧日志 deleted_count = OperationLog.query.filter( OperationLog.created_at < delete_before ).delete() db.session.commit() return jsonify({ 'success': True, 'message': f'已清理 {deleted_count} 条历史日志' }) except Exception as e: db.session.rollback() return jsonify({ 'success': False, 'message': str(e) })