2025-05-16 22:50:25 +08:00

1309 lines
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, jsonify, send_file
from app.models.book import Book, Category
from app.models.user import db
from app.utils.auth import login_required, permission_required # 修改导入替换admin_required为permission_required
from flask_login import current_user
import os
from werkzeug.utils import secure_filename
import datetime
import pandas as pd
import uuid
from app.models.log import Log
from io import BytesIO
import xlsxwriter
from sqlalchemy import text
book_bp = Blueprint('book', __name__)
@book_bp.route('/admin/list')
@login_required
@permission_required('manage_books') # 替换 @admin_required
def admin_book_list():
print(f"DEBUG: admin_book_list 函数被调用,用户={current_user.username},认证状态={current_user.is_authenticated}")
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
# 只显示状态为1的图书未下架的图书
query = Book.query.filter_by(status=1)
# 搜索功能
search = request.args.get('search', '')
if search:
query = query.filter(
(Book.title.contains(search)) |
(Book.author.contains(search)) |
(Book.isbn.contains(search))
)
# 分类筛选
category_id = request.args.get('category_id', type=int)
if category_id:
query = query.filter_by(category_id=category_id)
# 排序
sort = request.args.get('sort', 'id')
order = request.args.get('order', 'desc')
if order == 'desc':
query = query.order_by(getattr(Book, sort).desc())
else:
query = query.order_by(getattr(Book, sort))
pagination = query.paginate(page=page, per_page=per_page)
books = pagination.items
# 获取所有分类供筛选使用
categories = Category.query.all()
# 记录访问日志
Log.add_log(
action='访问管理图书列表',
user_id=current_user.id,
ip_address=request.remote_addr,
description=f"查询条件: 搜索={search}, 分类={category_id}, 排序={sort} {order}"
)
return render_template('book/list.html',
books=books,
pagination=pagination,
search=search,
categories=categories,
category_id=category_id,
sort=sort,
order=order,
current_user=current_user,
is_admin_view=True)
# 图书列表页面 - 不需要修改,已经只有@login_required
@book_bp.route('/list')
@login_required
def book_list():
print("访问图书列表页面") # 调试输出
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
# 只显示状态为1的图书未下架的图书
query = Book.query.filter_by(status=1)
# 搜索功能
search = request.args.get('search', '')
if search:
query = query.filter(
(Book.title.contains(search)) |
(Book.author.contains(search)) |
(Book.isbn.contains(search))
)
# 分类筛选
category_id = request.args.get('category_id', type=int)
if category_id:
query = query.filter_by(category_id=category_id)
# 排序
sort = request.args.get('sort', 'id')
order = request.args.get('order', 'desc')
if order == 'desc':
query = query.order_by(getattr(Book, sort).desc())
else:
query = query.order_by(getattr(Book, sort))
pagination = query.paginate(page=page, per_page=per_page)
books = pagination.items
# 获取所有分类供筛选使用
categories = Category.query.all()
# 记录访问日志
Log.add_log(
action='访问图书列表',
user_id=current_user.id,
ip_address=request.remote_addr,
description=f"查询条件: 搜索={search}, 分类={category_id}, 排序={sort} {order}"
)
return render_template('book/list.html',
books=books,
pagination=pagination,
search=search,
categories=categories,
category_id=category_id,
sort=sort,
order=order,
current_user=current_user)
# 图书详情页面 - 不需要修改,已经只有@login_required
@book_bp.route('/detail/<int:book_id>')
@login_required
def book_detail(book_id):
book = Book.query.get_or_404(book_id)
# 添加当前时间用于判断借阅是否逾期
now = datetime.datetime.now()
# 如果用户是管理员,预先查询并排序借阅记录
borrow_records = []
# 使用current_user代替g.user
if current_user.is_authenticated and current_user.role_id == 1:
from app.models.borrow import BorrowRecord
borrow_records = BorrowRecord.query.filter_by(book_id=book_id).order_by(BorrowRecord.borrow_date.desc()).limit(
10).all()
# 记录访问日志
Log.add_log(
action='查看图书详情',
user_id=current_user.id,
target_type='book',
target_id=book_id,
ip_address=request.remote_addr,
description=f"查看图书: {book.title}"
)
return render_template(
'book/detail.html',
book=book,
current_user=current_user,
borrow_records=borrow_records,
now=now
)
# 添加图书页面
@book_bp.route('/add', methods=['GET', 'POST'])
@login_required
@permission_required('manage_books') # 替换 @admin_required
def add_book():
if request.method == 'POST':
title = request.form.get('title')
author = request.form.get('author')
publisher = request.form.get('publisher')
category_id = request.form.get('category_id')
tags = request.form.get('tags')
isbn = request.form.get('isbn')
publish_year = request.form.get('publish_year')
description = request.form.get('description')
stock = request.form.get('stock', type=int, default=0)
price = request.form.get('price')
# 表单验证
errors = []
if not title:
errors.append('书名不能为空')
if not author:
errors.append('作者不能为空')
# 检查ISBN是否已存在(如果提供了ISBN)
if isbn:
existing_book = Book.query.filter_by(isbn=isbn).first()
if existing_book:
errors.append(f'ISBN "{isbn}" 已存在请检查ISBN或查找现有图书')
if errors:
for error in errors:
flash(error, 'danger')
categories = Category.query.all()
# 保留已填写的表单数据
book_data = {
'title': title,
'author': author,
'publisher': publisher,
'category_id': category_id,
'tags': tags,
'isbn': isbn,
'publish_year': publish_year,
'description': description,
'stock': stock,
'price': price
}
return render_template('book/add.html', categories=categories,
current_user=current_user, book=book_data)
# 处理封面图片上传
cover_url = None
if 'cover' in request.files:
cover_file = request.files['cover']
if cover_file and cover_file.filename != '':
try:
# 更清晰的文件命名
original_filename = secure_filename(cover_file.filename)
# 保留原始文件扩展名
_, ext = os.path.splitext(original_filename)
if not ext:
ext = '.jpg' # 默认扩展名
filename = f"{uuid.uuid4()}{ext}"
upload_folder = os.path.join(current_app.static_folder, 'covers')
# 确保上传目录存在
if not os.path.exists(upload_folder):
os.makedirs(upload_folder)
file_path = os.path.join(upload_folder, filename)
cover_file.save(file_path)
cover_url = f'/static/covers/{filename}'
except Exception as e:
current_app.logger.error(f"封面上传失败: {str(e)}")
flash(f"封面上传失败: {str(e)}", 'warning')
try:
# 创建新图书
book = Book(
title=title,
author=author,
publisher=publisher,
category_id=category_id,
tags=tags,
isbn=isbn,
publish_year=publish_year,
description=description,
cover_url=cover_url,
stock=stock,
price=price,
status=1,
created_at=datetime.datetime.now(),
updated_at=datetime.datetime.now()
)
db.session.add(book)
# 先提交以获取book的id
db.session.commit()
# 记录库存日志 - 在获取 book.id 后
if stock and int(stock) > 0:
from app.models.inventory import InventoryLog
inventory_log = InventoryLog(
book_id=book.id,
change_type='入库',
change_amount=stock,
after_stock=stock,
operator_id=current_user.id,
remark='新书入库',
changed_at=datetime.datetime.now()
)
db.session.add(inventory_log)
db.session.commit()
# 记录操作日志
Log.add_log(
action='添加图书',
user_id=current_user.id,
target_type='book',
target_id=book.id,
ip_address=request.remote_addr,
description=f"添加图书: {title}, ISBN: {isbn}, 初始库存: {stock}"
)
flash(f'{title}》添加成功', 'success')
return redirect(url_for('book.book_list'))
except Exception as e:
db.session.rollback()
error_msg = str(e)
# 记录详细错误日志
current_app.logger.error(f"添加图书失败: {error_msg}")
# 记录操作失败日志
Log.add_log(
action='添加图书失败',
user_id=current_user.id,
ip_address=request.remote_addr,
description=f"添加图书失败: {title}, 错误: {error_msg}"
)
flash(f'添加图书失败: {error_msg}', 'danger')
categories = Category.query.all()
# 保留已填写的表单数据
book_data = {
'title': title,
'author': author,
'publisher': publisher,
'category_id': category_id,
'tags': tags,
'isbn': isbn,
'publish_year': publish_year,
'description': description,
'stock': stock,
'price': price
}
return render_template('book/add.html', categories=categories,
current_user=current_user, book=book_data)
categories = Category.query.all()
return render_template('book/add.html', categories=categories, current_user=current_user)
# 编辑图书
@book_bp.route('/edit/<int:book_id>', methods=['GET', 'POST'])
@login_required
@permission_required('manage_books') # 替换 @admin_required
def edit_book(book_id):
book = Book.query.get_or_404(book_id)
if request.method == 'POST':
# 获取表单数据
title = request.form.get('title')
author = request.form.get('author')
publisher = request.form.get('publisher')
category_id = request.form.get('category_id')
tags = request.form.get('tags')
isbn = request.form.get('isbn')
publish_year = request.form.get('publish_year')
description = request.form.get('description')
price = request.form.get('price')
status = request.form.get('status', type=int)
# 基本验证
if not title or not author:
flash('书名和作者不能为空', 'danger')
categories = Category.query.all()
return render_template('book/edit.html', book=book, categories=categories, current_user=current_user)
# ISBN验证
if isbn and isbn.strip(): # 确保ISBN不是空字符串
# 移除连字符和空格
clean_isbn = isbn.replace('-', '').replace(' ', '')
# 长度检查
if len(clean_isbn) != 10 and len(clean_isbn) != 13:
flash('ISBN必须是10位或13位', 'danger')
categories = Category.query.all()
return render_template('book/edit.html', book=book, categories=categories, current_user=current_user)
# ISBN-10验证
if len(clean_isbn) == 10:
# 检查前9位是否为数字
if not clean_isbn[:9].isdigit():
flash('ISBN-10的前9位必须是数字', 'danger')
categories = Category.query.all()
return render_template('book/edit.html', book=book, categories=categories, current_user=current_user)
# 检查最后一位是否为数字或'X'
if not (clean_isbn[9].isdigit() or clean_isbn[9].upper() == 'X'):
flash('ISBN-10的最后一位必须是数字或X', 'danger')
categories = Category.query.all()
return render_template('book/edit.html', book=book, categories=categories, current_user=current_user)
# 校验和验证
sum = 0
for i in range(9):
sum += int(clean_isbn[i]) * (10 - i)
check_digit = 10 if clean_isbn[9].upper() == 'X' else int(clean_isbn[9])
sum += check_digit
if sum % 11 != 0:
flash('ISBN-10校验和无效', 'danger')
categories = Category.query.all()
return render_template('book/edit.html', book=book, categories=categories, current_user=current_user)
# ISBN-13验证
if len(clean_isbn) == 13:
# 检查是否全是数字
if not clean_isbn.isdigit():
flash('ISBN-13必须全是数字', 'danger')
categories = Category.query.all()
return render_template('book/edit.html', book=book, categories=categories, current_user=current_user)
# 校验和验证
sum = 0
for i in range(12):
sum += int(clean_isbn[i]) * (1 if i % 2 == 0 else 3)
check_digit = (10 - (sum % 10)) % 10
if check_digit != int(clean_isbn[12]):
flash('ISBN-13校验和无效', 'danger')
categories = Category.query.all()
return render_template('book/edit.html', book=book, categories=categories, current_user=current_user)
# 处理库存变更
new_stock = request.form.get('stock', type=int) or 0 # 默认为0而非None
if new_stock != book.stock:
from app.models.inventory import InventoryLog
change_amount = new_stock - book.stock
change_type = '入库' if change_amount > 0 else '出库'
inventory_log = InventoryLog(
book_id=book.id,
change_type=change_type,
change_amount=abs(change_amount),
after_stock=new_stock,
operator_id=current_user.id,
remark=f'管理员编辑图书库存 - {book.title}',
changed_at=datetime.datetime.now()
)
db.session.add(inventory_log)
book.stock = new_stock
# 处理封面图片上传
if 'cover' in request.files:
cover_file = request.files['cover']
if cover_file and cover_file.filename != '':
filename = secure_filename(f"{uuid.uuid4()}_{cover_file.filename}")
upload_folder = os.path.join(current_app.static_folder, 'covers')
# 确保上传目录存在
if not os.path.exists(upload_folder):
os.makedirs(upload_folder)
file_path = os.path.join(upload_folder, filename)
cover_file.save(file_path)
book.cover_url = f'/static/covers/{filename}'
# 记录更新前的图书信息
old_info = f"原信息: 书名={book.title}, 作者={book.author}, ISBN={book.isbn}, 库存={book.stock}"
# 更新图书信息
book.title = title
book.author = author
book.publisher = publisher
book.category_id = category_id
book.tags = tags
book.isbn = isbn
book.publish_year = publish_year
book.description = description
book.price = price
book.status = status
book.updated_at = datetime.datetime.now()
try:
db.session.commit()
# 记录操作日志
Log.add_log(
action='编辑图书',
user_id=current_user.id,
target_type='book',
target_id=book.id,
ip_address=request.remote_addr,
description=f"编辑图书: {title}, ISBN: {isbn}, 新库存: {new_stock}\n{old_info}"
)
flash('图书信息更新成功', 'success')
return redirect(url_for('book.book_list'))
except Exception as e:
db.session.rollback()
# 记录操作失败日志
Log.add_log(
action='编辑图书失败',
user_id=current_user.id,
target_type='book',
target_id=book.id,
ip_address=request.remote_addr,
description=f"编辑图书失败: {title}, 错误: {str(e)}"
)
flash(f'保存失败: {str(e)}', 'danger')
categories = Category.query.all()
return render_template('book/edit.html', book=book, categories=categories, current_user=current_user)
# GET 请求
categories = Category.query.all()
return render_template('book/edit.html', book=book, categories=categories, current_user=current_user)
# 删除图书
@book_bp.route('/delete/<int:book_id>', methods=['POST'])
@login_required
@permission_required('manage_books') # 替换 @admin_required
def delete_book(book_id):
book = Book.query.get_or_404(book_id)
# 检查该书是否有借阅记录
from app.models.borrow import BorrowRecord
active_borrows = BorrowRecord.query.filter_by(book_id=book_id, status=1).count()
if active_borrows > 0:
# 记录操作失败日志
Log.add_log(
action='删除图书失败',
user_id=current_user.id,
target_type='book',
target_id=book_id,
ip_address=request.remote_addr,
description=f"删除图书失败: {book.title}, 原因: 该图书有未归还的借阅记录"
)
return jsonify({'success': False, 'message': '该图书有未归还的借阅记录,无法删除'})
# 考虑软删除而不是物理删除
book.status = 0 # 0表示已删除/下架
book.updated_at = datetime.datetime.now()
db.session.commit()
# 记录操作日志
Log.add_log(
action='下架图书',
user_id=current_user.id,
target_type='book',
target_id=book_id,
ip_address=request.remote_addr,
description=f"下架图书: {book.title}, ISBN: {book.isbn}"
)
return jsonify({'success': True, 'message': '图书已成功下架'})
# 图书分类管理
@book_bp.route('/categories', methods=['GET'])
@login_required
@permission_required('manage_categories') # 替换 @admin_required
def category_list():
categories = Category.query.all()
# 记录访问日志
Log.add_log(
action='访问分类管理',
user_id=current_user.id,
ip_address=request.remote_addr,
description="访问图书分类管理页面"
)
return render_template('book/categories.html', categories=categories, current_user=current_user)
# 添加分类
@book_bp.route('/categories/add', methods=['POST'])
@login_required
@permission_required('manage_categories') # 替换 @admin_required
def add_category():
name = request.form.get('name')
parent_id = request.form.get('parent_id') or None
sort = request.form.get('sort', 0, type=int)
if not name:
return jsonify({'success': False, 'message': '分类名称不能为空'})
category = Category(name=name, parent_id=parent_id, sort=sort)
db.session.add(category)
db.session.commit()
# 记录操作日志
Log.add_log(
action='添加图书分类',
user_id=current_user.id,
ip_address=request.remote_addr,
description=f"添加图书分类: {name}, 上级分类ID: {parent_id}, 排序: {sort}"
)
return jsonify({'success': True, 'message': '分类添加成功', 'id': category.id, 'name': category.name})
# 编辑分类
@book_bp.route('/categories/edit/<int:category_id>', methods=['POST'])
@login_required
@permission_required('manage_categories') # 替换 @admin_required
def edit_category(category_id):
category = Category.query.get_or_404(category_id)
old_name = category.name
old_parent_id = category.parent_id
old_sort = category.sort
name = request.form.get('name')
parent_id = request.form.get('parent_id') or None
sort = request.form.get('sort', 0, type=int)
if not name:
return jsonify({'success': False, 'message': '分类名称不能为空'})
category.name = name
category.parent_id = parent_id
category.sort = sort
db.session.commit()
# 记录操作日志
Log.add_log(
action='编辑图书分类',
user_id=current_user.id,
target_type='category',
target_id=category_id,
ip_address=request.remote_addr,
description=f"编辑图书分类: 从 [名称={old_name}, 上级={old_parent_id}, 排序={old_sort}] 修改为 [名称={name}, 上级={parent_id}, 排序={sort}]"
)
return jsonify({'success': True, 'message': '分类更新成功'})
# 删除分类
@book_bp.route('/categories/delete/<int:category_id>', methods=['POST'])
@login_required
@permission_required('manage_categories') # 替换 @admin_required
def delete_category(category_id):
category = Category.query.get_or_404(category_id)
# 检查是否有书籍使用此分类
books_count = Book.query.filter_by(category_id=category_id).count()
if books_count > 0:
# 记录操作失败日志
Log.add_log(
action='删除图书分类失败',
user_id=current_user.id,
target_type='category',
target_id=category_id,
ip_address=request.remote_addr,
description=f"删除图书分类失败: {category.name}, 原因: 该分类下有{books_count}本图书"
)
return jsonify({'success': False, 'message': f'该分类下有{books_count}本图书,无法删除'})
# 检查是否有子分类
children_count = Category.query.filter_by(parent_id=category_id).count()
if children_count > 0:
# 记录操作失败日志
Log.add_log(
action='删除图书分类失败',
user_id=current_user.id,
target_type='category',
target_id=category_id,
ip_address=request.remote_addr,
description=f"删除图书分类失败: {category.name}, 原因: 该分类下有{children_count}个子分类"
)
return jsonify({'success': False, 'message': f'该分类下有{children_count}个子分类,无法删除'})
category_name = category.name # 保存分类名称以便记录日志
db.session.delete(category)
db.session.commit()
# 记录操作日志
Log.add_log(
action='删除图书分类',
user_id=current_user.id,
target_type='category',
target_id=category_id,
ip_address=request.remote_addr,
description=f"删除图书分类: {category_name}"
)
return jsonify({'success': True, 'message': '分类删除成功'})
# 批量导入图书
@book_bp.route('/import', methods=['GET', 'POST'])
@login_required
@permission_required('import_export_books')
def import_books():
if request.method == 'POST':
if 'file' not in request.files:
flash('未选择文件', 'danger')
return redirect(request.url)
file = request.files['file']
if file.filename == '':
flash('未选择文件', 'danger')
return redirect(request.url)
if file and file.filename.endswith(('.xlsx', '.xls')):
try:
# 添加详细日志
current_app.logger.info(f"开始导入Excel文件: {file.filename}")
# 读取Excel文件
df = pd.read_excel(file)
current_app.logger.info(f"成功读取Excel文件共有{len(df)}行数据")
# 打印DataFrame的列名和前几行数据帮助诊断问题
current_app.logger.info(f"Excel文件列名: {df.columns.tolist()}")
current_app.logger.info(f"Excel文件前两行数据:\n{df.head(2)}")
success_count = 0
update_count = 0 # 新增:更新计数
error_count = 0
errors = []
# 检查必填列是否存在
required_columns = ['title', 'author']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
error_msg = f"Excel文件缺少必要的列: {', '.join(missing_columns)}"
current_app.logger.error(error_msg)
flash(error_msg, 'danger')
return redirect(request.url)
# 处理每一行数据
for index, row in df.iterrows():
try:
current_app.logger.debug(f"处理第{index + 2}行数据")
# 检查必填字段
if pd.isna(row.get('title')) or pd.isna(row.get('author')):
error_msg = f'{index + 2}行: 书名或作者为空'
current_app.logger.warning(error_msg)
errors.append(error_msg)
error_count += 1
continue
# 处理数据类型转换
try:
stock = int(row.get('stock')) if not pd.isna(row.get('stock')) else 0
except ValueError:
error_msg = f'{index + 2}行: 库存数量必须是整数'
current_app.logger.warning(error_msg)
errors.append(error_msg)
error_count += 1
continue
try:
price = float(row.get('price')) if not pd.isna(row.get('price')) else None
except ValueError:
error_msg = f'{index + 2}行: 价格必须是数字'
current_app.logger.warning(error_msg)
errors.append(error_msg)
error_count += 1
continue
try:
category_id = int(row.get('category_id')) if not pd.isna(row.get('category_id')) else None
if category_id and not Category.query.get(category_id):
error_msg = f'{index + 2}行: 分类ID {category_id} 不存在'
current_app.logger.warning(error_msg)
errors.append(error_msg)
error_count += 1
continue
except ValueError:
error_msg = f'{index + 2}行: 分类ID必须是整数'
current_app.logger.warning(error_msg)
errors.append(error_msg)
error_count += 1
continue
# 检查ISBN是否已存在
isbn = row.get('isbn')
existing_book = None
if isbn and not pd.isna(isbn):
isbn = str(isbn)
existing_book = Book.query.filter_by(isbn=isbn).first()
# 如果ISBN已存在检查状态
if existing_book:
if existing_book.status == 1:
# 活跃的图书,不允许重复导入
error_msg = f'{index + 2}行: ISBN {isbn} 已存在于活跃图书中'
current_app.logger.warning(error_msg)
errors.append(error_msg)
error_count += 1
continue
else:
# 已软删除的图书,更新它
current_app.logger.info(f"{index + 2}行: 发现已删除的ISBN {isbn},将更新该记录")
# 更新图书信息
existing_book.title = row.get('title')
existing_book.author = row.get('author')
existing_book.publisher = row.get('publisher') if not pd.isna(
row.get('publisher')) else None
existing_book.category_id = category_id
existing_book.tags = row.get('tags') if not pd.isna(row.get('tags')) else None
existing_book.publish_year = str(row.get('publish_year')) if not pd.isna(
row.get('publish_year')) else None
existing_book.description = row.get('description') if not pd.isna(
row.get('description')) else None
existing_book.cover_url = row.get('cover_url') if not pd.isna(
row.get('cover_url')) else None
existing_book.price = price
existing_book.status = 1 # 重新激活图书
existing_book.updated_at = datetime.datetime.now()
# 处理库存变更
stock_change = stock - existing_book.stock
existing_book.stock = stock
# 创建库存日志
if stock_change != 0:
from app.models.inventory import InventoryLog
change_type = '入库' if stock_change > 0 else '出库'
inventory_log = InventoryLog(
book_id=existing_book.id,
change_type=change_type,
change_amount=abs(stock_change),
after_stock=stock,
operator_id=current_user.id,
remark='批量导入更新图书',
changed_at=datetime.datetime.now()
)
db.session.add(inventory_log)
update_count += 1
continue # 继续处理下一行
# 创建新书籍记录
book = Book(
title=row.get('title'),
author=row.get('author'),
publisher=row.get('publisher') if not pd.isna(row.get('publisher')) else None,
category_id=category_id,
tags=row.get('tags') if not pd.isna(row.get('tags')) else None,
isbn=isbn,
publish_year=str(row.get('publish_year')) if not pd.isna(row.get('publish_year')) else None,
description=row.get('description') if not pd.isna(row.get('description')) else None,
cover_url=row.get('cover_url') if not pd.isna(row.get('cover_url')) else None,
stock=stock,
price=price,
status=1,
created_at=datetime.datetime.now(),
updated_at=datetime.datetime.now()
)
db.session.add(book)
# 提交以获取book的id
db.session.flush()
current_app.logger.debug(f"书籍添加成功: {book.title}, ID: {book.id}")
# 创建库存日志
if book.stock > 0:
from app.models.inventory import InventoryLog
inventory_log = InventoryLog(
book_id=book.id,
change_type='入库',
change_amount=book.stock,
after_stock=book.stock,
operator_id=current_user.id,
remark='批量导入图书',
changed_at=datetime.datetime.now()
)
db.session.add(inventory_log)
current_app.logger.debug(f"库存日志添加成功: 书籍ID {book.id}, 数量 {book.stock}")
success_count += 1
except Exception as e:
error_msg = f'{index + 2}行: {str(e)}'
current_app.logger.error(f"处理第{index + 2}行时出错: {str(e)}")
errors.append(error_msg)
error_count += 1
db.session.rollback() # 每行错误单独回滚
try:
# 最终提交所有成功的记录
if success_count > 0 or update_count > 0:
db.session.commit()
current_app.logger.info(f"导入完成: 新增{success_count}条,更新{update_count}条,失败{error_count}")
# 记录操作日志
Log.add_log(
action='批量导入图书',
user_id=current_user.id,
ip_address=request.remote_addr,
description=f"批量导入图书: 新增{success_count}条,更新{update_count}条,失败{error_count}条,文件名:{file.filename}"
)
# 输出详细的错误信息
if success_count > 0 or update_count > 0:
flash(f'导入完成: 新增{success_count}条,更新{update_count}条,失败{error_count}', 'success')
else:
flash(f'导入完成: 新增{success_count}条,更新{update_count}条,失败{error_count}', 'warning')
if errors:
error_display = '<br>'.join(errors[:10])
if len(errors) > 10:
error_display += f'<br>...等共{len(errors)}个错误'
flash(error_display, 'warning')
return redirect(url_for('book.book_list'))
except Exception as commit_error:
db.session.rollback()
error_msg = f"提交数据库事务失败: {str(commit_error)}"
current_app.logger.error(error_msg)
flash(error_msg, 'danger')
return redirect(request.url)
except Exception as e:
db.session.rollback()
# 记录详细错误日志
error_msg = f"批量导入图书失败: {str(e)}"
current_app.logger.error(error_msg)
# 记录操作失败日志
Log.add_log(
action='批量导入图书失败',
user_id=current_user.id,
ip_address=request.remote_addr,
description=f"批量导入图书失败: {str(e)}, 文件名:{file.filename}"
)
flash(f'导入失败: {str(e)}', 'danger')
return redirect(request.url)
else:
flash('只支持Excel文件(.xlsx, .xls)', 'danger')
return redirect(request.url)
return render_template('book/import.html', current_user=current_user)
# 导出图书
@book_bp.route('/export', methods=['GET'])
@login_required
@permission_required('import_export_books')
def export_books():
try:
# 获取所有活跃的图书
books = Book.query.filter_by(status=1).all()
# 创建工作簿和工作表
output = BytesIO()
workbook = xlsxwriter.Workbook(output)
worksheet = workbook.add_worksheet()
# 添加表头
headers = ['ID', '书名', '作者', '出版社', '分类', '标签', 'ISBN', '出版年份',
'描述', '封面链接', '库存', '价格', '创建时间', '更新时间']
for col_num, header in enumerate(headers):
worksheet.write(0, col_num, header)
# 不使用status过滤条件因为Category模型没有status字段
category_dict = {}
try:
categories = db.session.execute(
text("SELECT id, name FROM categories")
).fetchall()
for cat in categories:
category_dict[cat[0]] = cat[1]
except Exception as ex:
current_app.logger.warning(f"获取分类失败: {str(ex)}")
# 添加数据行
for row_num, book in enumerate(books, 1):
# 获取分类名称(如果有)
category_name = category_dict.get(book.category_id, "") if book.category_id else ""
# 格式化日期
created_at = book.created_at.strftime('%Y-%m-%d %H:%M:%S') if book.created_at else ""
updated_at = book.updated_at.strftime('%Y-%m-%d %H:%M:%S') if book.updated_at else ""
# 写入图书数据
worksheet.write(row_num, 0, book.id)
worksheet.write(row_num, 1, book.title)
worksheet.write(row_num, 2, book.author)
worksheet.write(row_num, 3, book.publisher or "")
worksheet.write(row_num, 4, category_name)
worksheet.write(row_num, 5, book.tags or "")
worksheet.write(row_num, 6, book.isbn or "")
worksheet.write(row_num, 7, book.publish_year or "")
worksheet.write(row_num, 8, book.description or "")
worksheet.write(row_num, 9, book.cover_url or "")
worksheet.write(row_num, 10, book.stock)
worksheet.write(row_num, 11, float(book.price) if book.price else 0)
worksheet.write(row_num, 12, created_at)
worksheet.write(row_num, 13, updated_at)
# 关闭工作簿
workbook.close()
# 设置响应
output.seek(0)
# 记录操作日志
Log.add_log(
action='导出图书',
user_id=current_user.id,
ip_address=request.remote_addr,
description=f"导出了{len(books)}本图书"
)
# 返回文件
return send_file(
output,
as_attachment=True,
download_name=f"图书导出_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.xlsx",
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
)
except Exception as e:
current_app.logger.error(f"导出图书失败: {str(e)}")
flash(f'导出失败: {str(e)}', 'danger')
return redirect(url_for('book.book_list'))
@book_bp.route('/test-permissions')
def test_permissions():
"""测试当前用户权限"""
if not current_user.is_authenticated:
return "未登录"
return f"""
<h1>用户权限信息</h1>
<p>用户名: {current_user.username}</p>
<p>角色ID: {current_user.role_id}</p>
<p>是否管理员: {'' if current_user.role_id == 1 else ''}</p>
<p><a href="/book/admin/list">尝试访问管理页面</a></p>
"""
# 图书浏览页面 - 不需要修改,已经只有@login_required
@book_bp.route('/browse')
@login_required
def browse_books():
"""图书浏览页面 - 面向普通用户的友好界面"""
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 12, type=int) # 增加每页数量
# 只显示状态为1的图书未下架的图书
query = Book.query.filter_by(status=1)
# 搜索功能
search = request.args.get('search', '')
if search:
query = query.filter(
(Book.title.contains(search)) |
(Book.author.contains(search)) |
(Book.isbn.contains(search))
)
# 分类筛选
category_id = request.args.get('category_id', type=int)
if category_id:
query = query.filter_by(category_id=category_id)
# 排序
sort = request.args.get('sort', 'id')
order = request.args.get('order', 'desc')
if order == 'desc':
query = query.order_by(getattr(Book, sort).desc())
else:
query = query.order_by(getattr(Book, sort))
pagination = query.paginate(page=page, per_page=per_page)
books = pagination.items
# 获取所有分类供筛选使用
categories = Category.query.all()
# 记录访问日志
Log.add_log(
action='浏览图书',
user_id=current_user.id,
ip_address=request.remote_addr,
description=f"浏览图书: 搜索={search}, 分类={category_id}, 排序={sort} {order}"
)
return render_template('book/browse.html',
books=books,
pagination=pagination,
search=search,
categories=categories,
category_id=category_id,
sort=sort,
order=order)
@book_bp.route('/template/download')
@login_required
@permission_required('import_export_books')
def download_template():
"""下载图书导入模板"""
import tempfile
import os
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
from flask import after_this_request, current_app as app, send_file
# 创建工作簿和工作表
wb = Workbook()
ws = wb.active
ws.title = "图书导入模板"
# 定义样式
header_font = Font(name='微软雅黑', size=11, bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
required_font = Font(name='微软雅黑', size=11, bold=True, color="FF0000")
optional_font = Font(name='微软雅黑', size=11)
center_alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
left_alignment = Alignment(vertical='center', wrap_text=True)
# 定义边框样式
thin_border = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
# 定义列宽
column_widths = {
'A': 30, # title
'B': 20, # author
'C': 20, # publisher
'D': 15, # category_id
'E': 25, # tags
'F': 15, # isbn
'G': 15, # publish_year
'H': 40, # description
'I': 30, # cover_url
'J': 10, # stock
'K': 10, # price
}
# 设置列宽
for col, width in column_widths.items():
ws.column_dimensions[col].width = width
# 定义字段信息
fields = [
{'name': 'title', 'desc': '图书标题', 'required': True, 'example': '哈利·波特与魔法石'},
{'name': 'author', 'desc': '作者名称', 'required': True, 'example': 'J.K.罗琳'},
{'name': 'publisher', 'desc': '出版社', 'required': False, 'example': '人民文学出版社'},
{'name': 'category_id', 'desc': '分类ID', 'required': False, 'example': '1'},
{'name': 'tags', 'desc': '标签', 'required': False, 'example': '魔幻,冒险,青少年文学'},
{'name': 'isbn', 'desc': 'ISBN编号', 'required': False, 'example': '9787020042494'},
{'name': 'publish_year', 'desc': '出版年份', 'required': False, 'example': '2000'},
{'name': 'description', 'desc': '图书简介', 'required': False,
'example': '这是一本关于魔法的书籍,讲述了小男孩哈利...'},
{'name': 'cover_url', 'desc': '封面图片URL', 'required': False, 'example': 'https://example.com/covers/hp.jpg'},
{'name': 'stock', 'desc': '库存数量', 'required': False, 'example': '10'},
{'name': 'price', 'desc': '价格', 'required': False, 'example': '39.5'},
]
# 添加标题行
ws.append([f"{field['name']}" for field in fields])
# 设置标题行样式
for col_idx, field in enumerate(fields, start=1):
cell = ws.cell(row=1, column=col_idx)
cell.font = header_font
cell.fill = header_fill
cell.alignment = center_alignment
cell.border = thin_border
# 添加示例数据行
ws.append([field['example'] for field in fields])
# 设置示例行样式
for col_idx, field in enumerate(fields, start=1):
cell = ws.cell(row=2, column=col_idx)
cell.alignment = left_alignment
cell.border = thin_border
if field['required']:
cell.font = required_font
else:
cell.font = optional_font
# 冻结首行
ws.freeze_panes = "A2"
# 添加说明工作表
instructions_ws = wb.create_sheet(title="填写说明")
# 设置说明工作表的列宽
instructions_ws.column_dimensions['A'].width = 15
instructions_ws.column_dimensions['B'].width = 20
instructions_ws.column_dimensions['C'].width = 60
# 添加说明标题
instructions_ws.append(["字段名", "说明", "备注"])
# 设置标题行样式
for col_idx in range(1, 4):
cell = instructions_ws.cell(row=1, column=col_idx)
cell.font = header_font
cell.fill = header_fill
cell.alignment = center_alignment
cell.border = thin_border
# 字段详细说明
notes = {
'title': "图书的完整名称,例如:'哈利·波特与魔法石'。必须填写且不能为空。",
'author': "图书的作者名称,例如:'J.K.罗琳'。必须填写且不能为空。如有多个作者,请用逗号分隔。",
'publisher': "出版社名称,例如:'人民文学出版社'",
'category_id': "图书分类的系统ID。请在系统的分类管理页面查看ID。如果不确定可以留空系统将使用默认分类。",
'tags': "图书的标签,多个标签请用英文逗号分隔,例如:'魔幻,冒险,青少年文学'",
'isbn': "图书的ISBN号码例如'9787020042494'。请输入完整的13位或10位ISBN。",
'publish_year': "图书的出版年份,例如:'2000'。请输入4位数字年份。",
'description': "图书的简介或摘要。可以输入详细的描述信息,系统支持换行和基本格式。",
'cover_url': "图书封面的在线URL地址。如果有图片网址系统将自动下载并设置为封面。",
'stock': "图书的库存数量,例如:'10'。请输入整数。",
'price': "图书的价格,例如:'39.5'。可以输入小数。"
}
# 添加字段说明
for idx, field in enumerate(fields, start=2):
required_text = "【必填】" if field['required'] else "【选填】"
instructions_ws.append([
field['name'],
f"{field['desc']} {required_text}",
notes.get(field['name'], "")
])
# 设置单元格样式
for col_idx in range(1, 4):
cell = instructions_ws.cell(row=idx, column=col_idx)
cell.alignment = left_alignment
cell.border = thin_border
if field['required'] and col_idx == 2:
cell.font = required_font
else:
cell.font = optional_font
# 添加示例行
instructions_ws.append(["", "", ""])
instructions_ws.append(["示例", "", "以下是完整的示例数据:"])
example_data = [
{"title": "哈利·波特与魔法石", "author": "J.K.罗琳", "publisher": "人民文学出版社",
"category_id": "1", "tags": "魔幻,冒险,青少年文学", "isbn": "9787020042494",
"publish_year": "2000", "description": "这是一本关于魔法的书籍,讲述了小男孩哈利...",
"cover_url": "https://example.com/covers/hp.jpg", "stock": "10", "price": "39.5"},
{"title": "三体", "author": "刘慈欣", "publisher": "重庆出版社",
"category_id": "2", "tags": "科幻,硬科幻,中国科幻", "isbn": "9787536692930",
"publish_year": "2008", "description": "文化大革命如火如荼进行的同时...",
"cover_url": "https://example.com/covers/threebody.jpg", "stock": "15", "price": "59.8"},
{"title": "平凡的世界", "author": "路遥", "publisher": "北京十月文艺出版社",
"category_id": "3", "tags": "文学,现实主义,农村", "isbn": "9787530216781",
"publish_year": "2017", "description": "这是一部全景式地表现中国当代城乡社会...",
"cover_url": "https://example.com/covers/world.jpg", "stock": "8", "price": "128.0"}
]
# 添加3个完整示例到主模板工作表
for example in example_data:
row_data = [example.get(field['name'], '') for field in fields]
ws.append(row_data)
# 设置示例数据样式
for row_idx in range(3, 6):
for col_idx in range(1, len(fields) + 1):
cell = ws.cell(row=row_idx, column=col_idx)
cell.alignment = left_alignment
cell.border = thin_border
# 使用临时文件保存工作簿
fd, temp_path = tempfile.mkstemp(suffix='.xlsx')
os.close(fd)
try:
wb.save(temp_path)
# 注册一个函数在响应完成后删除临时文件
@after_this_request
def remove_file(response):
try:
os.unlink(temp_path)
except Exception as error:
app.logger.error("删除临时文件出错: %s", error)
return response
response = send_file(
temp_path,
as_attachment=True,
download_name='图书导入模板.xlsx',
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
)
# 添加防止缓存的头信息
response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
response.headers["Pragma"] = "no-cache"
response.headers["Expires"] = "0"
return response
except Exception as e:
# 确保在出错时也删除临时文件
try:
os.unlink(temp_path)
except:
pass
app.logger.error("生成模板文件出错: %s", str(e))
return {"error": "生成模板文件失败,请稍后再试"}, 500