superlishunqin 0c1d1b0d19 detail
2025-04-30 23:28:51 +08:00

567 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, g, jsonify
from app.models.book import Book, Category
from app.models.user import db
from app.utils.auth import login_required, admin_required
import os
from werkzeug.utils import secure_filename
import datetime
import pandas as pd
import uuid
# Blueprint named 'book'; every route in this module is registered on it.
book_bp = Blueprint('book', __name__)
# Book list page
@book_bp.route('/list')
@login_required
def book_list():
    """Render the paginated book list with optional search, filter and sort.

    Query-string parameters:
        page: page number, default 1.
        per_page: page size, default 10.
        search: substring matched against title, author or ISBN.
        category_id: restrict the list to one category.
        sort: name of the Book column to order by, default ``id``.
        order: ``desc`` (default) for descending, anything else for ascending.

    Returns:
        The rendered ``book/list.html`` template.
    """
    # Columns a client may sort by. ``sort`` comes straight from the query
    # string, so it must never be handed to getattr() unchecked: an unknown
    # name would raise AttributeError (HTTP 500) and a non-column attribute
    # would reach order_by().
    sortable_fields = (
        'id', 'title', 'author', 'publisher', 'category_id', 'tags', 'isbn',
        'publish_year', 'description', 'cover_url', 'stock', 'price',
        'status', 'created_at', 'updated_at',
    )

    current_app.logger.debug("访问图书列表页面")
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    query = Book.query

    # Free-text search
    search = request.args.get('search', '')
    if search:
        query = query.filter(
            (Book.title.contains(search)) |
            (Book.author.contains(search)) |
            (Book.isbn.contains(search))
        )

    # Category filter
    category_id = request.args.get('category_id', type=int)
    if category_id:
        query = query.filter_by(category_id=category_id)

    # Ordering; anything outside the whitelist falls back to the default.
    sort = request.args.get('sort', 'id')
    if sort not in sortable_fields:
        sort = 'id'
    order = request.args.get('order', 'desc')
    sort_column = getattr(Book, sort)
    query = query.order_by(sort_column.desc() if order == 'desc' else sort_column)

    pagination = query.paginate(page=page, per_page=per_page)
    books = pagination.items

    # All categories, for the filter control
    categories = Category.query.all()
    return render_template('book/list.html',
                           books=books,
                           pagination=pagination,
                           search=search,
                           categories=categories,
                           category_id=category_id,
                           sort=sort,
                           order=order,
                           current_user=g.user)
# Book detail page
@book_bp.route('/detail/<int:book_id>')
@login_required
def book_detail(book_id):
    """Render the detail page for one book, or 404 if it does not exist.

    The template receives the current time (used there to judge whether a
    borrow is overdue) and, for users whose ``role_id`` is 1 (assumed to be
    the administrator role), the ten most recent borrow records of the book.
    """
    book = Book.query.get_or_404(book_id)
    now = datetime.datetime.now()

    if g.user.role_id == 1:  # role_id 1 is assumed to mean administrator
        from app.models.borrow import BorrowRecord
        history = BorrowRecord.query.filter_by(book_id=book_id)
        newest_first = history.order_by(BorrowRecord.borrow_date.desc())
        borrow_records = newest_first.limit(10).all()
    else:
        borrow_records = []

    context = {
        'book': book,
        'current_user': g.user,
        'borrow_records': borrow_records,
        'now': now,
    }
    return render_template('book/detail.html', **context)
# Add-book page
def _render_add_book_form(form_data=None):
    """Render the add-book form, pre-filled with ``form_data`` when given."""
    categories = Category.query.all()
    if form_data is None:
        return render_template('book/add.html', categories=categories, current_user=g.user)
    return render_template('book/add.html', categories=categories,
                           current_user=g.user, book=form_data)


@book_bp.route('/add', methods=['GET', 'POST'])
@login_required
@admin_required
def add_book():
    """Show the add-book form (GET) or create a book from it (POST).

    On POST the title and author are required and a supplied ISBN must not
    already exist. An optional ``cover`` upload is stored under
    ``static/uploads/covers``. The book and, when the initial stock is
    positive, its stock-in inventory log are written in one transaction.

    Returns:
        A redirect to the book list on success; otherwise the form
        re-rendered with the submitted values and flashed error messages.
    """
    if request.method != 'POST':
        return _render_add_book_form()

    # Raw submitted values, kept untouched so the form can be re-populated.
    form_data = {
        'title': request.form.get('title'),
        'author': request.form.get('author'),
        'publisher': request.form.get('publisher'),
        'category_id': request.form.get('category_id'),
        'tags': request.form.get('tags'),
        'isbn': request.form.get('isbn'),
        'publish_year': request.form.get('publish_year'),
        'description': request.form.get('description'),
        'stock': request.form.get('stock', type=int, default=0),
        'price': request.form.get('price'),
    }
    title = form_data['title']
    author = form_data['author']
    isbn = form_data['isbn']
    stock = form_data['stock']

    # Form validation
    errors = []
    if not title:
        errors.append('书名不能为空')
    if not author:
        errors.append('作者不能为空')
    # An ISBN is optional, but a supplied one must be unused.
    if isbn and Book.query.filter_by(isbn=isbn).first():
        errors.append(f'ISBN "{isbn}" 已存在请检查ISBN或查找现有图书')
    if errors:
        for error in errors:
            flash(error, 'danger')
        return _render_add_book_form(form_data)

    # Cover upload; a failure here is only a warning, the book is still created.
    # NOTE(review): the extension is not restricted to image types - confirm
    # whether that is acceptable for an admin-only endpoint.
    cover_url = None
    cover_file = request.files.get('cover')
    if cover_file and cover_file.filename != '':
        try:
            # secure_filename() drops non-ASCII characters and then strips
            # leading dots, so a name such as "封面.png" would collapse to
            # "png" and lose its extension. The ASCII prefix keeps the dot.
            safe_name = secure_filename(f"cover_{cover_file.filename}")
            _, ext = os.path.splitext(safe_name)
            if not ext:
                ext = '.jpg'  # default extension
            filename = f"{uuid.uuid4()}{ext}"
            upload_folder = os.path.join(current_app.static_folder, 'uploads', 'covers')
            os.makedirs(upload_folder, exist_ok=True)
            cover_file.save(os.path.join(upload_folder, filename))
            cover_url = f'/static/uploads/covers/{filename}'
        except Exception as e:
            current_app.logger.error(f"封面上传失败: {e}")
            flash(f"封面上传失败: {e}", 'warning')

    try:
        now = datetime.datetime.now()
        # Blank optional fields are stored as NULL rather than '' so that an
        # empty category / price is not sent to a numeric column and several
        # books without an ISBN do not share the same '' value.
        book = Book(
            title=title,
            author=author,
            publisher=form_data['publisher'],
            category_id=form_data['category_id'] or None,
            tags=form_data['tags'],
            isbn=isbn or None,
            publish_year=form_data['publish_year'] or None,
            description=form_data['description'],
            cover_url=cover_url,
            stock=stock,
            price=form_data['price'] or None,
            status=1,
            created_at=now,
            updated_at=now
        )
        db.session.add(book)
        # flush() assigns book.id without committing, so the book and its
        # inventory log succeed or fail together.
        db.session.flush()

        if stock > 0:
            from app.models.inventory import InventoryLog
            db.session.add(InventoryLog(
                book_id=book.id,
                change_type='入库',
                change_amount=stock,
                after_stock=stock,
                operator_id=g.user.id,
                remark='新书入库',
                changed_at=now
            ))
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        error_msg = str(e)
        current_app.logger.error(f"添加图书失败: {error_msg}")
        flash(f'添加图书失败: {error_msg}', 'danger')
        return _render_add_book_form(form_data)

    flash(f'《{title}》添加成功', 'success')
    return redirect(url_for('book.book_list'))
# Edit a book
@book_bp.route('/edit/<int:book_id>', methods=['GET', 'POST'])
@login_required
@admin_required
def edit_book(book_id):
    """Show the edit form for a book (GET) or apply the submitted changes (POST).

    A stock change is recorded as an inventory log entry in the same
    transaction as the book update. An optional ``cover`` upload replaces the
    stored cover URL.

    Returns:
        A redirect to the book list on success; otherwise the edit form,
        with a flashed error message when the submission was rejected.
    """
    book = Book.query.get_or_404(book_id)

    def render_form():
        categories = Category.query.all()
        return render_template('book/edit.html', book=book, categories=categories, current_user=g.user)

    if request.method != 'POST':
        return render_form()

    title = request.form.get('title')
    author = request.form.get('author')
    publisher = request.form.get('publisher')
    category_id = request.form.get('category_id')
    tags = request.form.get('tags')
    isbn = request.form.get('isbn')
    publish_year = request.form.get('publish_year')
    description = request.form.get('description')
    price = request.form.get('price')
    status = request.form.get('status', type=int)

    if not title or not author:
        flash('书名和作者不能为空', 'danger')
        return render_form()

    try:
        now = datetime.datetime.now()

        # Stock change. A missing or non-numeric field yields None, which
        # means "leave the stock alone" rather than crashing on None - int.
        new_stock = request.form.get('stock', type=int)
        if new_stock is not None and new_stock != book.stock:
            from app.models.inventory import InventoryLog
            change_amount = new_stock - book.stock
            change_type = '入库' if change_amount > 0 else '出库'
            db.session.add(InventoryLog(
                book_id=book.id,
                change_type=change_type,
                change_amount=abs(change_amount),
                after_stock=new_stock,
                operator_id=g.user.id,
                remark=f'管理员编辑图书库存 - {book.title}',
                changed_at=now
            ))
            book.stock = new_stock

        # Cover upload
        cover_file = request.files.get('cover')
        if cover_file and cover_file.filename != '':
            filename = secure_filename(f"{uuid.uuid4()}_{cover_file.filename}")
            upload_folder = os.path.join(current_app.static_folder, 'uploads/covers')
            os.makedirs(upload_folder, exist_ok=True)
            cover_file.save(os.path.join(upload_folder, filename))
            # The URL must point at the directory the file was saved in
            # (static/uploads/covers), the same layout add_book uses.
            book.cover_url = f'/static/uploads/covers/{filename}'

        # Update the book itself. Blank optional fields become NULL instead
        # of '' so numeric columns never receive an empty string.
        book.title = title
        book.author = author
        book.publisher = publisher
        book.category_id = category_id or None
        book.tags = tags
        book.isbn = isbn or None
        book.publish_year = publish_year or None
        book.description = description
        book.price = price or None
        if status is not None:
            # Keep the current status when the form does not send a valid one.
            book.status = status
        book.updated_at = now
        db.session.commit()
    except Exception as e:
        # Same handling style as add_book: undo, log, tell the user.
        db.session.rollback()
        current_app.logger.error(f"更新图书失败: {e}")
        flash(f'更新图书失败: {e}', 'danger')
        return render_form()

    flash('图书信息更新成功', 'success')
    return redirect(url_for('book.book_list'))
# Delete a book
@book_bp.route('/delete/<int:book_id>', methods=['POST'])
@login_required
@admin_required
def delete_book(book_id):
    """Take a book off the shelf (soft delete) and report the outcome as JSON.

    The book row is kept; its ``status`` is set to 0. The request is refused
    while borrow records with ``status=1`` exist for the book (per the
    response message these are the not-yet-returned borrows).
    """
    book = Book.query.get_or_404(book_id)

    from app.models.borrow import BorrowRecord
    outstanding = BorrowRecord.query.filter_by(book_id=book_id, status=1)
    if outstanding.count() > 0:
        return jsonify(success=False, message='该图书有未归还的借阅记录,无法删除')

    # Soft delete: status 0 marks the book as deleted / off the shelf.
    book.status = 0
    book.updated_at = datetime.datetime.now()
    db.session.commit()
    return jsonify(success=True, message='图书已成功下架')
# Category management
@book_bp.route('/categories', methods=['GET'])
@login_required
@admin_required
def category_list():
    """Render the category management page listing every category."""
    return render_template(
        'book/categories.html',
        categories=Category.query.all(),
        current_user=g.user,
    )
# Add a category
@book_bp.route('/categories/add', methods=['POST'])
@login_required
@admin_required
def add_category():
    """Create a category from the posted form and answer with JSON.

    Form fields: ``name`` (required), ``parent_id`` (blank means no parent)
    and ``sort`` (integer, default 0). The success response carries the new
    category's id and name.
    """
    form = request.form
    name = form.get('name')
    if not name:
        return jsonify(success=False, message='分类名称不能为空')

    category = Category(
        name=name,
        parent_id=form.get('parent_id') or None,
        sort=form.get('sort', 0, type=int),
    )
    db.session.add(category)
    db.session.commit()
    return jsonify(success=True, message='分类添加成功', id=category.id, name=category.name)
# Edit a category
def _creates_category_cycle(category_id, parent_id):
    """Return True if giving ``category_id`` the parent ``parent_id`` would form a loop.

    Walks up the ancestor chain starting at ``parent_id``; reaching
    ``category_id`` means the new parent is the category itself or one of
    its descendants.
    """
    seen = set()
    current = parent_id
    while current is not None:
        try:
            current = int(current)
        except (TypeError, ValueError):
            # Not a usable id; leave rejecting it to the database.
            return False
        if current == category_id:
            return True
        if current in seen:
            # A loop that already exists and does not involve this category.
            return False
        seen.add(current)
        ancestor = Category.query.get(current)
        current = ancestor.parent_id if ancestor is not None else None
    return False


@book_bp.route('/categories/edit/<int:category_id>', methods=['POST'])
@login_required
@admin_required
def edit_category(category_id):
    """Update a category's name, parent and sort value; answer with JSON.

    Form fields: ``name`` (required), ``parent_id`` (blank means no parent)
    and ``sort`` (integer, default 0). A parent that is the category itself
    or one of its descendants is rejected, because it would turn the
    category tree into a cycle.
    """
    category = Category.query.get_or_404(category_id)
    name = request.form.get('name')
    parent_id = request.form.get('parent_id') or None
    sort = request.form.get('sort', 0, type=int)

    if not name:
        return jsonify({'success': False, 'message': '分类名称不能为空'})
    # Checked before any attribute is modified so nothing is left pending.
    if parent_id is not None and _creates_category_cycle(category_id, parent_id):
        return jsonify({'success': False, 'message': '不能将分类移动到自身或其子分类之下'})

    category.name = name
    category.parent_id = parent_id
    category.sort = sort
    db.session.commit()
    return jsonify({'success': True, 'message': '分类更新成功'})
# Delete a category
@book_bp.route('/categories/delete/<int:category_id>', methods=['POST'])
@login_required
@admin_required
def delete_category(category_id):
    """Delete a category that is no longer referenced; answer with JSON.

    Deletion is refused, with an explanatory message, while any book is
    assigned to the category or any other category names it as parent.
    """
    category = Category.query.get_or_404(category_id)

    # Books still filed under this category block the deletion.
    books_count = Book.query.filter_by(category_id=category_id).count()
    if books_count > 0:
        return jsonify(success=False, message=f'该分类下有{books_count}本图书,无法删除')

    # So do sub-categories.
    children_count = Category.query.filter_by(parent_id=category_id).count()
    if children_count > 0:
        return jsonify(success=False, message=f'该分类下有{children_count}个子分类,无法删除')

    db.session.delete(category)
    db.session.commit()
    return jsonify(success=True, message='分类删除成功')
# Bulk-import books
def _import_cell(row, column):
    """Return ``row[column]`` as a built-in Python value, or None for a blank/missing cell."""
    value = row.get(column)
    if value is None or pd.isna(value):
        return None
    # pandas may hand back numpy scalars; unwrap them to built-in types.
    if hasattr(value, 'item'):
        value = value.item()
    # A numeric column that contains blanks is read as float, which would
    # turn an ISBN or year such as 2020 into 2020.0.
    if isinstance(value, float) and value.is_integer():
        value = int(value)
    return value


def _import_text(row, column):
    """Return ``row[column]`` as a string, or None for a blank/missing cell."""
    value = _import_cell(row, column)
    return None if value is None else str(value)


@book_bp.route('/import', methods=['GET', 'POST'])
@login_required
@admin_required
def import_books():
    """Show the import page (GET) or import books from an uploaded Excel file (POST).

    Columns read from the sheet: title, author, publisher, category_id,
    tags, isbn, publish_year, description, cover_url, stock, price. Rows
    without a title or author, or with an ISBN that already exists, are
    skipped and reported. Each row is written inside its own SAVEPOINT so a
    row that fails cannot break the rows after it or the final commit.

    Returns:
        A redirect to the book list after an import, a redirect back to the
        import page when the upload is rejected or unreadable, or the
        rendered import page for GET.
    """
    if request.method != 'POST':
        return render_template('book/import.html', current_user=g.user)

    upload = request.files.get('file')
    if upload is None or upload.filename == '':
        flash('未选择文件', 'danger')
        return redirect(request.url)
    # Compare case-insensitively so "BOOKS.XLSX" is accepted too.
    if not upload.filename.lower().endswith(('.xlsx', '.xls')):
        flash('只支持Excel文件(.xlsx, .xls)', 'danger')
        return redirect(request.url)

    try:
        from app.models.inventory import InventoryLog

        df = pd.read_excel(upload)
        success_count = 0
        error_count = 0
        errors = []

        for index, row in df.iterrows():
            # +2: one for the header row, one because sheet rows are 1-based.
            line_no = index + 2
            try:
                # Required fields
                title = _import_text(row, 'title')
                author = _import_text(row, 'author')
                if not title or not author:
                    errors.append(f'{line_no}行: 书名或作者为空')
                    error_count += 1
                    continue

                # Skip ISBNs that already exist (including earlier rows of this file)
                isbn = _import_text(row, 'isbn') or None
                if isbn and Book.query.filter_by(isbn=isbn).first():
                    errors.append(f'{line_no}行: ISBN {isbn} 已存在')
                    error_count += 1
                    continue

                stock = _import_cell(row, 'stock')
                price = _import_cell(row, 'price')
                now = datetime.datetime.now()

                # SAVEPOINT per row: a failed flush is rolled back on its own
                # and the session stays usable for the remaining rows.
                with db.session.begin_nested():
                    book = Book(
                        title=title,
                        author=author,
                        publisher=_import_text(row, 'publisher'),
                        category_id=_import_cell(row, 'category_id'),
                        tags=_import_text(row, 'tags'),
                        isbn=isbn,
                        publish_year=_import_text(row, 'publish_year'),
                        description=_import_text(row, 'description'),
                        cover_url=_import_text(row, 'cover_url'),
                        stock=int(stock) if stock is not None else 0,
                        price=float(price) if price is not None else None,
                        status=1,
                        created_at=now,
                        updated_at=now
                    )
                    db.session.add(book)
                    # flush() assigns book.id for the inventory log below.
                    db.session.flush()

                    if book.stock > 0:
                        db.session.add(InventoryLog(
                            book_id=book.id,
                            change_type='入库',
                            change_amount=book.stock,
                            after_stock=book.stock,
                            operator_id=g.user.id,
                            remark='批量导入图书',
                            changed_at=now
                        ))
                success_count += 1
            except Exception as e:
                errors.append(f'{line_no}行: {e}')
                error_count += 1

        db.session.commit()
        flash(f'导入完成: 成功{success_count}条,失败{error_count}', 'info')
        if errors:
            # NOTE(review): the <br> separators only show as line breaks if
            # the template renders flashed messages unescaped; if it does,
            # the error text (which echoes cell values) should be escaped.
            flash('<br>'.join(errors[:10]) + (f'<br>...等共{len(errors)}个错误' if len(errors) > 10 else ''),
                  'warning')
        return redirect(url_for('book.book_list'))
    except Exception as e:
        # Discard anything still pending so the session is clean afterwards.
        db.session.rollback()
        flash(f'导入失败: {e}', 'danger')
        return redirect(request.url)
# Export books
@book_bp.route('/export')
@login_required
@admin_required
def export_books():
    """Export the books matching the current filters as an Excel download.

    Query-string parameters:
        search: substring matched against title, author or ISBN.
        category_id: restrict the export to one category.

    The workbook is built in memory and returned as an attachment. Nothing
    is written below the static folder, so an export is never reachable
    without passing the login/admin checks and no files accumulate on disk.
    """
    import io

    search = request.args.get('search', '')
    category_id = request.args.get('category_id', type=int)

    query = Book.query
    if search:
        query = query.filter(
            (Book.title.contains(search)) |
            (Book.author.contains(search)) |
            (Book.isbn.contains(search))
        )
    if category_id:
        query = query.filter_by(category_id=category_id)

    time_format = '%Y-%m-%d %H:%M:%S'
    rows = [
        {
            'id': book.id,
            'title': book.title,
            'author': book.author,
            'publisher': book.publisher,
            'category': book.category.name if book.category else "",
            'tags': book.tags,
            'isbn': book.isbn,
            'publish_year': book.publish_year,
            'description': book.description,
            'stock': book.stock,
            'price': book.price,
            'status': '上架' if book.status == 1 else '下架',
            'created_at': book.created_at.strftime(time_format) if book.created_at else '',
            'updated_at': book.updated_at.strftime(time_format) if book.updated_at else ''
        }
        for book in query.all()
    ]
    # Explicit columns keep the header row even when no book matches.
    columns = [
        'id', 'title', 'author', 'publisher', 'category', 'tags', 'isbn',
        'publish_year', 'description', 'stock', 'price', 'status',
        'created_at', 'updated_at',
    ]
    df = pd.DataFrame(rows, columns=columns)

    timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    filename = f'books_export_{timestamp}.xlsx'

    # Write the workbook to memory instead of a file on disk.
    buffer = io.BytesIO()
    df.to_excel(buffer, index=False)

    response = current_app.response_class(
        buffer.getvalue(),
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    )
    # The file name is generated above and contains only ASCII characters.
    response.headers['Content-Disposition'] = f'attachment; filename={filename}'
    return response