2025-05-17 00:11:20 +08:00

287 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, render_template, request, jsonify
from flask_login import current_user, login_required
from app.models.log import Log
from app.models.user import User, db # 导入db
from app.utils.auth import permission_required # 更改为导入permission_required装饰器
from datetime import datetime, timedelta
# 创建蓝图
log_bp = Blueprint('log', __name__, url_prefix='/log')
@log_bp.route('/list')
@login_required
@permission_required('view_logs') # 替代 @admin_required
def log_list():
"""日志列表页面"""
# 获取筛选参数
page = request.args.get('page', 1, type=int)
user_id = request.args.get('user_id', type=int)
action = request.args.get('action')
target_type = request.args.get('target_type')
# 处理日期范围参数
date_range = request.args.get('date_range', '7') # 默认显示7天内的日志
end_date = datetime.now()
start_date = None
if date_range == '1':
start_date = end_date - timedelta(days=1)
elif date_range == '7':
start_date = end_date - timedelta(days=7)
elif date_range == '30':
start_date = end_date - timedelta(days=30)
elif date_range == 'custom':
start_date_str = request.args.get('start_date')
end_date_str = request.args.get('end_date')
if start_date_str:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
if end_date_str:
end_date = datetime.strptime(end_date_str + ' 23:59:59', '%Y-%m-%d %H:%M:%S')
# 获取分页数据
pagination = Log.get_logs(
page=page,
per_page=20,
user_id=user_id,
action=action,
target_type=target_type,
start_date=start_date,
end_date=end_date
)
# 获取用户列表和操作类型列表,用于筛选
users = User.query.all()
# 统计各类操作的数量
action_types = db.session.query(Log.action, db.func.count(Log.id)) \
.group_by(Log.action).all()
target_types = db.session.query(Log.target_type, db.func.count(Log.id)) \
.filter(Log.target_type != None) \
.group_by(Log.target_type).all()
return render_template(
'log/list.html',
pagination=pagination,
users=users,
action_types=action_types,
target_types=target_types,
filters={
'user_id': user_id,
'action': action,
'target_type': target_type,
'date_range': date_range,
'start_date': start_date.strftime('%Y-%m-%d') if start_date else '',
'end_date': end_date.strftime('%Y-%m-%d') if end_date != datetime.now() else ''
}
)
@log_bp.route('/detail/<int:log_id>')
@login_required
@permission_required('view_logs') # 替代 @admin_required
def log_detail(log_id):
"""日志详情页面"""
log = Log.query.get_or_404(log_id)
return render_template('log/detail.html', log=log)
@log_bp.route('/api/export', methods=['POST'])
@login_required
@permission_required('view_logs') # 替代 @admin_required
def export_logs():
"""导出日志API"""
data = request.get_json()
user_id = data.get('user_id')
action = data.get('action')
target_type = data.get('target_type')
start_date_str = data.get('start_date')
end_date_str = data.get('end_date')
export_format = data.get('format', 'csv') # 获取导出格式参数
# 处理日期范围
start_date = None
end_date = datetime.now()
if start_date_str:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
if end_date_str:
end_date = datetime.strptime(end_date_str + ' 23:59:59', '%Y-%m-%d %H:%M:%S')
# 查询日志
query = Log.query.order_by(Log.created_at.desc())
if user_id:
query = query.filter(Log.user_id == user_id)
if action:
query = query.filter(Log.action == action)
if target_type:
query = query.filter(Log.target_type == target_type)
if start_date:
query = query.filter(Log.created_at >= start_date)
if end_date:
query = query.filter(Log.created_at <= end_date)
logs = query.all()
try:
# 根据格式选择导出方法
if export_format == 'xlsx':
return export_as_xlsx(logs)
else:
return export_as_csv(logs)
except Exception as e:
# 记录错误以便调试
import traceback
error_details = traceback.format_exc()
current_app.logger.error(f"Export error: {str(e)}\n{error_details}")
return jsonify({
'success': False,
'message': f'导出失败: {str(e)}'
}), 500
def export_as_csv(logs):
"""导出为CSV格式"""
import csv
from io import StringIO
import base64
# 创建CSV文件
output = StringIO()
output.write('\ufeff') # 添加BOM标记解决Excel中文乱码
csv_writer = csv.writer(output)
# 写入标题行
csv_writer.writerow(['ID', '用户', '操作类型', '目标类型', '目标ID', 'IP地址', '描述', '创建时间'])
# 写入数据行
for log in logs:
username = log.user.username if log.user else "未登录"
csv_writer.writerow([
log.id,
username,
log.action,
log.target_type or '',
log.target_id or '',
log.ip_address or '',
log.description or '',
log.created_at.strftime('%Y-%m-%d %H:%M:%S')
])
# 获取CSV字符串并进行Base64编码
csv_string = output.getvalue()
csv_bytes = csv_string.encode('utf-8')
b64_encoded = base64.b64encode(csv_bytes).decode('utf-8')
# 设置文件名
filename = f"system_logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
return jsonify({
'success': True,
'message': f'已生成 {len(logs)} 条日志记录',
'count': len(logs),
'filename': filename,
'filedata': b64_encoded,
'filetype': 'text/csv'
})
def export_as_xlsx(logs):
"""导出为XLSX格式"""
import base64
from io import BytesIO
try:
# 动态导入openpyxl如果不存在则抛出异常
import openpyxl
except ImportError:
raise Exception("未安装openpyxl库无法导出Excel格式。请安装后重试: pip install openpyxl")
# 创建工作簿和工作表
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "系统日志"
# 写入标题行
headers = ['ID', '用户', '操作类型', '目标类型', '目标ID', 'IP地址', '描述', '创建时间']
for col_idx, header in enumerate(headers, 1):
ws.cell(row=1, column=col_idx, value=header)
# 写入数据行
for row_idx, log in enumerate(logs, 2):
username = log.user.username if log.user else "未登录"
ws.cell(row=row_idx, column=1, value=log.id)
ws.cell(row=row_idx, column=2, value=username)
ws.cell(row=row_idx, column=3, value=log.action)
ws.cell(row=row_idx, column=4, value=log.target_type or '')
ws.cell(row=row_idx, column=5, value=log.target_id or '')
ws.cell(row=row_idx, column=6, value=log.ip_address or '')
ws.cell(row=row_idx, column=7, value=log.description or '')
ws.cell(row=row_idx, column=8, value=log.created_at.strftime('%Y-%m-%d %H:%M:%S'))
# 调整列宽
for col_idx, header in enumerate(headers, 1):
column_letter = openpyxl.utils.get_column_letter(col_idx)
if header == '描述':
ws.column_dimensions[column_letter].width = 40
else:
ws.column_dimensions[column_letter].width = 15
# 保存到内存
output = BytesIO()
wb.save(output)
output.seek(0)
# 编码为Base64
xlsx_data = output.getvalue()
b64_encoded = base64.b64encode(xlsx_data).decode('utf-8')
# 设置文件名
filename = f"system_logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
return jsonify({
'success': True,
'message': f'已生成 {len(logs)} 条日志记录',
'count': len(logs),
'filename': filename,
'filedata': b64_encoded,
'filetype': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
})
@log_bp.route('/api/clear', methods=['POST'])
@login_required
@permission_required('view_logs') # 替代 @admin_required
def clear_logs():
"""清空日志API"""
data = request.get_json()
days = data.get('days', 0)
try:
if days > 0:
# 清除指定天数前的日志
cutoff_date = datetime.now() - timedelta(days=days)
deleted = Log.query.filter(Log.created_at < cutoff_date).delete()
else:
# 清空全部日志
deleted = Log.query.delete()
db.session.commit()
return jsonify({
'success': True,
'message': f'成功清除 {deleted} 条日志记录',
'count': deleted
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'message': f'清除日志失败: {str(e)}'
}), 500