from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, jsonify, send_file from app.models.book import Book, Category from app.models.user import db from app.utils.auth import login_required, permission_required # 修改导入,替换admin_required为permission_required from flask_login import current_user import os from werkzeug.utils import secure_filename import datetime import pandas as pd import uuid from app.models.log import Log from io import BytesIO import xlsxwriter from sqlalchemy import text book_bp = Blueprint('book', __name__) @book_bp.route('/admin/list') @login_required @permission_required('manage_books') # 替换 @admin_required def admin_book_list(): print(f"DEBUG: admin_book_list 函数被调用,用户={current_user.username},认证状态={current_user.is_authenticated}") page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) # 只显示状态为1的图书(未下架的图书) query = Book.query.filter_by(status=1) # 搜索功能 search = request.args.get('search', '') if search: query = query.filter( (Book.title.contains(search)) | (Book.author.contains(search)) | (Book.isbn.contains(search)) ) # 分类筛选 category_id = request.args.get('category_id', type=int) if category_id: query = query.filter_by(category_id=category_id) # 排序 sort = request.args.get('sort', 'id') order = request.args.get('order', 'desc') if order == 'desc': query = query.order_by(getattr(Book, sort).desc()) else: query = query.order_by(getattr(Book, sort)) pagination = query.paginate(page=page, per_page=per_page) books = pagination.items # 获取所有分类供筛选使用 categories = Category.query.all() # 记录访问日志 Log.add_log( action='访问管理图书列表', user_id=current_user.id, ip_address=request.remote_addr, description=f"查询条件: 搜索={search}, 分类={category_id}, 排序={sort} {order}" ) return render_template('book/list.html', books=books, pagination=pagination, search=search, categories=categories, category_id=category_id, sort=sort, order=order, current_user=current_user, is_admin_view=True) # 图书列表页面 - 不需要修改,已经只有@login_required @book_bp.route('/list') @login_required def book_list(): print("访问图书列表页面") # 调试输出 page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) # 只显示状态为1的图书(未下架的图书) query = Book.query.filter_by(status=1) # 搜索功能 search = request.args.get('search', '') if search: query = query.filter( (Book.title.contains(search)) | (Book.author.contains(search)) | (Book.isbn.contains(search)) ) # 分类筛选 category_id = request.args.get('category_id', type=int) if category_id: query = query.filter_by(category_id=category_id) # 排序 sort = request.args.get('sort', 'id') order = request.args.get('order', 'desc') if order == 'desc': query = query.order_by(getattr(Book, sort).desc()) else: query = query.order_by(getattr(Book, sort)) pagination = query.paginate(page=page, per_page=per_page) books = pagination.items # 获取所有分类供筛选使用 categories = Category.query.all() # 记录访问日志 Log.add_log( action='访问图书列表', user_id=current_user.id, ip_address=request.remote_addr, description=f"查询条件: 搜索={search}, 分类={category_id}, 排序={sort} {order}" ) return render_template('book/list.html', books=books, pagination=pagination, search=search, categories=categories, category_id=category_id, sort=sort, order=order, current_user=current_user) # 图书详情页面 - 不需要修改,已经只有@login_required @book_bp.route('/detail/') @login_required def book_detail(book_id): book = Book.query.get_or_404(book_id) # 添加当前时间用于判断借阅是否逾期 now = datetime.datetime.now() # 如果用户是管理员,预先查询并排序借阅记录 borrow_records = [] # 使用current_user代替g.user if current_user.is_authenticated and current_user.role_id == 1: from app.models.borrow import BorrowRecord borrow_records = BorrowRecord.query.filter_by(book_id=book_id).order_by(BorrowRecord.borrow_date.desc()).limit( 10).all() # 记录访问日志 Log.add_log( action='查看图书详情', user_id=current_user.id, target_type='book', target_id=book_id, ip_address=request.remote_addr, description=f"查看图书: {book.title}" ) return render_template( 'book/detail.html', book=book, current_user=current_user, borrow_records=borrow_records, now=now ) # 添加图书页面 @book_bp.route('/add', methods=['GET', 'POST']) @login_required @permission_required('manage_books') # 替换 @admin_required def add_book(): if request.method == 'POST': title = request.form.get('title') author = request.form.get('author') publisher = request.form.get('publisher') category_id = request.form.get('category_id') tags = request.form.get('tags') isbn = request.form.get('isbn') publish_year = request.form.get('publish_year') description = request.form.get('description') stock = request.form.get('stock', type=int, default=0) price = request.form.get('price') # 表单验证 errors = [] if not title: errors.append('书名不能为空') if not author: errors.append('作者不能为空') # 检查ISBN是否已存在(如果提供了ISBN) if isbn: existing_book = Book.query.filter_by(isbn=isbn).first() if existing_book: errors.append(f'ISBN "{isbn}" 已存在,请检查ISBN或查找现有图书') if errors: for error in errors: flash(error, 'danger') categories = Category.query.all() # 保留已填写的表单数据 book_data = { 'title': title, 'author': author, 'publisher': publisher, 'category_id': category_id, 'tags': tags, 'isbn': isbn, 'publish_year': publish_year, 'description': description, 'stock': stock, 'price': price } return render_template('book/add.html', categories=categories, current_user=current_user, book=book_data) # 处理封面图片上传 cover_url = None if 'cover' in request.files: cover_file = request.files['cover'] if cover_file and cover_file.filename != '': try: # 更清晰的文件命名 original_filename = secure_filename(cover_file.filename) # 保留原始文件扩展名 _, ext = os.path.splitext(original_filename) if not ext: ext = '.jpg' # 默认扩展名 filename = f"{uuid.uuid4()}{ext}" upload_folder = os.path.join(current_app.static_folder, 'covers') # 确保上传目录存在 if not os.path.exists(upload_folder): os.makedirs(upload_folder) file_path = os.path.join(upload_folder, filename) cover_file.save(file_path) cover_url = f'/static/covers/{filename}' except Exception as e: current_app.logger.error(f"封面上传失败: {str(e)}") flash(f"封面上传失败: {str(e)}", 'warning') try: # 创建新图书 book = Book( title=title, author=author, publisher=publisher, category_id=category_id, tags=tags, isbn=isbn, publish_year=publish_year, description=description, cover_url=cover_url, stock=stock, price=price, status=1, created_at=datetime.datetime.now(), updated_at=datetime.datetime.now() ) db.session.add(book) # 先提交以获取book的id db.session.commit() # 记录库存日志 - 在获取 book.id 后 if stock and int(stock) > 0: from app.models.inventory import InventoryLog inventory_log = InventoryLog( book_id=book.id, change_type='入库', change_amount=stock, after_stock=stock, operator_id=current_user.id, remark='新书入库', changed_at=datetime.datetime.now() ) db.session.add(inventory_log) db.session.commit() # 记录操作日志 Log.add_log( action='添加图书', user_id=current_user.id, target_type='book', target_id=book.id, ip_address=request.remote_addr, description=f"添加图书: {title}, ISBN: {isbn}, 初始库存: {stock}" ) flash(f'《{title}》添加成功', 'success') return redirect(url_for('book.book_list')) except Exception as e: db.session.rollback() error_msg = str(e) # 记录详细错误日志 current_app.logger.error(f"添加图书失败: {error_msg}") # 记录操作失败日志 Log.add_log( action='添加图书失败', user_id=current_user.id, ip_address=request.remote_addr, description=f"添加图书失败: {title}, 错误: {error_msg}" ) flash(f'添加图书失败: {error_msg}', 'danger') categories = Category.query.all() # 保留已填写的表单数据 book_data = { 'title': title, 'author': author, 'publisher': publisher, 'category_id': category_id, 'tags': tags, 'isbn': isbn, 'publish_year': publish_year, 'description': description, 'stock': stock, 'price': price } return render_template('book/add.html', categories=categories, current_user=current_user, book=book_data) categories = Category.query.all() return render_template('book/add.html', categories=categories, current_user=current_user) # 编辑图书 @book_bp.route('/edit/', methods=['GET', 'POST']) @login_required @permission_required('manage_books') # 替换 @admin_required def edit_book(book_id): book = Book.query.get_or_404(book_id) if request.method == 'POST': # 获取表单数据 title = request.form.get('title') author = request.form.get('author') publisher = request.form.get('publisher') category_id = request.form.get('category_id') tags = request.form.get('tags') isbn = request.form.get('isbn') publish_year = request.form.get('publish_year') description = request.form.get('description') price = request.form.get('price') status = request.form.get('status', type=int) # 基本验证 if not title or not author: flash('书名和作者不能为空', 'danger') categories = Category.query.all() return render_template('book/edit.html', book=book, categories=categories, current_user=current_user) # ISBN验证 if isbn and isbn.strip(): # 确保ISBN不是空字符串 # 移除连字符和空格 clean_isbn = isbn.replace('-', '').replace(' ', '') # 长度检查 if len(clean_isbn) != 10 and len(clean_isbn) != 13: flash('ISBN必须是10位或13位', 'danger') categories = Category.query.all() return render_template('book/edit.html', book=book, categories=categories, current_user=current_user) # ISBN-10验证 if len(clean_isbn) == 10: # 检查前9位是否为数字 if not clean_isbn[:9].isdigit(): flash('ISBN-10的前9位必须是数字', 'danger') categories = Category.query.all() return render_template('book/edit.html', book=book, categories=categories, current_user=current_user) # 检查最后一位是否为数字或'X' if not (clean_isbn[9].isdigit() or clean_isbn[9].upper() == 'X'): flash('ISBN-10的最后一位必须是数字或X', 'danger') categories = Category.query.all() return render_template('book/edit.html', book=book, categories=categories, current_user=current_user) # 校验和验证 sum = 0 for i in range(9): sum += int(clean_isbn[i]) * (10 - i) check_digit = 10 if clean_isbn[9].upper() == 'X' else int(clean_isbn[9]) sum += check_digit if sum % 11 != 0: flash('ISBN-10校验和无效', 'danger') categories = Category.query.all() return render_template('book/edit.html', book=book, categories=categories, current_user=current_user) # ISBN-13验证 if len(clean_isbn) == 13: # 检查是否全是数字 if not clean_isbn.isdigit(): flash('ISBN-13必须全是数字', 'danger') categories = Category.query.all() return render_template('book/edit.html', book=book, categories=categories, current_user=current_user) # 校验和验证 sum = 0 for i in range(12): sum += int(clean_isbn[i]) * (1 if i % 2 == 0 else 3) check_digit = (10 - (sum % 10)) % 10 if check_digit != int(clean_isbn[12]): flash('ISBN-13校验和无效', 'danger') categories = Category.query.all() return render_template('book/edit.html', book=book, categories=categories, current_user=current_user) # 处理库存变更 new_stock = request.form.get('stock', type=int) or 0 # 默认为0而非None if new_stock != book.stock: from app.models.inventory import InventoryLog change_amount = new_stock - book.stock change_type = '入库' if change_amount > 0 else '出库' inventory_log = InventoryLog( book_id=book.id, change_type=change_type, change_amount=abs(change_amount), after_stock=new_stock, operator_id=current_user.id, remark=f'管理员编辑图书库存 - {book.title}', changed_at=datetime.datetime.now() ) db.session.add(inventory_log) book.stock = new_stock # 处理封面图片上传 if 'cover' in request.files: cover_file = request.files['cover'] if cover_file and cover_file.filename != '': filename = secure_filename(f"{uuid.uuid4()}_{cover_file.filename}") upload_folder = os.path.join(current_app.static_folder, 'covers') # 确保上传目录存在 if not os.path.exists(upload_folder): os.makedirs(upload_folder) file_path = os.path.join(upload_folder, filename) cover_file.save(file_path) book.cover_url = f'/static/covers/{filename}' # 记录更新前的图书信息 old_info = f"原信息: 书名={book.title}, 作者={book.author}, ISBN={book.isbn}, 库存={book.stock}" # 更新图书信息 book.title = title book.author = author book.publisher = publisher book.category_id = category_id book.tags = tags book.isbn = isbn book.publish_year = publish_year book.description = description book.price = price book.status = status book.updated_at = datetime.datetime.now() try: db.session.commit() # 记录操作日志 Log.add_log( action='编辑图书', user_id=current_user.id, target_type='book', target_id=book.id, ip_address=request.remote_addr, description=f"编辑图书: {title}, ISBN: {isbn}, 新库存: {new_stock}\n{old_info}" ) flash('图书信息更新成功', 'success') return redirect(url_for('book.book_list')) except Exception as e: db.session.rollback() # 记录操作失败日志 Log.add_log( action='编辑图书失败', user_id=current_user.id, target_type='book', target_id=book.id, ip_address=request.remote_addr, description=f"编辑图书失败: {title}, 错误: {str(e)}" ) flash(f'保存失败: {str(e)}', 'danger') categories = Category.query.all() return render_template('book/edit.html', book=book, categories=categories, current_user=current_user) # GET 请求 categories = Category.query.all() return render_template('book/edit.html', book=book, categories=categories, current_user=current_user) # 删除图书 @book_bp.route('/delete/', methods=['POST']) @login_required @permission_required('manage_books') # 替换 @admin_required def delete_book(book_id): book = Book.query.get_or_404(book_id) # 检查该书是否有借阅记录 from app.models.borrow import BorrowRecord active_borrows = BorrowRecord.query.filter_by(book_id=book_id, status=1).count() if active_borrows > 0: # 记录操作失败日志 Log.add_log( action='删除图书失败', user_id=current_user.id, target_type='book', target_id=book_id, ip_address=request.remote_addr, description=f"删除图书失败: {book.title}, 原因: 该图书有未归还的借阅记录" ) return jsonify({'success': False, 'message': '该图书有未归还的借阅记录,无法删除'}) # 考虑软删除而不是物理删除 book.status = 0 # 0表示已删除/下架 book.updated_at = datetime.datetime.now() db.session.commit() # 记录操作日志 Log.add_log( action='下架图书', user_id=current_user.id, target_type='book', target_id=book_id, ip_address=request.remote_addr, description=f"下架图书: {book.title}, ISBN: {book.isbn}" ) return jsonify({'success': True, 'message': '图书已成功下架'}) # 图书分类管理 @book_bp.route('/categories', methods=['GET']) @login_required @permission_required('manage_categories') # 替换 @admin_required def category_list(): categories = Category.query.all() # 记录访问日志 Log.add_log( action='访问分类管理', user_id=current_user.id, ip_address=request.remote_addr, description="访问图书分类管理页面" ) return render_template('book/categories.html', categories=categories, current_user=current_user) # 添加分类 @book_bp.route('/categories/add', methods=['POST']) @login_required @permission_required('manage_categories') # 替换 @admin_required def add_category(): name = request.form.get('name') parent_id = request.form.get('parent_id') or None sort = request.form.get('sort', 0, type=int) if not name: return jsonify({'success': False, 'message': '分类名称不能为空'}) category = Category(name=name, parent_id=parent_id, sort=sort) db.session.add(category) db.session.commit() # 记录操作日志 Log.add_log( action='添加图书分类', user_id=current_user.id, ip_address=request.remote_addr, description=f"添加图书分类: {name}, 上级分类ID: {parent_id}, 排序: {sort}" ) return jsonify({'success': True, 'message': '分类添加成功', 'id': category.id, 'name': category.name}) # 编辑分类 @book_bp.route('/categories/edit/', methods=['POST']) @login_required @permission_required('manage_categories') # 替换 @admin_required def edit_category(category_id): category = Category.query.get_or_404(category_id) old_name = category.name old_parent_id = category.parent_id old_sort = category.sort name = request.form.get('name') parent_id = request.form.get('parent_id') or None sort = request.form.get('sort', 0, type=int) if not name: return jsonify({'success': False, 'message': '分类名称不能为空'}) category.name = name category.parent_id = parent_id category.sort = sort db.session.commit() # 记录操作日志 Log.add_log( action='编辑图书分类', user_id=current_user.id, target_type='category', target_id=category_id, ip_address=request.remote_addr, description=f"编辑图书分类: 从 [名称={old_name}, 上级={old_parent_id}, 排序={old_sort}] 修改为 [名称={name}, 上级={parent_id}, 排序={sort}]" ) return jsonify({'success': True, 'message': '分类更新成功'}) # 删除分类 @book_bp.route('/categories/delete/', methods=['POST']) @login_required @permission_required('manage_categories') # 替换 @admin_required def delete_category(category_id): category = Category.query.get_or_404(category_id) # 检查是否有书籍使用此分类 books_count = Book.query.filter_by(category_id=category_id).count() if books_count > 0: # 记录操作失败日志 Log.add_log( action='删除图书分类失败', user_id=current_user.id, target_type='category', target_id=category_id, ip_address=request.remote_addr, description=f"删除图书分类失败: {category.name}, 原因: 该分类下有{books_count}本图书" ) return jsonify({'success': False, 'message': f'该分类下有{books_count}本图书,无法删除'}) # 检查是否有子分类 children_count = Category.query.filter_by(parent_id=category_id).count() if children_count > 0: # 记录操作失败日志 Log.add_log( action='删除图书分类失败', user_id=current_user.id, target_type='category', target_id=category_id, ip_address=request.remote_addr, description=f"删除图书分类失败: {category.name}, 原因: 该分类下有{children_count}个子分类" ) return jsonify({'success': False, 'message': f'该分类下有{children_count}个子分类,无法删除'}) category_name = category.name # 保存分类名称以便记录日志 db.session.delete(category) db.session.commit() # 记录操作日志 Log.add_log( action='删除图书分类', user_id=current_user.id, target_type='category', target_id=category_id, ip_address=request.remote_addr, description=f"删除图书分类: {category_name}" ) return jsonify({'success': True, 'message': '分类删除成功'}) # 批量导入图书 @book_bp.route('/import', methods=['GET', 'POST']) @login_required @permission_required('import_export_books') def import_books(): if request.method == 'POST': if 'file' not in request.files: flash('未选择文件', 'danger') return redirect(request.url) file = request.files['file'] if file.filename == '': flash('未选择文件', 'danger') return redirect(request.url) if file and file.filename.endswith(('.xlsx', '.xls')): try: # 添加详细日志 current_app.logger.info(f"开始导入Excel文件: {file.filename}") # 读取Excel文件 df = pd.read_excel(file) current_app.logger.info(f"成功读取Excel文件,共有{len(df)}行数据") # 打印DataFrame的列名和前几行数据,帮助诊断问题 current_app.logger.info(f"Excel文件列名: {df.columns.tolist()}") current_app.logger.info(f"Excel文件前两行数据:\n{df.head(2)}") success_count = 0 update_count = 0 # 新增:更新计数 error_count = 0 errors = [] # 检查必填列是否存在 required_columns = ['title', 'author'] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: error_msg = f"Excel文件缺少必要的列: {', '.join(missing_columns)}" current_app.logger.error(error_msg) flash(error_msg, 'danger') return redirect(request.url) # 处理每一行数据 for index, row in df.iterrows(): try: current_app.logger.debug(f"处理第{index + 2}行数据") # 检查必填字段 if pd.isna(row.get('title')) or pd.isna(row.get('author')): error_msg = f'第{index + 2}行: 书名或作者为空' current_app.logger.warning(error_msg) errors.append(error_msg) error_count += 1 continue # 处理数据类型转换 try: stock = int(row.get('stock')) if not pd.isna(row.get('stock')) else 0 except ValueError: error_msg = f'第{index + 2}行: 库存数量必须是整数' current_app.logger.warning(error_msg) errors.append(error_msg) error_count += 1 continue try: price = float(row.get('price')) if not pd.isna(row.get('price')) else None except ValueError: error_msg = f'第{index + 2}行: 价格必须是数字' current_app.logger.warning(error_msg) errors.append(error_msg) error_count += 1 continue try: category_id = int(row.get('category_id')) if not pd.isna(row.get('category_id')) else None if category_id and not Category.query.get(category_id): error_msg = f'第{index + 2}行: 分类ID {category_id} 不存在' current_app.logger.warning(error_msg) errors.append(error_msg) error_count += 1 continue except ValueError: error_msg = f'第{index + 2}行: 分类ID必须是整数' current_app.logger.warning(error_msg) errors.append(error_msg) error_count += 1 continue # 检查ISBN是否已存在 isbn = row.get('isbn') existing_book = None if isbn and not pd.isna(isbn): isbn = str(isbn) existing_book = Book.query.filter_by(isbn=isbn).first() # 如果ISBN已存在,检查状态 if existing_book: if existing_book.status == 1: # 活跃的图书,不允许重复导入 error_msg = f'第{index + 2}行: ISBN {isbn} 已存在于活跃图书中' current_app.logger.warning(error_msg) errors.append(error_msg) error_count += 1 continue else: # 已软删除的图书,更新它 current_app.logger.info(f"第{index + 2}行: 发现已删除的ISBN {isbn},将更新该记录") # 更新图书信息 existing_book.title = row.get('title') existing_book.author = row.get('author') existing_book.publisher = row.get('publisher') if not pd.isna( row.get('publisher')) else None existing_book.category_id = category_id existing_book.tags = row.get('tags') if not pd.isna(row.get('tags')) else None existing_book.publish_year = str(row.get('publish_year')) if not pd.isna( row.get('publish_year')) else None existing_book.description = row.get('description') if not pd.isna( row.get('description')) else None existing_book.cover_url = row.get('cover_url') if not pd.isna( row.get('cover_url')) else None existing_book.price = price existing_book.status = 1 # 重新激活图书 existing_book.updated_at = datetime.datetime.now() # 处理库存变更 stock_change = stock - existing_book.stock existing_book.stock = stock # 创建库存日志 if stock_change != 0: from app.models.inventory import InventoryLog change_type = '入库' if stock_change > 0 else '出库' inventory_log = InventoryLog( book_id=existing_book.id, change_type=change_type, change_amount=abs(stock_change), after_stock=stock, operator_id=current_user.id, remark='批量导入更新图书', changed_at=datetime.datetime.now() ) db.session.add(inventory_log) update_count += 1 continue # 继续处理下一行 # 创建新书籍记录 book = Book( title=row.get('title'), author=row.get('author'), publisher=row.get('publisher') if not pd.isna(row.get('publisher')) else None, category_id=category_id, tags=row.get('tags') if not pd.isna(row.get('tags')) else None, isbn=isbn, publish_year=str(row.get('publish_year')) if not pd.isna(row.get('publish_year')) else None, description=row.get('description') if not pd.isna(row.get('description')) else None, cover_url=row.get('cover_url') if not pd.isna(row.get('cover_url')) else None, stock=stock, price=price, status=1, created_at=datetime.datetime.now(), updated_at=datetime.datetime.now() ) db.session.add(book) # 提交以获取book的id db.session.flush() current_app.logger.debug(f"书籍添加成功: {book.title}, ID: {book.id}") # 创建库存日志 if book.stock > 0: from app.models.inventory import InventoryLog inventory_log = InventoryLog( book_id=book.id, change_type='入库', change_amount=book.stock, after_stock=book.stock, operator_id=current_user.id, remark='批量导入图书', changed_at=datetime.datetime.now() ) db.session.add(inventory_log) current_app.logger.debug(f"库存日志添加成功: 书籍ID {book.id}, 数量 {book.stock}") success_count += 1 except Exception as e: error_msg = f'第{index + 2}行: {str(e)}' current_app.logger.error(f"处理第{index + 2}行时出错: {str(e)}") errors.append(error_msg) error_count += 1 db.session.rollback() # 每行错误单独回滚 try: # 最终提交所有成功的记录 if success_count > 0 or update_count > 0: db.session.commit() current_app.logger.info(f"导入完成: 新增{success_count}条,更新{update_count}条,失败{error_count}条") # 记录操作日志 Log.add_log( action='批量导入图书', user_id=current_user.id, ip_address=request.remote_addr, description=f"批量导入图书: 新增{success_count}条,更新{update_count}条,失败{error_count}条,文件名:{file.filename}" ) # 输出详细的错误信息 if success_count > 0 or update_count > 0: flash(f'导入完成: 新增{success_count}条,更新{update_count}条,失败{error_count}条', 'success') else: flash(f'导入完成: 新增{success_count}条,更新{update_count}条,失败{error_count}条', 'warning') if errors: error_display = '
'.join(errors[:10]) if len(errors) > 10: error_display += f'
...等共{len(errors)}个错误' flash(error_display, 'warning') return redirect(url_for('book.book_list')) except Exception as commit_error: db.session.rollback() error_msg = f"提交数据库事务失败: {str(commit_error)}" current_app.logger.error(error_msg) flash(error_msg, 'danger') return redirect(request.url) except Exception as e: db.session.rollback() # 记录详细错误日志 error_msg = f"批量导入图书失败: {str(e)}" current_app.logger.error(error_msg) # 记录操作失败日志 Log.add_log( action='批量导入图书失败', user_id=current_user.id, ip_address=request.remote_addr, description=f"批量导入图书失败: {str(e)}, 文件名:{file.filename}" ) flash(f'导入失败: {str(e)}', 'danger') return redirect(request.url) else: flash('只支持Excel文件(.xlsx, .xls)', 'danger') return redirect(request.url) return render_template('book/import.html', current_user=current_user) # 导出图书 @book_bp.route('/export', methods=['GET']) @login_required @permission_required('import_export_books') def export_books(): try: # 获取所有活跃的图书 books = Book.query.filter_by(status=1).all() # 创建工作簿和工作表 output = BytesIO() workbook = xlsxwriter.Workbook(output) worksheet = workbook.add_worksheet() # 添加表头 headers = ['ID', '书名', '作者', '出版社', '分类', '标签', 'ISBN', '出版年份', '描述', '封面链接', '库存', '价格', '创建时间', '更新时间'] for col_num, header in enumerate(headers): worksheet.write(0, col_num, header) # 不使用status过滤条件,因为Category模型没有status字段 category_dict = {} try: categories = db.session.execute( text("SELECT id, name FROM categories") ).fetchall() for cat in categories: category_dict[cat[0]] = cat[1] except Exception as ex: current_app.logger.warning(f"获取分类失败: {str(ex)}") # 添加数据行 for row_num, book in enumerate(books, 1): # 获取分类名称(如果有) category_name = category_dict.get(book.category_id, "") if book.category_id else "" # 格式化日期 created_at = book.created_at.strftime('%Y-%m-%d %H:%M:%S') if book.created_at else "" updated_at = book.updated_at.strftime('%Y-%m-%d %H:%M:%S') if book.updated_at else "" # 写入图书数据 worksheet.write(row_num, 0, book.id) worksheet.write(row_num, 1, book.title) worksheet.write(row_num, 2, book.author) worksheet.write(row_num, 3, book.publisher or "") worksheet.write(row_num, 4, category_name) worksheet.write(row_num, 5, book.tags or "") worksheet.write(row_num, 6, book.isbn or "") worksheet.write(row_num, 7, book.publish_year or "") worksheet.write(row_num, 8, book.description or "") worksheet.write(row_num, 9, book.cover_url or "") worksheet.write(row_num, 10, book.stock) worksheet.write(row_num, 11, float(book.price) if book.price else 0) worksheet.write(row_num, 12, created_at) worksheet.write(row_num, 13, updated_at) # 关闭工作簿 workbook.close() # 设置响应 output.seek(0) # 记录操作日志 Log.add_log( action='导出图书', user_id=current_user.id, ip_address=request.remote_addr, description=f"导出了{len(books)}本图书" ) # 返回文件 return send_file( output, as_attachment=True, download_name=f"图书导出_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.xlsx", mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' ) except Exception as e: current_app.logger.error(f"导出图书失败: {str(e)}") flash(f'导出失败: {str(e)}', 'danger') return redirect(url_for('book.book_list')) @book_bp.route('/test-permissions') def test_permissions(): """测试当前用户权限""" if not current_user.is_authenticated: return "未登录" return f"""

用户权限信息

用户名: {current_user.username}

角色ID: {current_user.role_id}

是否管理员: {'是' if current_user.role_id == 1 else '否'}

尝试访问管理页面

""" # 图书浏览页面 - 不需要修改,已经只有@login_required @book_bp.route('/browse') @login_required def browse_books(): """图书浏览页面 - 面向普通用户的友好界面""" page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 12, type=int) # 增加每页数量 # 只显示状态为1的图书(未下架的图书) query = Book.query.filter_by(status=1) # 搜索功能 search = request.args.get('search', '') if search: query = query.filter( (Book.title.contains(search)) | (Book.author.contains(search)) | (Book.isbn.contains(search)) ) # 分类筛选 category_id = request.args.get('category_id', type=int) if category_id: query = query.filter_by(category_id=category_id) # 排序 sort = request.args.get('sort', 'id') order = request.args.get('order', 'desc') if order == 'desc': query = query.order_by(getattr(Book, sort).desc()) else: query = query.order_by(getattr(Book, sort)) pagination = query.paginate(page=page, per_page=per_page) books = pagination.items # 获取所有分类供筛选使用 categories = Category.query.all() # 记录访问日志 Log.add_log( action='浏览图书', user_id=current_user.id, ip_address=request.remote_addr, description=f"浏览图书: 搜索={search}, 分类={category_id}, 排序={sort} {order}" ) return render_template('book/browse.html', books=books, pagination=pagination, search=search, categories=categories, category_id=category_id, sort=sort, order=order) @book_bp.route('/template/download') @login_required @permission_required('import_export_books') def download_template(): """下载图书导入模板""" import tempfile import os from openpyxl import Workbook from openpyxl.styles import Font, PatternFill, Alignment, Border, Side from openpyxl.utils import get_column_letter from flask import after_this_request, current_app as app, send_file # 创建工作簿和工作表 wb = Workbook() ws = wb.active ws.title = "图书导入模板" # 定义样式 header_font = Font(name='微软雅黑', size=11, bold=True, color="FFFFFF") header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid") required_font = Font(name='微软雅黑', size=11, bold=True, color="FF0000") optional_font = Font(name='微软雅黑', size=11) center_alignment = Alignment(horizontal='center', vertical='center', wrap_text=True) left_alignment = Alignment(vertical='center', wrap_text=True) # 定义边框样式 thin_border = Border( left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'), bottom=Side(style='thin') ) # 定义列宽 column_widths = { 'A': 30, # title 'B': 20, # author 'C': 20, # publisher 'D': 15, # category_id 'E': 25, # tags 'F': 15, # isbn 'G': 15, # publish_year 'H': 40, # description 'I': 30, # cover_url 'J': 10, # stock 'K': 10, # price } # 设置列宽 for col, width in column_widths.items(): ws.column_dimensions[col].width = width # 定义字段信息 fields = [ {'name': 'title', 'desc': '图书标题', 'required': True, 'example': '哈利·波特与魔法石'}, {'name': 'author', 'desc': '作者名称', 'required': True, 'example': 'J.K.罗琳'}, {'name': 'publisher', 'desc': '出版社', 'required': False, 'example': '人民文学出版社'}, {'name': 'category_id', 'desc': '分类ID', 'required': False, 'example': '1'}, {'name': 'tags', 'desc': '标签', 'required': False, 'example': '魔幻,冒险,青少年文学'}, {'name': 'isbn', 'desc': 'ISBN编号', 'required': False, 'example': '9787020042494'}, {'name': 'publish_year', 'desc': '出版年份', 'required': False, 'example': '2000'}, {'name': 'description', 'desc': '图书简介', 'required': False, 'example': '这是一本关于魔法的书籍,讲述了小男孩哈利...'}, {'name': 'cover_url', 'desc': '封面图片URL', 'required': False, 'example': 'https://example.com/covers/hp.jpg'}, {'name': 'stock', 'desc': '库存数量', 'required': False, 'example': '10'}, {'name': 'price', 'desc': '价格', 'required': False, 'example': '39.5'}, ] # 添加标题行 ws.append([f"{field['name']}" for field in fields]) # 设置标题行样式 for col_idx, field in enumerate(fields, start=1): cell = ws.cell(row=1, column=col_idx) cell.font = header_font cell.fill = header_fill cell.alignment = center_alignment cell.border = thin_border # 添加示例数据行 ws.append([field['example'] for field in fields]) # 设置示例行样式 for col_idx, field in enumerate(fields, start=1): cell = ws.cell(row=2, column=col_idx) cell.alignment = left_alignment cell.border = thin_border if field['required']: cell.font = required_font else: cell.font = optional_font # 冻结首行 ws.freeze_panes = "A2" # 添加说明工作表 instructions_ws = wb.create_sheet(title="填写说明") # 设置说明工作表的列宽 instructions_ws.column_dimensions['A'].width = 15 instructions_ws.column_dimensions['B'].width = 20 instructions_ws.column_dimensions['C'].width = 60 # 添加说明标题 instructions_ws.append(["字段名", "说明", "备注"]) # 设置标题行样式 for col_idx in range(1, 4): cell = instructions_ws.cell(row=1, column=col_idx) cell.font = header_font cell.fill = header_fill cell.alignment = center_alignment cell.border = thin_border # 字段详细说明 notes = { 'title': "图书的完整名称,例如:'哈利·波特与魔法石'。必须填写且不能为空。", 'author': "图书的作者名称,例如:'J.K.罗琳'。必须填写且不能为空。如有多个作者,请用逗号分隔。", 'publisher': "出版社名称,例如:'人民文学出版社'。", 'category_id': "图书分类的系统ID。请在系统的分类管理页面查看ID。如果不确定,可以留空,系统将使用默认分类。", 'tags': "图书的标签,多个标签请用英文逗号分隔,例如:'魔幻,冒险,青少年文学'。", 'isbn': "图书的ISBN号码,例如:'9787020042494'。请输入完整的13位或10位ISBN。", 'publish_year': "图书的出版年份,例如:'2000'。请输入4位数字年份。", 'description': "图书的简介或摘要。可以输入详细的描述信息,系统支持换行和基本格式。", 'cover_url': "图书封面的在线URL地址。如果有图片网址,系统将自动下载并设置为封面。", 'stock': "图书的库存数量,例如:'10'。请输入整数。", 'price': "图书的价格,例如:'39.5'。可以输入小数。" } # 添加字段说明 for idx, field in enumerate(fields, start=2): required_text = "【必填】" if field['required'] else "【选填】" instructions_ws.append([ field['name'], f"{field['desc']} {required_text}", notes.get(field['name'], "") ]) # 设置单元格样式 for col_idx in range(1, 4): cell = instructions_ws.cell(row=idx, column=col_idx) cell.alignment = left_alignment cell.border = thin_border if field['required'] and col_idx == 2: cell.font = required_font else: cell.font = optional_font # 添加示例行 instructions_ws.append(["", "", ""]) instructions_ws.append(["示例", "", "以下是完整的示例数据:"]) example_data = [ {"title": "哈利·波特与魔法石", "author": "J.K.罗琳", "publisher": "人民文学出版社", "category_id": "1", "tags": "魔幻,冒险,青少年文学", "isbn": "9787020042494", "publish_year": "2000", "description": "这是一本关于魔法的书籍,讲述了小男孩哈利...", "cover_url": "https://example.com/covers/hp.jpg", "stock": "10", "price": "39.5"}, {"title": "三体", "author": "刘慈欣", "publisher": "重庆出版社", "category_id": "2", "tags": "科幻,硬科幻,中国科幻", "isbn": "9787536692930", "publish_year": "2008", "description": "文化大革命如火如荼进行的同时...", "cover_url": "https://example.com/covers/threebody.jpg", "stock": "15", "price": "59.8"}, {"title": "平凡的世界", "author": "路遥", "publisher": "北京十月文艺出版社", "category_id": "3", "tags": "文学,现实主义,农村", "isbn": "9787530216781", "publish_year": "2017", "description": "这是一部全景式地表现中国当代城乡社会...", "cover_url": "https://example.com/covers/world.jpg", "stock": "8", "price": "128.0"} ] # 添加3个完整示例到主模板工作表 for example in example_data: row_data = [example.get(field['name'], '') for field in fields] ws.append(row_data) # 设置示例数据样式 for row_idx in range(3, 6): for col_idx in range(1, len(fields) + 1): cell = ws.cell(row=row_idx, column=col_idx) cell.alignment = left_alignment cell.border = thin_border # 使用临时文件保存工作簿 fd, temp_path = tempfile.mkstemp(suffix='.xlsx') os.close(fd) try: wb.save(temp_path) # 注册一个函数在响应完成后删除临时文件 @after_this_request def remove_file(response): try: os.unlink(temp_path) except Exception as error: app.logger.error("删除临时文件出错: %s", error) return response response = send_file( temp_path, as_attachment=True, download_name='图书导入模板.xlsx', mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' ) # 添加防止缓存的头信息 response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate" response.headers["Pragma"] = "no-cache" response.headers["Expires"] = "0" return response except Exception as e: # 确保在出错时也删除临时文件 try: os.unlink(temp_path) except: pass app.logger.error("生成模板文件出错: %s", str(e)) return {"error": "生成模板文件失败,请稍后再试"}, 500