# Listing metadata (from the code viewer): 201 lines, 5.9 KiB, Python
from flask import Blueprint, render_template, request, jsonify
 | 
						|
from flask_login import current_user, login_required
 | 
						|
from app.models.log import Log
 | 
						|
from app.models.user import User, db  # 导入db
 | 
						|
from app.controllers.user import admin_required  # 导入admin_required装饰器
 | 
						|
from datetime import datetime, timedelta
 | 
						|
 | 
						|
# Create the blueprint; every route below is registered under the /log prefix.
 | 
						|
log_bp = Blueprint('log', __name__, url_prefix='/log')
 | 
						|
 | 
						|
 | 
						|
@log_bp.route('/list')
@login_required
@admin_required
def log_list():
    """Render the filterable, paginated log list page.

    The view is guarded by ``login_required`` and ``admin_required``.

    Query parameters (read from ``request.args``):
        page: page number, parsed as int (default 1).
        user_id: optional int; restricts the listing to one user.
        action: optional; restricts the listing to one action value.
        target_type: optional; restricts the listing to one target type.
        date_range: ``'1'``, ``'7'`` (default) or ``'30'`` select a rolling
            window of that many days ending now; ``'custom'`` switches to
            ``start_date`` / ``end_date``. Any other value leaves the lower
            bound unset.
        start_date, end_date: ``YYYY-MM-DD`` strings, only read when
            ``date_range`` is ``'custom'``. The end date is extended to
            23:59:59 so the whole day is included.

    Returns:
        The rendered ``log/list.html`` template, given the pagination object
        from ``Log.get_logs``, all users, per-action and per-target-type
        counts, and the active filter values.

    Raises:
        A 400 response (via ``abort``) when a custom date is not in
        ``YYYY-MM-DD`` form.
    """
    # Function-scope import so this view does not depend on the file header.
    from flask import abort

    # Filter parameters.
    page = request.args.get('page', 1, type=int)
    user_id = request.args.get('user_id', type=int)
    action = request.args.get('action')
    target_type = request.args.get('target_type')

    # Date window: the upper bound defaults to "now", the lower bound to none.
    date_range = request.args.get('date_range', '7')
    end_date = datetime.now()
    start_date = None
    # Remember whether the user supplied an end date. Comparing end_date with
    # a fresh datetime.now() cannot answer that: the two timestamps always
    # differ by at least a few microseconds.
    custom_end_given = False

    preset_days = {'1': 1, '7': 7, '30': 30}
    if date_range in preset_days:
        start_date = end_date - timedelta(days=preset_days[date_range])
    elif date_range == 'custom':
        start_date_str = request.args.get('start_date')
        end_date_str = request.args.get('end_date')
        try:
            if start_date_str:
                start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
            if end_date_str:
                end_date = datetime.strptime(
                    end_date_str + ' 23:59:59', '%Y-%m-%d %H:%M:%S'
                )
                custom_end_given = True
        except ValueError:
            # Malformed user input is a client error, not a server crash.
            abort(400, description='日期格式无效，应为 YYYY-MM-DD')

    # Page of matching log rows.
    pagination = Log.get_logs(
        page=page,
        per_page=20,
        user_id=user_id,
        action=action,
        target_type=target_type,
        start_date=start_date,
        end_date=end_date,
    )

    # Data for the filter drop-downs.
    users = User.query.all()

    # Number of log rows per action value.
    action_types = (
        db.session.query(Log.action, db.func.count(Log.id))
        .group_by(Log.action)
        .all()
    )

    # Number of log rows per target type, skipping rows without one.
    # isnot(None) renders as SQL "IS NOT NULL".
    target_types = (
        db.session.query(Log.target_type, db.func.count(Log.id))
        .filter(Log.target_type.isnot(None))
        .group_by(Log.target_type)
        .all()
    )

    return render_template(
        'log/list.html',
        pagination=pagination,
        users=users,
        action_types=action_types,
        target_types=target_types,
        filters={
            'user_id': user_id,
            'action': action,
            'target_type': target_type,
            'date_range': date_range,
            'start_date': start_date.strftime('%Y-%m-%d') if start_date else '',
            # Only echo an end date the user actually chose.
            'end_date': end_date.strftime('%Y-%m-%d') if custom_end_given else '',
        },
    )
 | 
						|
 | 
						|
 | 
						|
@log_bp.route('/detail/<int:log_id>')
@login_required
@admin_required
def log_detail(log_id):
    """Render the detail page for a single log entry.

    Args:
        log_id: integer id taken from the URL path.

    Returns:
        The rendered ``log/detail.html`` template, with the entry fetched
        through ``Log.query.get_or_404`` exposed to the template as ``log``.
    """
    entry = Log.query.get_or_404(log_id)
    template_context = {'log': entry}
    return render_template('log/detail.html', **template_context)
 | 
						|
 | 
						|
 | 
						|
@log_bp.route('/api/export', methods=['POST'])
@login_required
@admin_required
def export_logs():
    """Export the logs matching the posted filters as Base64-encoded CSV.

    The view is guarded by ``login_required`` and ``admin_required``.

    JSON body (all keys optional):
        user_id, action, target_type: equality filters on the matching
            ``Log`` columns; falsy values are ignored.
        start_date, end_date: ``YYYY-MM-DD`` strings. The end date is
            extended to 23:59:59; when absent the upper bound is "now".

    Returns:
        A JSON response with ``success``, ``message``, ``count``,
        ``filename``, ``filedata`` (the CSV text, UTF-8 encoded and then
        Base64 encoded) and ``filetype``. A ``(response, 400)`` pair is
        returned when the body is JSON but not an object, or when a date is
        malformed.
    """
    import base64
    import csv
    from io import StringIO

    # silent=True yields None instead of raising on a missing or invalid
    # JSON body; a missing body simply means "no filters".
    data = request.get_json(silent=True)
    if data is None:
        data = {}
    elif not isinstance(data, dict):
        return jsonify({
            'success': False,
            'message': '请求体必须是JSON对象'
        }), 400

    user_id = data.get('user_id')
    action = data.get('action')
    target_type = data.get('target_type')
    start_date_str = data.get('start_date')
    end_date_str = data.get('end_date')

    # Date window: no lower bound by default, upper bound defaults to now.
    start_date = None
    end_date = datetime.now()
    try:
        if start_date_str:
            start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
        if end_date_str:
            end_date = datetime.strptime(
                end_date_str + ' 23:59:59', '%Y-%m-%d %H:%M:%S'
            )
    except (ValueError, TypeError):
        # ValueError: wrong format; TypeError: a non-string JSON value.
        return jsonify({
            'success': False,
            'message': '日期格式无效，应为 YYYY-MM-DD'
        }), 400

    # Build the query, newest entries first.
    query = Log.query.order_by(Log.created_at.desc())
    if user_id:
        query = query.filter(Log.user_id == user_id)
    if action:
        query = query.filter(Log.action == action)
    if target_type:
        query = query.filter(Log.target_type == target_type)
    if start_date:
        query = query.filter(Log.created_at >= start_date)
    # end_date is always set (defaults to now), so the bound always applies.
    query = query.filter(Log.created_at <= end_date)

    logs = query.all()

    # Write the CSV into an in-memory text buffer.
    si = StringIO()
    csv_writer = csv.writer(si)

    # Header row.
    csv_writer.writerow(['ID', '用户', '操作类型', '目标类型', '目标ID', 'IP地址', '描述', '创建时间'])

    # One row per log entry; entries without a user get a placeholder name.
    for log in logs:
        username = log.user.username if log.user else "未登录"
        csv_writer.writerow([
            log.id,
            username,
            log.action,
            log.target_type or '',
            log.target_id or '',
            log.ip_address or '',
            log.description or '',
            log.created_at.strftime('%Y-%m-%d %H:%M:%S'),
        ])

    # Suggested download name, stamped with the export time.
    filename = f"system_logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

    # The CSV travels inside the JSON payload, so it is Base64 encoded.
    encoded_data = base64.b64encode(si.getvalue().encode('utf-8')).decode('utf-8')

    return jsonify({
        'success': True,
        'message': f'已生成 {len(logs)} 条日志记录',
        'count': len(logs),
        'filename': filename,
        'filedata': encoded_data,
        'filetype': 'text/csv'
    })
 | 
						|
 | 
						|
 | 
						|
@log_bp.route('/api/clear', methods=['POST'])
@login_required
@admin_required
def clear_logs():
    """Delete log rows, either older than a number of days or all of them.

    The view is guarded by ``login_required`` and ``admin_required``.

    JSON body:
        days: non-negative number (default 0). When greater than 0, only
            rows whose ``created_at`` is earlier than ``now - days`` are
            deleted; when 0, every row is deleted.

    Returns:
        On success, a JSON response with ``success``, ``message`` and
        ``count`` (the value returned by the bulk ``delete()``).
        ``(response, 400)`` when the body is not a JSON object or ``days``
        is not a usable non-negative number. ``(response, 500)`` when the
        delete or commit raises; the session is rolled back first.
    """
    # Function-scope import so this view does not depend on the file header.
    from flask import current_app

    # This endpoint is destructive, so a missing or malformed body is
    # rejected instead of being treated as "days = 0" (delete everything).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({
            'success': False,
            'message': '请求体必须是JSON对象'
        }), 400

    days = data.get('days', 0)
    # Reject bool (an int subclass), non-numbers, negatives and NaN.
    # "not days >= 0" is used because NaN fails every comparison; a plain
    # "days < 0" test would let NaN through to the delete-all branch.
    if (
        isinstance(days, bool)
        or not isinstance(days, (int, float))
        or not days >= 0
    ):
        return jsonify({
            'success': False,
            'message': '参数 days 必须是非负数'
        }), 400

    cutoff_date = None
    if days > 0:
        try:
            cutoff_date = datetime.now() - timedelta(days=days)
        except OverflowError:
            # days is too large to express as a date offset.
            return jsonify({
                'success': False,
                'message': '参数 days 超出允许范围'
            }), 400

    try:
        if cutoff_date is not None:
            # Remove only the entries older than the cutoff.
            deleted = Log.query.filter(Log.created_at < cutoff_date).delete()
        else:
            # days == 0: remove every entry.
            deleted = Log.query.delete()
        db.session.commit()
    except Exception as e:
        # Request boundary: undo the partial work, record the traceback,
        # and report the failure to the caller.
        db.session.rollback()
        current_app.logger.exception('Failed to clear logs')
        return jsonify({
            'success': False,
            'message': f'清除日志失败: {str(e)}'
        }), 500

    return jsonify({
        'success': True,
        'message': f'成功清除 {deleted} 条日志记录',
        'count': deleted
    })
 |