287 lines
		
	
	
		
			9.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			287 lines
		
	
	
		
			9.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from flask import Blueprint, render_template, request, jsonify
 | 
						||
from flask_login import current_user, login_required
 | 
						||
from app.models.log import Log
 | 
						||
from app.models.user import User, db  # 导入db
 | 
						||
from app.utils.auth import permission_required  # 更改为导入permission_required装饰器
 | 
						||
from datetime import datetime, timedelta
 | 
						||
 | 
						||
# Create the blueprint
 | 
						||
log_bp = Blueprint('log', __name__, url_prefix='/log')  # routes registered on it are served under /log
 | 
						||
 | 
						||
 | 
						||
def _parse_log_filter_date(value):
    """Parse a ``YYYY-MM-DD`` string; return None when it is missing or malformed."""
    if not value:
        return None
    try:
        return datetime.strptime(value, '%Y-%m-%d')
    except ValueError:
        return None


@log_bp.route('/list')
@login_required
@permission_required('view_logs')  # replaces @admin_required
def log_list():
    """Render the paginated log list page with optional filters.

    Query-string parameters read from the request:
        page: page number (int, default 1).
        user_id: restrict to one user's entries (int, optional).
        action: restrict to one action type (optional).
        target_type: restrict to one target type (optional).
        date_range: '1', '7' or '30' for a rolling window in days, or
            'custom' to use ``start_date`` / ``end_date``; defaults to '7'.
            Any other value leaves the start of the range open.
        start_date, end_date: ``YYYY-MM-DD`` bounds, only read when
            ``date_range`` is 'custom'. Missing or malformed values are
            ignored instead of failing the whole page.

    Returns:
        The rendered ``log/list.html`` template.
    """
    # Filter parameters
    page = request.args.get('page', 1, type=int)
    user_id = request.args.get('user_id', type=int)
    action = request.args.get('action')
    target_type = request.args.get('target_type')

    # Date range handling; the list shows the last 7 days by default
    date_range = request.args.get('date_range', '7')
    now = datetime.now()
    end_date = now
    start_date = None
    # Tracks whether the user supplied the end date. Comparing end_date with a
    # fresh datetime.now() can never be equal, so it cannot be used for this.
    custom_end = False

    preset_days = {'1': 1, '7': 7, '30': 30}
    if date_range in preset_days:
        start_date = now - timedelta(days=preset_days[date_range])
    elif date_range == 'custom':
        start_date = _parse_log_filter_date(request.args.get('start_date'))
        parsed_end = _parse_log_filter_date(request.args.get('end_date'))
        if parsed_end is not None:
            # Make the end bound inclusive of the whole selected day
            end_date = parsed_end.replace(hour=23, minute=59, second=59)
            custom_end = True

    # Paginated query
    pagination = Log.get_logs(
        page=page,
        per_page=20,
        user_id=user_id,
        action=action,
        target_type=target_type,
        start_date=start_date,
        end_date=end_date
    )

    # Users, action types and target types feed the filter controls
    users = User.query.all()

    # Number of log entries per action
    action_types = db.session.query(Log.action, db.func.count(Log.id)) \
        .group_by(Log.action).all()

    # `!= None` is intentional: SQLAlchemy turns it into IS NOT NULL
    target_types = db.session.query(Log.target_type, db.func.count(Log.id)) \
        .filter(Log.target_type != None) \
        .group_by(Log.target_type).all()  # noqa: E711

    return render_template(
        'log/list.html',
        pagination=pagination,
        users=users,
        action_types=action_types,
        target_types=target_types,
        filters={
            'user_id': user_id,
            'action': action,
            'target_type': target_type,
            'date_range': date_range,
            'start_date': start_date.strftime('%Y-%m-%d') if start_date else '',
            # Only echo the end date back when the user actually chose one
            'end_date': end_date.strftime('%Y-%m-%d') if custom_end else ''
        }
    )
 | 
						||
 | 
						||
 | 
						||
@log_bp.route('/detail/<int:log_id>')
@login_required
@permission_required('view_logs')  # replaces @admin_required
def log_detail(log_id):
    """Render the detail page for the log entry with the given id.

    The entry is fetched with ``get_or_404`` and handed to the
    ``log/detail.html`` template as ``log``.
    """
    entry = Log.query.get_or_404(log_id)
    return render_template('log/detail.html', log=entry)
 | 
						||
 | 
						||
 | 
						||
@log_bp.route('/api/export', methods=['POST'])
@login_required
@permission_required('view_logs')  # replaces @admin_required
def export_logs():
    """Export filtered log entries as a CSV or XLSX download payload.

    Expects a JSON object body with the optional keys ``user_id``,
    ``action``, ``target_type``, ``start_date`` / ``end_date``
    (``YYYY-MM-DD``) and ``format`` ('xlsx' selects Excel; any other value
    yields CSV).

    Returns:
        The JSON response built by ``export_as_xlsx`` / ``export_as_csv`` on
        success, a 400 JSON error when the body or a date is malformed, or a
        500 JSON error when generating the export fails.
    """
    # Imported locally because the module-level flask import does not
    # include current_app, which the error handler below needs.
    from flask import current_app

    # silent=True yields None instead of raising on a missing/invalid body
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({
            'success': False,
            'message': '请求数据格式无效'
        }), 400

    user_id = data.get('user_id')
    action = data.get('action')
    target_type = data.get('target_type')
    start_date_str = data.get('start_date')
    end_date_str = data.get('end_date')
    export_format = data.get('format', 'csv')  # requested export format

    # Date range; the upper bound defaults to the current time
    start_date = None
    end_date = datetime.now()

    try:
        if start_date_str:
            start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
        if end_date_str:
            # Include the whole of the selected end day
            end_date = datetime.strptime(end_date_str + ' 23:59:59', '%Y-%m-%d %H:%M:%S')
    except (TypeError, ValueError):
        # Non-string or badly formatted dates are a client error, not a crash
        return jsonify({
            'success': False,
            'message': '日期格式无效,应为 YYYY-MM-DD'
        }), 400

    # Build the query, newest entries first
    query = Log.query.order_by(Log.created_at.desc())

    if user_id:
        query = query.filter(Log.user_id == user_id)
    if action:
        query = query.filter(Log.action == action)
    if target_type:
        query = query.filter(Log.target_type == target_type)
    if start_date:
        query = query.filter(Log.created_at >= start_date)
    if end_date:
        query = query.filter(Log.created_at <= end_date)

    logs = query.all()

    try:
        # Pick the exporter matching the requested format
        if export_format == 'xlsx':
            return export_as_xlsx(logs)
        else:
            return export_as_csv(logs)
    except Exception as e:
        # logger.exception records the message together with the traceback
        current_app.logger.exception("Export error: %s", e)

        return jsonify({
            'success': False,
            'message': f'导出失败: {str(e)}'
        }), 500
 | 
						||
 | 
						||
 | 
						||
def export_as_csv(logs):
    """Serialise log entries to CSV and wrap them in a JSON download payload.

    Args:
        logs: sized iterable of log records exposing ``id``, ``user``,
            ``action``, ``target_type``, ``target_id``, ``ip_address``,
            ``description`` and ``created_at``.

    Returns:
        A ``jsonify`` response with the keys ``success``, ``message``,
        ``count``, ``filename``, ``filedata`` (base64 of the UTF-8 CSV with a
        leading BOM) and ``filetype``.
    """
    import base64
    import csv
    from io import StringIO

    def as_row(entry):
        # One CSV row per log entry; missing optional fields become ''
        owner = entry.user
        return [
            entry.id,
            owner.username if owner else "未登录",
            entry.action,
            entry.target_type or '',
            entry.target_id or '',
            entry.ip_address or '',
            entry.description or '',
            entry.created_at.strftime('%Y-%m-%d %H:%M:%S'),
        ]

    buffer = StringIO()
    writer = csv.writer(buffer)

    # Header row followed by the data rows
    writer.writerow(['ID', '用户', '操作类型', '目标类型', '目标ID', 'IP地址', '描述', '创建时间'])
    writer.writerows(as_row(entry) for entry in logs)

    # utf-8-sig prepends the BOM so Excel displays the Chinese text correctly
    raw_bytes = buffer.getvalue().encode('utf-8-sig')
    encoded = base64.b64encode(raw_bytes).decode('utf-8')

    # Timestamped download name
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f"system_logs_{stamp}.csv"

    total = len(logs)
    return jsonify({
        'success': True,
        'message': f'已生成 {total} 条日志记录',
        'count': total,
        'filename': filename,
        'filedata': encoded,
        'filetype': 'text/csv'
    })
 | 
						||
 | 
						||
 | 
						||
def export_as_xlsx(logs):
    """Serialise log entries to an XLSX workbook inside a JSON download payload.

    Args:
        logs: sized iterable of log records exposing ``id``, ``user``,
            ``action``, ``target_type``, ``target_id``, ``ip_address``,
            ``description`` and ``created_at``.

    Returns:
        A ``jsonify`` response with the keys ``success``, ``message``,
        ``count``, ``filename``, ``filedata`` (base64 of the workbook bytes)
        and ``filetype``.

    Raises:
        Exception: when openpyxl cannot be imported.
    """
    import base64
    from io import BytesIO

    try:
        # Imported lazily so a missing openpyxl only affects this export path
        import openpyxl
    except ImportError:
        raise Exception("未安装openpyxl库,无法导出Excel格式。请安装后重试: pip install openpyxl")

    headers = ['ID', '用户', '操作类型', '目标类型', '目标ID', 'IP地址', '描述', '创建时间']

    # Workbook with a single named sheet
    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.title = "系统日志"

    # Header row
    for column, title in enumerate(headers, 1):
        sheet.cell(row=1, column=column, value=title)

    # Data rows start on row 2, directly below the header
    for row_number, entry in enumerate(logs, 2):
        values = (
            entry.id,
            entry.user.username if entry.user else "未登录",
            entry.action,
            entry.target_type or '',
            entry.target_id or '',
            entry.ip_address or '',
            entry.description or '',
            entry.created_at.strftime('%Y-%m-%d %H:%M:%S'),
        )
        for column, value in enumerate(values, 1):
            sheet.cell(row=row_number, column=column, value=value)

    # Column widths: the description column is wider than the rest
    for column, title in enumerate(headers, 1):
        letter = openpyxl.utils.get_column_letter(column)
        sheet.column_dimensions[letter].width = 40 if title == '描述' else 15

    # Save into memory and base64-encode the bytes
    stream = BytesIO()
    workbook.save(stream)
    encoded = base64.b64encode(stream.getvalue()).decode('utf-8')

    # Timestamped download name
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f"system_logs_{stamp}.xlsx"

    total = len(logs)
    return jsonify({
        'success': True,
        'message': f'已生成 {total} 条日志记录',
        'count': total,
        'filename': filename,
        'filedata': encoded,
        'filetype': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    })
 | 
						||
 | 
						||
 | 
						||
@log_bp.route('/api/clear', methods=['POST'])
@login_required
# NOTE(review): this destructive endpoint is guarded only by the 'view_logs'
# permission; confirm whether a stricter permission should be required.
@permission_required('view_logs')  # replaces @admin_required
def clear_logs():
    """Delete log entries, either all of them or those older than N days.

    Expects a JSON object body with an optional numeric ``days`` key:
        days > 0: delete entries created more than ``days`` days ago.
        days == 0 or omitted: delete every entry.

    Returns:
        A JSON response with ``success``, ``message`` and ``count`` (number of
        deleted rows) on success, a 400 JSON error when the body or ``days``
        is invalid, or a 500 JSON error (after rollback) when deletion fails.
    """
    # silent=True yields None instead of raising on a missing/invalid body
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({
            'success': False,
            'message': '请求数据格式无效'
        }), 400

    days = data.get('days', 0)

    # Reject anything that is not a non-negative number. Without this check a
    # negative (or NaN) value would fall through to the "delete everything"
    # branch. `not days >= 0` is used so that NaN is rejected as well.
    if isinstance(days, bool) or not isinstance(days, (int, float)) or not days >= 0:
        return jsonify({
            'success': False,
            'message': '参数 days 必须是非负数'
        }), 400

    try:
        if days > 0:
            # Remove only entries older than the cutoff
            cutoff_date = datetime.now() - timedelta(days=days)
            deleted = Log.query.filter(Log.created_at < cutoff_date).delete()
        else:
            # days == 0: remove every entry
            deleted = Log.query.delete()

        db.session.commit()
        return jsonify({
            'success': True,
            'message': f'成功清除 {deleted} 条日志记录',
            'count': deleted
        })
    except Exception as e:
        # Undo the partial delete before reporting the failure
        db.session.rollback()
        return jsonify({
            'success': False,
            'message': f'清除日志失败: {str(e)}'
        }), 500
 |