from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, g, jsonify from app.models.book import Book, Category from app.models.user import db from app.utils.auth import login_required, admin_required import os from werkzeug.utils import secure_filename import datetime import pandas as pd import uuid book_bp = Blueprint('book', __name__) @book_bp.route('/admin/list') @login_required @admin_required def admin_book_list(): page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) # 只显示状态为1的图书(未下架的图书) query = Book.query.filter_by(status=1) # 搜索功能 search = request.args.get('search', '') if search: query = query.filter( (Book.title.contains(search)) | (Book.author.contains(search)) | (Book.isbn.contains(search)) ) # 分类筛选 category_id = request.args.get('category_id', type=int) if category_id: query = query.filter_by(category_id=category_id) # 排序 sort = request.args.get('sort', 'id') order = request.args.get('order', 'desc') if order == 'desc': query = query.order_by(getattr(Book, sort).desc()) else: query = query.order_by(getattr(Book, sort)) pagination = query.paginate(page=page, per_page=per_page) books = pagination.items # 获取所有分类供筛选使用 categories = Category.query.all() return render_template('book/list.html', books=books, pagination=pagination, search=search, categories=categories, category_id=category_id, sort=sort, order=order, current_user=g.user, is_admin_view=True) # 指明这是管理视图 # 图书列表页面 @book_bp.route('/list') @login_required def book_list(): print("访问图书列表页面") # 调试输出 page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) # 只显示状态为1的图书(未下架的图书) query = Book.query.filter_by(status=1) # 搜索功能 search = request.args.get('search', '') if search: query = query.filter( (Book.title.contains(search)) | (Book.author.contains(search)) | (Book.isbn.contains(search)) ) # 分类筛选 category_id = request.args.get('category_id', type=int) if category_id: query = query.filter_by(category_id=category_id) # 排序 sort = request.args.get('sort', 'id') order = request.args.get('order', 'desc') if order == 'desc': query = query.order_by(getattr(Book, sort).desc()) else: query = query.order_by(getattr(Book, sort)) pagination = query.paginate(page=page, per_page=per_page) books = pagination.items # 获取所有分类供筛选使用 categories = Category.query.all() return render_template('book/list.html', books=books, pagination=pagination, search=search, categories=categories, category_id=category_id, sort=sort, order=order, current_user=g.user) # 图书详情页面 @book_bp.route('/detail/') @login_required def book_detail(book_id): book = Book.query.get_or_404(book_id) # 添加当前时间用于判断借阅是否逾期 now = datetime.datetime.now() # 如果用户是管理员,预先查询并排序借阅记录 borrow_records = [] if g.user.role_id == 1: # 假设 role_id 1 为管理员 from app.models.borrow import BorrowRecord borrow_records = BorrowRecord.query.filter_by(book_id=book_id).order_by(BorrowRecord.borrow_date.desc()).limit( 10).all() return render_template( 'book/detail.html', book=book, current_user=g.user, borrow_records=borrow_records, now=now ) # 添加图书页面 # 添加图书页面 @book_bp.route('/add', methods=['GET', 'POST']) @login_required @admin_required def add_book(): if request.method == 'POST': title = request.form.get('title') author = request.form.get('author') publisher = request.form.get('publisher') category_id = request.form.get('category_id') tags = request.form.get('tags') isbn = request.form.get('isbn') publish_year = request.form.get('publish_year') description = request.form.get('description') stock = request.form.get('stock', type=int, default=0) price = request.form.get('price') # 表单验证 errors = [] if not title: errors.append('书名不能为空') if not author: errors.append('作者不能为空') # 检查ISBN是否已存在(如果提供了ISBN) if isbn: existing_book = Book.query.filter_by(isbn=isbn).first() if existing_book: errors.append(f'ISBN "{isbn}" 已存在,请检查ISBN或查找现有图书') if errors: for error in errors: flash(error, 'danger') categories = Category.query.all() # 保留已填写的表单数据 book_data = { 'title': title, 'author': author, 'publisher': publisher, 'category_id': category_id, 'tags': tags, 'isbn': isbn, 'publish_year': publish_year, 'description': description, 'stock': stock, 'price': price } return render_template('book/add.html', categories=categories, current_user=g.user, book=book_data) # 处理封面图片上传 cover_url = None if 'cover' in request.files: cover_file = request.files['cover'] if cover_file and cover_file.filename != '': try: # 更清晰的文件命名 original_filename = secure_filename(cover_file.filename) # 保留原始文件扩展名 _, ext = os.path.splitext(original_filename) if not ext: ext = '.jpg' # 默认扩展名 filename = f"{uuid.uuid4()}{ext}" upload_folder = os.path.join(current_app.static_folder, 'uploads', 'covers') # 确保上传目录存在 if not os.path.exists(upload_folder): os.makedirs(upload_folder) file_path = os.path.join(upload_folder, filename) cover_file.save(file_path) cover_url = f'/static/uploads/covers/{filename}' except Exception as e: current_app.logger.error(f"封面上传失败: {str(e)}") flash(f"封面上传失败: {str(e)}", 'warning') try: # 创建新图书 book = Book( title=title, author=author, publisher=publisher, category_id=category_id, tags=tags, isbn=isbn, publish_year=publish_year, description=description, cover_url=cover_url, stock=stock, price=price, status=1, created_at=datetime.datetime.now(), updated_at=datetime.datetime.now() ) db.session.add(book) # 先提交以获取book的id db.session.commit() # 记录库存日志 - 在获取 book.id 后 if stock and int(stock) > 0: from app.models.inventory import InventoryLog inventory_log = InventoryLog( book_id=book.id, change_type='入库', change_amount=stock, after_stock=stock, operator_id=g.user.id, remark='新书入库', changed_at=datetime.datetime.now() ) db.session.add(inventory_log) db.session.commit() flash(f'《{title}》添加成功', 'success') return redirect(url_for('book.book_list')) except Exception as e: db.session.rollback() error_msg = str(e) # 记录详细错误日志 current_app.logger.error(f"添加图书失败: {error_msg}") flash(f'添加图书失败: {error_msg}', 'danger') categories = Category.query.all() # 保留已填写的表单数据 book_data = { 'title': title, 'author': author, 'publisher': publisher, 'category_id': category_id, 'tags': tags, 'isbn': isbn, 'publish_year': publish_year, 'description': description, 'stock': stock, 'price': price } return render_template('book/add.html', categories=categories, current_user=g.user, book=book_data) categories = Category.query.all() return render_template('book/add.html', categories=categories, current_user=g.user) # 编辑图书 @book_bp.route('/edit/', methods=['GET', 'POST']) @login_required @admin_required def edit_book(book_id): book = Book.query.get_or_404(book_id) if request.method == 'POST': title = request.form.get('title') author = request.form.get('author') publisher = request.form.get('publisher') category_id = request.form.get('category_id') tags = request.form.get('tags') isbn = request.form.get('isbn') publish_year = request.form.get('publish_year') description = request.form.get('description') price = request.form.get('price') status = request.form.get('status', type=int) if not title or not author: flash('书名和作者不能为空', 'danger') categories = Category.query.all() return render_template('book/edit.html', book=book, categories=categories, current_user=g.user) # 处理库存变更 new_stock = request.form.get('stock', type=int) if new_stock != book.stock: from app.models.inventory import InventoryLog change_amount = new_stock - book.stock change_type = '入库' if change_amount > 0 else '出库' inventory_log = InventoryLog( book_id=book.id, change_type=change_type, change_amount=abs(change_amount), after_stock=new_stock, operator_id=g.user.id, remark=f'管理员编辑图书库存 - {book.title}', changed_at=datetime.datetime.now() ) db.session.add(inventory_log) book.stock = new_stock # 处理封面图片上传 if 'cover' in request.files: cover_file = request.files['cover'] if cover_file and cover_file.filename != '': filename = secure_filename(f"{uuid.uuid4()}_{cover_file.filename}") upload_folder = os.path.join(current_app.static_folder, 'uploads/covers') # 确保上传目录存在 if not os.path.exists(upload_folder): os.makedirs(upload_folder) file_path = os.path.join(upload_folder, filename) cover_file.save(file_path) book.cover_url = f'/static/covers/{filename}' # 更新图书信息 book.title = title book.author = author book.publisher = publisher book.category_id = category_id book.tags = tags book.isbn = isbn book.publish_year = publish_year book.description = description book.price = price book.status = status book.updated_at = datetime.datetime.now() db.session.commit() flash('图书信息更新成功', 'success') return redirect(url_for('book.book_list')) categories = Category.query.all() return render_template('book/edit.html', book=book, categories=categories, current_user=g.user) # 删除图书 @book_bp.route('/delete/', methods=['POST']) @login_required @admin_required def delete_book(book_id): book = Book.query.get_or_404(book_id) # 检查该书是否有借阅记录 from app.models.borrow import BorrowRecord active_borrows = BorrowRecord.query.filter_by(book_id=book_id, status=1).count() if active_borrows > 0: return jsonify({'success': False, 'message': '该图书有未归还的借阅记录,无法删除'}) # 考虑软删除而不是物理删除 book.status = 0 # 0表示已删除/下架 book.updated_at = datetime.datetime.now() db.session.commit() return jsonify({'success': True, 'message': '图书已成功下架'}) # 图书分类管理 @book_bp.route('/categories', methods=['GET']) @login_required @admin_required def category_list(): categories = Category.query.all() return render_template('book/categories.html', categories=categories, current_user=g.user) # 添加分类 @book_bp.route('/categories/add', methods=['POST']) @login_required @admin_required def add_category(): name = request.form.get('name') parent_id = request.form.get('parent_id') or None sort = request.form.get('sort', 0, type=int) if not name: return jsonify({'success': False, 'message': '分类名称不能为空'}) category = Category(name=name, parent_id=parent_id, sort=sort) db.session.add(category) db.session.commit() return jsonify({'success': True, 'message': '分类添加成功', 'id': category.id, 'name': category.name}) # 编辑分类 @book_bp.route('/categories/edit/', methods=['POST']) @login_required @admin_required def edit_category(category_id): category = Category.query.get_or_404(category_id) name = request.form.get('name') parent_id = request.form.get('parent_id') or None sort = request.form.get('sort', 0, type=int) if not name: return jsonify({'success': False, 'message': '分类名称不能为空'}) category.name = name category.parent_id = parent_id category.sort = sort db.session.commit() return jsonify({'success': True, 'message': '分类更新成功'}) # 删除分类 @book_bp.route('/categories/delete/', methods=['POST']) @login_required @admin_required def delete_category(category_id): category = Category.query.get_or_404(category_id) # 检查是否有书籍使用此分类 books_count = Book.query.filter_by(category_id=category_id).count() if books_count > 0: return jsonify({'success': False, 'message': f'该分类下有{books_count}本图书,无法删除'}) # 检查是否有子分类 children_count = Category.query.filter_by(parent_id=category_id).count() if children_count > 0: return jsonify({'success': False, 'message': f'该分类下有{children_count}个子分类,无法删除'}) db.session.delete(category) db.session.commit() return jsonify({'success': True, 'message': '分类删除成功'}) # 批量导入图书 @book_bp.route('/import', methods=['GET', 'POST']) @login_required @admin_required def import_books(): if request.method == 'POST': if 'file' not in request.files: flash('未选择文件', 'danger') return redirect(request.url) file = request.files['file'] if file.filename == '': flash('未选择文件', 'danger') return redirect(request.url) if file and file.filename.endswith(('.xlsx', '.xls')): try: # 读取Excel文件 df = pd.read_excel(file) success_count = 0 error_count = 0 errors = [] # 处理每一行数据 for index, row in df.iterrows(): try: # 检查必填字段 if pd.isna(row.get('title')) or pd.isna(row.get('author')): errors.append(f'第{index + 2}行: 书名或作者为空') error_count += 1 continue # 检查ISBN是否已存在 isbn = row.get('isbn') if isbn and not pd.isna(isbn) and Book.query.filter_by(isbn=str(isbn)).first(): errors.append(f'第{index + 2}行: ISBN {isbn} 已存在') error_count += 1 continue # 创建新书籍记录 book = Book( title=row.get('title'), author=row.get('author'), publisher=row.get('publisher') if not pd.isna(row.get('publisher')) else None, category_id=row.get('category_id') if not pd.isna(row.get('category_id')) else None, tags=row.get('tags') if not pd.isna(row.get('tags')) else None, isbn=str(row.get('isbn')) if not pd.isna(row.get('isbn')) else None, publish_year=str(row.get('publish_year')) if not pd.isna(row.get('publish_year')) else None, description=row.get('description') if not pd.isna(row.get('description')) else None, cover_url=row.get('cover_url') if not pd.isna(row.get('cover_url')) else None, stock=int(row.get('stock')) if not pd.isna(row.get('stock')) else 0, price=float(row.get('price')) if not pd.isna(row.get('price')) else None, status=1, created_at=datetime.datetime.now(), updated_at=datetime.datetime.now() ) db.session.add(book) # 提交以获取book的id db.session.flush() # 创建库存日志 if book.stock > 0: from app.models.inventory import InventoryLog inventory_log = InventoryLog( book_id=book.id, change_type='入库', change_amount=book.stock, after_stock=book.stock, operator_id=g.user.id, remark='批量导入图书', changed_at=datetime.datetime.now() ) db.session.add(inventory_log) success_count += 1 except Exception as e: errors.append(f'第{index + 2}行: {str(e)}') error_count += 1 db.session.commit() flash(f'导入完成: 成功{success_count}条,失败{error_count}条', 'info') if errors: flash('
'.join(errors[:10]) + (f'
...等共{len(errors)}个错误' if len(errors) > 10 else ''), 'warning') return redirect(url_for('book.book_list')) except Exception as e: flash(f'导入失败: {str(e)}', 'danger') return redirect(request.url) else: flash('只支持Excel文件(.xlsx, .xls)', 'danger') return redirect(request.url) return render_template('book/import.html', current_user=g.user) # 导出图书 @book_bp.route('/export') @login_required @admin_required def export_books(): # 获取查询参数 search = request.args.get('search', '') category_id = request.args.get('category_id', type=int) query = Book.query if search: query = query.filter( (Book.title.contains(search)) | (Book.author.contains(search)) | (Book.isbn.contains(search)) ) if category_id: query = query.filter_by(category_id=category_id) books = query.all() # 创建DataFrame data = [] for book in books: category_name = book.category.name if book.category else "" data.append({ 'id': book.id, 'title': book.title, 'author': book.author, 'publisher': book.publisher, 'category': category_name, 'tags': book.tags, 'isbn': book.isbn, 'publish_year': book.publish_year, 'description': book.description, 'stock': book.stock, 'price': book.price, 'status': '上架' if book.status == 1 else '下架', 'created_at': book.created_at.strftime('%Y-%m-%d %H:%M:%S') if book.created_at else '', 'updated_at': book.updated_at.strftime('%Y-%m-%d %H:%M:%S') if book.updated_at else '' }) df = pd.DataFrame(data) # 创建临时文件 timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S') filename = f'books_export_{timestamp}.xlsx' filepath = os.path.join(current_app.static_folder, 'temp', filename) # 确保目录存在 os.makedirs(os.path.dirname(filepath), exist_ok=True) # 写入Excel df.to_excel(filepath, index=False) # 提供下载链接 return redirect(url_for('static', filename=f'temp/{filename}'))