Book_system/app/controllers/announcement.py
2025-05-14 15:08:06 +08:00

326 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify
from app.models.announcement import Announcement
from app.models.log import Log
from app.utils.auth import permission_required # 修改导入
from flask_login import login_required, current_user
from datetime import datetime
from app.models.notification import Notification
from app import db # 为mark_all_as_read函数添加db导入
# 创建蓝图
announcement_bp = Blueprint('announcement', __name__)
@announcement_bp.route('/list', methods=['GET'])
def announcement_list():
"""公告列表页面 - 所有用户可见"""
page = request.args.get('page', 1, type=int)
per_page = 10
# 查询活跃的公告
query = Announcement.query.filter_by(status=1).order_by(
Announcement.is_top.desc(),
Announcement.created_at.desc()
)
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
return render_template('announcement/list.html', pagination=pagination)
@announcement_bp.route('/detail/<int:announcement_id>', methods=['GET'])
def announcement_detail(announcement_id):
"""公告详情页面"""
announcement = Announcement.get_announcement_by_id(announcement_id)
if not announcement or announcement.status == 0:
flash('公告不存在或已被删除', 'error')
return redirect(url_for('announcement.announcement_list'))
return render_template('announcement/detail.html', announcement=announcement)
@announcement_bp.route('/manage', methods=['GET'])
@login_required
@permission_required('manage_announcements') # 替代 @admin_required
def manage_announcements():
"""管理员公告管理页面"""
page = request.args.get('page', 1, type=int)
per_page = 10
search = request.args.get('search', '')
status = request.args.get('status', type=int)
# 构建查询
query = Announcement.query
# 搜索过滤
if search:
query = query.filter(Announcement.title.like(f'%{search}%'))
# 状态过滤
if status is not None:
query = query.filter(Announcement.status == status)
# 排序
query = query.order_by(
Announcement.is_top.desc(),
Announcement.created_at.desc()
)
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
# 记录访问日志
Log.add_log(
action="访问公告管理",
user_id=current_user.id,
ip_address=request.remote_addr,
description=f"管理员 {current_user.username} 访问公告管理页面"
)
return render_template(
'announcement/manage.html',
pagination=pagination,
search=search,
status=status
)
@announcement_bp.route('/add', methods=['GET', 'POST'])
@login_required
@permission_required('manage_announcements') # 替代 @admin_required
def add_announcement():
"""添加公告"""
if request.method == 'POST':
title = request.form.get('title')
content = request.form.get('content')
is_top = request.form.get('is_top') == 'on'
if not title or not content:
flash('标题和内容不能为空', 'error')
return render_template('announcement/add.html')
success, result = Announcement.create_announcement(
title=title,
content=content,
publisher_id=current_user.id,
is_top=is_top
)
if success:
# 记录操作日志
Log.add_log(
action="添加公告",
user_id=current_user.id,
target_type="公告",
target_id=result.id,
ip_address=request.remote_addr,
description=f"管理员 {current_user.username} 添加了新公告: {title}"
)
flash('公告发布成功', 'success')
return redirect(url_for('announcement.manage_announcements'))
else:
flash(f'公告发布失败: {result}', 'error')
return render_template('announcement/add.html')
return render_template('announcement/add.html')
@announcement_bp.route('/edit/<int:announcement_id>', methods=['GET', 'POST'])
@login_required
@permission_required('manage_announcements') # 替代 @admin_required
def edit_announcement(announcement_id):
"""编辑公告"""
announcement = Announcement.get_announcement_by_id(announcement_id)
if not announcement:
flash('公告不存在', 'error')
return redirect(url_for('announcement.manage_announcements'))
if request.method == 'POST':
title = request.form.get('title')
content = request.form.get('content')
is_top = request.form.get('is_top') == 'on'
if not title or not content:
flash('标题和内容不能为空', 'error')
return render_template('announcement/edit.html', announcement=announcement)
success, result = Announcement.update_announcement(
announcement_id=announcement_id,
title=title,
content=content,
is_top=is_top
)
if success:
# 记录操作日志
Log.add_log(
action="编辑公告",
user_id=current_user.id,
target_type="公告",
target_id=announcement_id,
ip_address=request.remote_addr,
description=f"管理员 {current_user.username} 编辑了公告: {title}"
)
flash('公告更新成功', 'success')
return redirect(url_for('announcement.manage_announcements'))
else:
flash(f'公告更新失败: {result}', 'error')
return render_template('announcement/edit.html', announcement=announcement)
return render_template('announcement/edit.html', announcement=announcement)
@announcement_bp.route('/status/<int:announcement_id>', methods=['POST'])
@login_required
@permission_required('manage_announcements') # 替代 @admin_required
def change_status(announcement_id):
"""更改公告状态"""
data = request.get_json()
status = data.get('status')
if status is None or status not in [0, 1]:
return jsonify({'success': False, 'message': '无效的状态值'})
# 查询公告获取标题(用于日志)
announcement = Announcement.get_announcement_by_id(announcement_id)
if not announcement:
return jsonify({'success': False, 'message': '公告不存在'})
success, message = Announcement.change_status(announcement_id, status)
if success:
# 记录状态变更日志
status_text = "发布" if status == 1 else "撤销"
Log.add_log(
action=f"公告{status_text}",
user_id=current_user.id,
target_type="公告",
target_id=announcement_id,
ip_address=request.remote_addr,
description=f"管理员 {current_user.username} {status_text}公告: {announcement.title}"
)
return jsonify({'success': True, 'message': f'公告已{status_text}'})
else:
return jsonify({'success': False, 'message': message})
@announcement_bp.route('/top/<int:announcement_id>', methods=['POST'])
@login_required
@permission_required('manage_announcements') # 替代 @admin_required
def change_top_status(announcement_id):
"""更改公告置顶状态"""
data = request.get_json()
is_top = data.get('is_top')
if is_top is None:
return jsonify({'success': False, 'message': '无效的置顶状态'})
# 查询公告获取标题(用于日志)
announcement = Announcement.get_announcement_by_id(announcement_id)
if not announcement:
return jsonify({'success': False, 'message': '公告不存在'})
success, message = Announcement.change_top_status(announcement_id, is_top)
if success:
# 记录置顶状态变更日志
action_text = "置顶" if is_top else "取消置顶"
Log.add_log(
action=f"公告{action_text}",
user_id=current_user.id,
target_type="公告",
target_id=announcement_id,
ip_address=request.remote_addr,
description=f"管理员 {current_user.username} {action_text}公告: {announcement.title}"
)
return jsonify({'success': True, 'message': f'公告已{action_text}'})
else:
return jsonify({'success': False, 'message': message})
@announcement_bp.route('/latest', methods=['GET'])
def get_latest_announcements():
"""获取最新公告列表用于首页和API"""
limit = request.args.get('limit', 5, type=int)
announcements = Announcement.get_active_announcements(limit=limit)
return jsonify({
'success': True,
'announcements': [announcement.to_dict() for announcement in announcements]
})
@announcement_bp.route('/notifications')
@login_required
def user_notifications():
"""用户个人通知列表页面"""
page = request.args.get('page', 1, type=int)
per_page = 10
unread_only = request.args.get('unread_only') == '1'
pagination = Notification.get_user_notifications(
user_id=current_user.id,
page=page,
per_page=per_page,
unread_only=unread_only
)
return render_template(
'announcement/notifications.html',
pagination=pagination,
unread_only=unread_only
)
@announcement_bp.route('/notification/<int:notification_id>')
@login_required
def view_notification(notification_id):
"""查看单条通知"""
notification = Notification.query.get_or_404(notification_id)
# 检查权限 - 只能查看自己的通知
if notification.user_id != current_user.id:
flash('您无权查看此通知', 'error')
return redirect(url_for('announcement.user_notifications'))
# 标记为已读
if notification.status == 0:
Notification.mark_as_read(notification_id, current_user.id)
# 如果是借阅类型的通知,可能需要跳转到相关页面
if notification.type == 'borrow' and 'borrow_id' in notification.content:
# 这里可以解析content获取borrow_id然后重定向
pass
return render_template('announcement/notification_detail.html', notification=notification)
@announcement_bp.route('/notifications/mark-all-read')
@login_required
def mark_all_as_read():
"""标记所有通知为已读"""
try:
# 获取所有未读通知
unread_notifications = Notification.query.filter_by(
user_id=current_user.id,
status=0
).all()
# 标记为已读
for notification in unread_notifications:
notification.status = 1
notification.read_at = datetime.now()
db.session.commit()
flash('所有通知已标记为已读', 'success')
except Exception as e:
db.session.rollback()
flash(f'操作失败: {str(e)}', 'error')
return redirect(url_for('announcement.user_notifications'))