from flask import Flask, render_template, session, g, Markup, redirect, url_for from flask_login import LoginManager from app.models.database import db from app.models.user import User from app.controllers.user import user_bp from app.controllers.book import book_bp from app.controllers.borrow import borrow_bp from app.controllers.inventory import inventory_bp from flask_login import LoginManager, current_user from app.controllers.statistics import statistics_bp from app.controllers.announcement import announcement_bp from app.models.notification import Notification from app.controllers.log import log_bp import os from datetime import datetime login_manager = LoginManager() def create_app(config=None): app = Flask(__name__) # 加载默认配置 app.config.from_object('config') # 如果提供了配置对象,则加载它 if config: if isinstance(config, dict): app.config.update(config) else: app.config.from_object(config) # 从环境变量指定的文件加载配置(如果有) app.config.from_envvar('APP_CONFIG_FILE', silent=True) # 初始化数据库 db.init_app(app) # 初始化 Flask-Login login_manager.init_app(app) login_manager.login_view = 'user.login' @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) from app.utils.template_helpers import register_template_helpers # 注册蓝图 register_template_helpers(app) app.register_blueprint(user_bp, url_prefix='/user') app.register_blueprint(book_bp, url_prefix='/book') app.register_blueprint(borrow_bp, url_prefix='/borrow') app.register_blueprint(statistics_bp) app.register_blueprint(inventory_bp) app.register_blueprint(log_bp) app.register_blueprint(announcement_bp, url_prefix='/announcement') # 创建数据库表 with app.app_context(): # 先导入基础模型 from app.models.user import User, Role from app.models.book import Book, Category # 创建表 db.create_all() # 再导入依赖模型 - 但不在这里定义关系 from app.models.borrow import BorrowRecord from app.models.inventory import InventoryLog from app.models.log import Log # 移除这些重复的关系定义 # Book.borrow_records = db.relationship('BorrowRecord', backref='book', lazy='dynamic') # Book.inventory_logs = db.relationship('InventoryLog', backref='book', lazy='dynamic') # Category.books = db.relationship('Book', backref='category', lazy='dynamic') # 创建默认角色 from app.models.user import Role if not Role.query.filter_by(id=1).first(): admin_role = Role(id=1, role_name='管理员', description='系统管理员') db.session.add(admin_role) if not Role.query.filter_by(id=2).first(): user_role = Role(id=2, role_name='普通用户', description='普通用户') db.session.add(user_role) # 创建管理员账号 if not User.query.filter_by(username='admin').first(): admin = User( username='admin', password='admin123', email='admin@example.com', role_id=1, nickname='系统管理员' ) db.session.add(admin) # 创建基础分类 from app.models.book import Category if not Category.query.first(): categories = [ Category(name='文学', sort=1), Category(name='计算机', sort=2), Category(name='历史', sort=3), Category(name='科学', sort=4), Category(name='艺术', sort=5), Category(name='经济', sort=6), Category(name='哲学', sort=7), Category(name='教育', sort=8) ] db.session.add_all(categories) db.session.commit() # 其余代码保持不变... @app.before_request def load_logged_in_user(): user_id = session.get('user_id') if user_id is None: g.user = None else: g.user = User.query.get(user_id) @app.route('/') def index(): from app.models.book import Book from app.models.user import User from app.models.borrow import BorrowRecord from app.models.announcement import Announcement from app.models.notification import Notification from sqlalchemy import func, desc from flask_login import current_user # 获取统计数据 stats = { 'total_books': Book.query.count(), 'total_users': User.query.count(), 'active_borrows': BorrowRecord.query.filter(BorrowRecord.return_date.is_(None)).count(), 'user_borrows': 0 } # 如果用户已登录,获取其待还图书数量 if current_user.is_authenticated: stats['user_borrows'] = BorrowRecord.query.filter( BorrowRecord.user_id == current_user.id, BorrowRecord.return_date.is_(None) ).count() # 获取最新图书 latest_books = Book.query.filter_by(status=1).order_by(Book.created_at.desc()).limit(4).all() # 获取热门图书(根据借阅次数) try: # 这里假设你的数据库中有表记录借阅次数 popular_books_query = db.session.query( Book, func.count(BorrowRecord.id).label('borrow_count') ).join( BorrowRecord, Book.id == BorrowRecord.book_id, isouter=True ).filter( Book.status == 1 ).group_by( Book.id ).order_by( desc('borrow_count') ).limit(5) # 提取图书对象并添加借阅计数 popular_books = [] for book, count in popular_books_query: book.borrow_count = count popular_books.append(book) except Exception as e: # 如果查询有问题,使用最新的书作为备选 popular_books = latest_books.copy() if latest_books else [] print(f"获取热门图书失败: {str(e)}") # 获取最新公告 announcements = Announcement.query.filter_by(status=1).order_by( Announcement.is_top.desc(), Announcement.created_at.desc() ).limit(3).all() now = datetime.now() # 获取用户的未读通知 user_notifications = [] if current_user.is_authenticated: user_notifications = Notification.query.filter_by( user_id=current_user.id, status=0 ).order_by( Notification.created_at.desc() ).limit(5).all() return render_template('index.html', stats=stats, latest_books=latest_books, popular_books=popular_books, announcements=announcements, user_notifications=user_notifications, now=now ) @app.errorhandler(404) def page_not_found(e): return render_template('404.html'), 404 @app.template_filter('nl2br') def nl2br_filter(s): if s: return Markup(s.replace('\n', '
')) return s @app.context_processor def utility_processor(): def get_unread_notifications_count(user_id): if user_id: return Notification.get_unread_count(user_id) return 0 def get_recent_notifications(user_id, limit=5): if user_id: # 按时间倒序获取最近的几条通知 notifications = Notification.query.filter_by(user_id=user_id) \ .order_by(Notification.created_at.desc()) \ .limit(limit) \ .all() return notifications return [] return dict( get_unread_notifications_count=get_unread_notifications_count, get_recent_notifications=get_recent_notifications ) return app @app.context_processor def inject_now(): return {'now': datetime.datetime.now()}