superlishunqin 29914a4178 0506
2025-05-06 12:01:11 +08:00

743 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, g, jsonify
from app.models.book import Book, Category
from app.models.user import db
from app.utils.auth import login_required, admin_required
from flask_login import current_user, login_required
import os
from werkzeug.utils import secure_filename
import datetime
import pandas as pd
import uuid
# Blueprint named 'book'; every route in this module is registered on it.
book_bp = Blueprint('book', __name__)
@book_bp.route('/admin/list')
@login_required
@admin_required
def admin_book_list():
    """Render the paginated book list in its admin variant.

    Only books whose ``status`` is 1 (not taken off the shelf) are listed.

    Query-string parameters, all optional:
        page: page number, default 1.
        per_page: page size, default 10.
        search: substring looked up in title, author and ISBN.
        category_id: only list books of this category.
        sort: ``Book`` column to order by, default ``id``; a name that is not
            a column of the book table falls back to ``id``.
        order: ``desc`` (default) sorts descending, anything else ascending.

    Returns:
        The rendered ``book/list.html`` template, flagged with
        ``is_admin_view=True``.
    """
    current_app.logger.debug(
        "DEBUG: admin_book_list 函数被调用,用户=%s,认证状态=%s",
        current_user.username,
        current_user.is_authenticated,
    )

    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)

    # Only books with status 1, i.e. books that have not been taken off the shelf.
    query = Book.query.filter_by(status=1)

    # Free-text search across title, author and ISBN.
    search = request.args.get('search', '')
    if search:
        query = query.filter(
            (Book.title.contains(search)) |
            (Book.author.contains(search)) |
            (Book.isbn.contains(search))
        )

    # Category filter.
    category_id = request.args.get('category_id', type=int)
    if category_id:
        query = query.filter_by(category_id=category_id)

    # Ordering. `sort` comes straight from the query string, so it is only
    # honoured when it names a real column of the book table; anything else
    # would raise AttributeError (HTTP 500) or reach unrelated attributes of
    # the model class.
    sort = request.args.get('sort', 'id')
    order = request.args.get('order', 'desc')
    sort_column = None
    if sort in Book.__table__.columns.keys():
        sort_column = getattr(Book, sort, None)
    if sort_column is None:
        sort, sort_column = 'id', Book.id
    query = query.order_by(sort_column.desc() if order == 'desc' else sort_column)

    pagination = query.paginate(page=page, per_page=per_page)

    return render_template(
        'book/list.html',
        books=pagination.items,
        pagination=pagination,
        search=search,
        # Every category, used by the filter control.
        categories=Category.query.all(),
        category_id=category_id,
        sort=sort,
        order=order,
        current_user=g.user,
        is_admin_view=True,  # tells the template this is the admin view
    )
# Book list page
@book_bp.route('/list')
@login_required
def book_list():
    """Render the paginated book list.

    Only books whose ``status`` is 1 (not taken off the shelf) are listed.

    Query-string parameters, all optional:
        page: page number, default 1.
        per_page: page size, default 10.
        search: substring looked up in title, author and ISBN.
        category_id: only list books of this category.
        sort: ``Book`` column to order by, default ``id``; a name that is not
            a column of the book table falls back to ``id``.
        order: ``desc`` (default) sorts descending, anything else ascending.

    Returns:
        The rendered ``book/list.html`` template.
    """
    current_app.logger.debug("访问图书列表页面")

    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)

    # Only books with status 1, i.e. books that have not been taken off the shelf.
    query = Book.query.filter_by(status=1)

    # Free-text search across title, author and ISBN.
    search = request.args.get('search', '')
    if search:
        query = query.filter(
            (Book.title.contains(search)) |
            (Book.author.contains(search)) |
            (Book.isbn.contains(search))
        )

    # Category filter.
    category_id = request.args.get('category_id', type=int)
    if category_id:
        query = query.filter_by(category_id=category_id)

    # Ordering. `sort` comes straight from the query string, so it is only
    # honoured when it names a real column of the book table; anything else
    # would raise AttributeError (HTTP 500) or reach unrelated attributes of
    # the model class.
    sort = request.args.get('sort', 'id')
    order = request.args.get('order', 'desc')
    sort_column = None
    if sort in Book.__table__.columns.keys():
        sort_column = getattr(Book, sort, None)
    if sort_column is None:
        sort, sort_column = 'id', Book.id
    query = query.order_by(sort_column.desc() if order == 'desc' else sort_column)

    pagination = query.paginate(page=page, per_page=per_page)

    return render_template(
        'book/list.html',
        books=pagination.items,
        pagination=pagination,
        search=search,
        # Every category, used by the filter control.
        categories=Category.query.all(),
        category_id=category_id,
        sort=sort,
        order=order,
        current_user=g.user,
    )
# Book detail page
@book_bp.route('/detail/<int:book_id>')
@login_required
def book_detail(book_id):
    """Render the detail page of one book.

    Args:
        book_id: primary key of the book; an unknown id yields a 404.

    Returns:
        The rendered ``book/detail.html`` template. For a user whose
        ``role_id`` is 1 (administrator) the context also carries the ten
        most recent borrow records of the book; otherwise that list is empty.
    """
    book = Book.query.get_or_404(book_id)

    # Current time, handed to the template so it can decide whether a loan is overdue.
    now = datetime.datetime.now()

    # Defensive lookup: g.user may be missing, None, or lack a role_id attribute.
    viewer = getattr(g, 'user', None)
    viewer_is_admin = getattr(viewer, 'role_id', None) == 1

    if viewer_is_admin:
        from app.models.borrow import BorrowRecord

        newest_first = BorrowRecord.query.filter_by(book_id=book_id)
        newest_first = newest_first.order_by(BorrowRecord.borrow_date.desc())
        borrow_records = newest_first.limit(10).all()
    else:
        borrow_records = []

    context = {
        'book': book,
        # flask_login's current_user is passed here rather than g.user.
        'current_user': current_user,
        'borrow_records': borrow_records,
        'now': now,
    }
    return render_template('book/detail.html', **context)
def _render_add_book_form(form_values=None):
    """Render ``book/add.html``; ``form_values`` re-populates a rejected submission."""
    context = {'categories': Category.query.all(), 'current_user': g.user}
    if form_values is not None:
        context['book'] = form_values
    return render_template('book/add.html', **context)


def _store_new_book_cover(cover_file):
    """Save an uploaded cover under ``static/covers`` and return its URL path.

    The file is stored under a random UUID name. Only common raster image
    extensions are accepted because the folder is served publicly; a file
    without an extension is stored as ``.jpg``.

    Raises:
        ValueError: if the extension is not an accepted image type.
    """
    allowed_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp')
    # The extension is read from the raw client filename (secure_filename drops
    # non-ASCII stems together with the dot); this is safe because only
    # whitelisted values ever reach the stored name.
    ext = os.path.splitext(cover_file.filename)[1].lower() or '.jpg'
    if ext not in allowed_extensions:
        raise ValueError('不支持的封面文件类型')

    filename = f"{uuid.uuid4()}{ext}"
    upload_folder = os.path.join(current_app.static_folder, 'covers')
    os.makedirs(upload_folder, exist_ok=True)
    cover_file.save(os.path.join(upload_folder, filename))
    return f'/static/covers/{filename}'


# Add-book page
@book_bp.route('/add', methods=['GET', 'POST'])
@login_required
@admin_required
def add_book():
    """Show the add-book form (GET) or create a book from it (POST).

    On POST the title and author are required and a non-empty ISBN must not
    already exist. An optional ``cover`` upload is stored under
    ``static/covers``; a failed upload only produces a warning and the book is
    still created without a cover. When the initial stock is positive an
    inventory log entry is written in the same transaction as the book, so
    either both are saved or neither is.

    Returns:
        A redirect to the book list on success, otherwise the re-rendered
        form carrying the submitted values and flashed error messages.
    """
    if request.method != 'POST':
        return _render_add_book_form()

    form = request.form
    title = form.get('title')
    author = form.get('author')
    publisher = form.get('publisher')
    category_id = form.get('category_id')
    tags = form.get('tags')
    isbn = form.get('isbn')
    publish_year = form.get('publish_year')
    description = form.get('description')
    stock = form.get('stock', type=int, default=0)
    price = form.get('price')

    # Raw submitted values, kept so a rejected form can be shown again unchanged.
    form_values = {
        'title': title,
        'author': author,
        'publisher': publisher,
        'category_id': category_id,
        'tags': tags,
        'isbn': isbn,
        'publish_year': publish_year,
        'description': description,
        'stock': stock,
        'price': price,
    }

    # Form validation.
    errors = []
    if not title:
        errors.append('书名不能为空')
    if not author:
        errors.append('作者不能为空')
    # An ISBN is optional, but when given it must be unique.
    if isbn and Book.query.filter_by(isbn=isbn).first():
        errors.append(f'ISBN "{isbn}" 已存在请检查ISBN或查找现有图书')
    if errors:
        for error in errors:
            flash(error, 'danger')
        return _render_add_book_form(form_values)

    # Cover upload is best effort: a failure is reported but does not block the book.
    cover_url = None
    cover_file = request.files.get('cover')
    if cover_file and cover_file.filename != '':
        try:
            cover_url = _store_new_book_cover(cover_file)
        except Exception as e:
            current_app.logger.error(f"封面上传失败: {str(e)}")
            flash(f"封面上传失败: {str(e)}", 'warning')

    try:
        now = datetime.datetime.now()
        book = Book(
            title=title,
            author=author,
            publisher=publisher,
            # Blank optional fields are stored as NULL rather than '' so they
            # match what the bulk import stores for missing cells.
            category_id=category_id or None,
            tags=tags,
            isbn=isbn or None,
            publish_year=publish_year,
            description=description,
            cover_url=cover_url,
            stock=stock,
            price=price or None,
            status=1,
            created_at=now,
            updated_at=now,
        )
        db.session.add(book)
        # flush() assigns book.id without ending the transaction, so the book
        # and its inventory log are committed (or rolled back) together.
        db.session.flush()

        if stock and stock > 0:
            from app.models.inventory import InventoryLog

            db.session.add(InventoryLog(
                book_id=book.id,
                change_type='入库',
                change_amount=stock,
                after_stock=stock,
                operator_id=g.user.id,
                remark='新书入库',
                changed_at=now,
            ))

        db.session.commit()
    except Exception as e:
        db.session.rollback()
        error_msg = str(e)
        current_app.logger.error(f"添加图书失败: {error_msg}")
        flash(f'添加图书失败: {error_msg}', 'danger')
        return _render_add_book_form(form_values)

    flash(f'《{title}》添加成功', 'success')
    return redirect(url_for('book.book_list'))
def _render_edit_book_form(book):
    """Render ``book/edit.html`` for ``book`` together with every category."""
    categories = Category.query.all()
    return render_template('book/edit.html', book=book, categories=categories, current_user=g.user)


def _edit_book_isbn_error(isbn):
    """Return the validation message for ``isbn``, or ``None`` when it is valid.

    Hyphens and spaces are ignored. The remainder must be a 10- or 13-character
    ISBN with a correct check digit.
    """
    clean_isbn = isbn.replace('-', '').replace(' ', '')
    if len(clean_isbn) not in (10, 13):
        return 'ISBN必须是10位或13位'

    def is_ascii_digits(text):
        # str.isdigit() alone accepts characters such as '²' that int() rejects.
        return text.isascii() and text.isdigit()

    if len(clean_isbn) == 10:
        body, last = clean_isbn[:9], clean_isbn[9].upper()
        if not is_ascii_digits(body):
            return 'ISBN-10的前9位必须是数字'
        if not (is_ascii_digits(last) or last == 'X'):
            return 'ISBN-10的最后一位必须是数字或X'
        # Weights run 10..2 over the first nine digits; the check character
        # has weight 1 and 'X' stands for 10.
        total = sum(int(digit) * (10 - i) for i, digit in enumerate(body))
        total += 10 if last == 'X' else int(last)
        if total % 11 != 0:
            return 'ISBN-10校验和无效'
        return None

    if not is_ascii_digits(clean_isbn):
        return 'ISBN-13必须全是数字'
    # Weights alternate 1, 3 over the first twelve digits.
    total = sum(int(digit) * (3 if i % 2 else 1) for i, digit in enumerate(clean_isbn[:12]))
    if (10 - total % 10) % 10 != int(clean_isbn[12]):
        return 'ISBN-13校验和无效'
    return None


def _store_edited_book_cover(cover_file):
    """Save an uploaded cover under ``static/covers`` and return its URL path.

    The file is stored under a random UUID name. Only common raster image
    extensions are accepted because the folder is served publicly; a file
    without an extension is stored as ``.jpg``.

    Raises:
        ValueError: if the extension is not an accepted image type.
    """
    allowed_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp')
    ext = os.path.splitext(cover_file.filename)[1].lower() or '.jpg'
    if ext not in allowed_extensions:
        raise ValueError('不支持的封面文件类型')

    filename = f"{uuid.uuid4()}{ext}"
    upload_folder = os.path.join(current_app.static_folder, 'covers')
    os.makedirs(upload_folder, exist_ok=True)
    cover_file.save(os.path.join(upload_folder, filename))
    return f'/static/covers/{filename}'


# Edit book
@book_bp.route('/edit/<int:book_id>', methods=['GET', 'POST'])
@login_required
@admin_required
def edit_book(book_id):
    """Show the edit form for a book (GET) or apply the submitted changes (POST).

    On POST:
      * title and author are required;
      * a non-blank ISBN must be a valid ISBN-10/ISBN-13 and must not belong
        to another book;
      * a changed stock value is recorded in an inventory log entry (a missing
        or non-numeric stock field counts as 0, negative values are rejected);
      * an optional ``cover`` upload replaces the cover; a failed upload only
        produces a warning and the previous cover is kept;
      * ``status`` is only changed when the form actually supplies it.

    Args:
        book_id: primary key of the book; an unknown id yields a 404.

    Returns:
        A redirect to the book list on success, otherwise the re-rendered
        edit form with a flashed error message.
    """
    book = Book.query.get_or_404(book_id)
    if request.method != 'POST':
        return _render_edit_book_form(book)

    def reject(message):
        flash(message, 'danger')
        return _render_edit_book_form(book)

    form = request.form
    title = form.get('title')
    author = form.get('author')
    publisher = form.get('publisher')
    category_id = form.get('category_id')
    tags = form.get('tags')
    # A blank ISBN is stored as NULL, matching what the bulk import stores.
    isbn = (form.get('isbn') or '').strip() or None
    publish_year = form.get('publish_year')
    description = form.get('description')
    price = form.get('price')
    status = form.get('status', type=int)

    # Basic validation.
    if not title or not author:
        return reject('书名和作者不能为空')

    # ISBN validation.
    if isbn:
        isbn_error = _edit_book_isbn_error(isbn)
        if isbn_error:
            return reject(isbn_error)
        # Same uniqueness rule the add-book form enforces, ignoring this book itself.
        duplicate = Book.query.filter(Book.isbn == isbn, Book.id != book.id).first()
        if duplicate is not None:
            return reject(f'ISBN "{isbn}" 已存在请检查ISBN或查找现有图书')

    # Stock change.
    new_stock = form.get('stock', type=int) or 0  # defaults to 0 rather than None
    if new_stock < 0:
        return reject('库存不能为负数')
    current_stock = book.stock or 0
    if new_stock != current_stock:
        from app.models.inventory import InventoryLog

        change_amount = new_stock - current_stock
        db.session.add(InventoryLog(
            book_id=book.id,
            change_type='入库' if change_amount > 0 else '出库',
            change_amount=abs(change_amount),
            after_stock=new_stock,
            operator_id=g.user.id,
            remark=f'管理员编辑图书库存 - {book.title}',
            changed_at=datetime.datetime.now(),
        ))
        book.stock = new_stock

    # Cover upload is best effort, as in the add-book form.
    cover_file = request.files.get('cover')
    if cover_file and cover_file.filename != '':
        try:
            book.cover_url = _store_edited_book_cover(cover_file)
        except Exception as e:
            current_app.logger.error(f"封面上传失败: {str(e)}")
            flash(f"封面上传失败: {str(e)}", 'warning')

    # Update the book itself.
    book.title = title
    book.author = author
    book.publisher = publisher
    book.category_id = category_id or None
    book.tags = tags
    book.isbn = isbn
    book.publish_year = publish_year
    book.description = description
    book.price = price or None
    if status is not None:
        # Without this guard a form lacking `status` would overwrite it with
        # None and the book would vanish from every status=1 listing.
        book.status = status
    book.updated_at = datetime.datetime.now()

    try:
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        return reject(f'保存失败: {str(e)}')

    flash('图书信息更新成功', 'success')
    return redirect(url_for('book.book_list'))
# Delete book
@book_bp.route('/delete/<int:book_id>', methods=['POST'])
@login_required
@admin_required
def delete_book(book_id):
    """Take a book off the shelf (soft delete) and report the outcome as JSON.

    The row is kept; its ``status`` is set to 0 (deleted / off the shelf).
    The request is refused while borrow records with ``status`` 1 still exist
    for the book.

    Args:
        book_id: primary key of the book; an unknown id yields a 404.

    Returns:
        A JSON object with ``success`` and ``message``.
    """
    book = Book.query.get_or_404(book_id)

    # Refuse while the book still has unreturned loans.
    from app.models.borrow import BorrowRecord

    unreturned = BorrowRecord.query.filter_by(book_id=book_id, status=1).count()
    if unreturned > 0:
        return jsonify(success=False, message='该图书有未归还的借阅记录,无法删除')

    # Soft delete instead of removing the row: 0 means deleted / off the shelf.
    book.status = 0
    book.updated_at = datetime.datetime.now()
    db.session.commit()
    return jsonify(success=True, message='图书已成功下架')
# Category management
@book_bp.route('/categories', methods=['GET'])
@login_required
@admin_required
def category_list():
    """Render the category management page listing every category."""
    return render_template(
        'book/categories.html',
        categories=Category.query.all(),
        current_user=g.user,
    )
# Add category
@book_bp.route('/categories/add', methods=['POST'])
@login_required
@admin_required
def add_category():
    """Create a category from the posted form and report the outcome as JSON.

    Form fields:
        name: required; surrounding whitespace is stripped, so a
            whitespace-only name is rejected.
        parent_id: optional; blank means no parent.
        sort: optional integer, default 0.

    Returns:
        A JSON object with ``success`` and ``message``; on success also the
        new category's ``id`` and ``name``.
    """
    name = (request.form.get('name') or '').strip()
    parent_id = request.form.get('parent_id') or None
    sort = request.form.get('sort', 0, type=int)

    if not name:
        return jsonify({'success': False, 'message': '分类名称不能为空'})

    category = Category(name=name, parent_id=parent_id, sort=sort)
    db.session.add(category)
    try:
        db.session.commit()
    except Exception as e:
        # Answer in JSON like every other outcome instead of letting the
        # caller receive an HTML error page.
        db.session.rollback()
        current_app.logger.error(f"分类添加失败: {str(e)}")
        return jsonify({'success': False, 'message': f'分类添加失败: {str(e)}'})

    return jsonify({'success': True, 'message': '分类添加成功', 'id': category.id, 'name': category.name})
# Edit category
@book_bp.route('/categories/edit/<int:category_id>', methods=['POST'])
@login_required
@admin_required
def edit_category(category_id):
    """Update a category from the posted form and report the outcome as JSON.

    Form fields:
        name: required; surrounding whitespace is stripped, so a
            whitespace-only name is rejected.
        parent_id: optional; blank means no parent. A category cannot be its
            own parent.
        sort: optional integer, default 0.

    Args:
        category_id: primary key of the category; an unknown id yields a 404.

    Returns:
        A JSON object with ``success`` and ``message``.
    """
    category = Category.query.get_or_404(category_id)

    name = (request.form.get('name') or '').strip()
    parent_id = request.form.get('parent_id') or None
    sort = request.form.get('sort', 0, type=int)

    if not name:
        return jsonify({'success': False, 'message': '分类名称不能为空'})
    # parent_id arrives as text, so compare on the string form.
    if parent_id is not None and str(parent_id).strip() == str(category_id):
        return jsonify({'success': False, 'message': '分类不能将自身设为父分类'})

    category.name = name
    category.parent_id = parent_id
    category.sort = sort
    try:
        db.session.commit()
    except Exception as e:
        # Answer in JSON like every other outcome instead of letting the
        # caller receive an HTML error page.
        db.session.rollback()
        current_app.logger.error(f"分类更新失败: {str(e)}")
        return jsonify({'success': False, 'message': f'分类更新失败: {str(e)}'})

    return jsonify({'success': True, 'message': '分类更新成功'})
# Delete category
@book_bp.route('/categories/delete/<int:category_id>', methods=['POST'])
@login_required
@admin_required
def delete_category(category_id):
    """Delete a category that is no longer in use and report the outcome as JSON.

    Deletion is refused while any book references the category or while it
    still has child categories.

    Args:
        category_id: primary key of the category; an unknown id yields a 404.

    Returns:
        A JSON object with ``success`` and ``message``.
    """
    category = Category.query.get_or_404(category_id)

    # Books still assigned to this category block the deletion.
    books_count = Book.query.filter_by(category_id=category_id).count()
    if books_count > 0:
        return jsonify(success=False, message=f'该分类下有{books_count}本图书,无法删除')

    # So do child categories.
    children_count = Category.query.filter_by(parent_id=category_id).count()
    if children_count > 0:
        return jsonify(success=False, message=f'该分类下有{children_count}个子分类,无法删除')

    db.session.delete(category)
    db.session.commit()
    return jsonify(success=True, message='分类删除成功')
def _import_cell(row, column):
    """Return ``row[column]`` as a plain Python value, or ``None`` if empty or absent."""
    value = row.get(column)
    if value is None or pd.isna(value):
        return None
    # numpy scalars expose .item(); database drivers are handed plain Python values.
    if hasattr(value, 'item'):
        value = value.item()
    return value


def _import_text(row, column):
    """Return the cell as text, or ``None`` if empty.

    pandas reads a numeric column containing blanks as floats, so an ISBN or
    year arrives as e.g. ``9787111111111.0``; whole floats are rendered
    without the trailing ``.0``.
    """
    value = _import_cell(row, column)
    if value is None:
        return None
    if isinstance(value, float) and value.is_integer():
        value = int(value)
    return str(value)


# Bulk import of books
@book_bp.route('/import', methods=['GET', 'POST'])
@login_required
@admin_required
def import_books():
    """Show the import page (GET) or import books from an uploaded Excel file (POST).

    The sheet is read with ``pandas.read_excel``; the columns looked up are
    ``title``, ``author``, ``publisher``, ``category_id``, ``tags``, ``isbn``,
    ``publish_year``, ``description``, ``cover_url``, ``stock`` and ``price``.
    ``title`` and ``author`` are required, and a row whose ISBN already exists
    is skipped. Every row is committed on its own, so a row that fails is
    rolled back without affecting the rows before or after it. Rows with a
    positive stock also get an inventory log entry.

    Returns:
        A redirect to the book list after an import, a redirect back to the
        import page on an unusable upload, or the rendered ``book/import.html``
        on GET.
    """
    if request.method != 'POST':
        return render_template('book/import.html', current_user=g.user)

    if 'file' not in request.files:
        flash('未选择文件', 'danger')
        return redirect(request.url)
    file = request.files['file']
    if file.filename == '':
        flash('未选择文件', 'danger')
        return redirect(request.url)
    # Case-insensitive so that e.g. "BOOKS.XLSX" is accepted as well.
    if not (file and file.filename.lower().endswith(('.xlsx', '.xls'))):
        flash('只支持Excel文件(.xlsx, .xls)', 'danger')
        return redirect(request.url)

    try:
        from app.models.inventory import InventoryLog

        df = pd.read_excel(file)
        success_count = 0
        errors = []

        for index, row in df.iterrows():
            # +2: one for the header row, one because spreadsheet rows start at 1.
            line_no = index + 2
            try:
                # Required fields.
                title = _import_text(row, 'title')
                author = _import_text(row, 'author')
                if title is None or author is None:
                    errors.append(f'{line_no}行: 书名或作者为空')
                    continue

                # Skip rows whose ISBN is already present.
                isbn = _import_text(row, 'isbn')
                if isbn and Book.query.filter_by(isbn=isbn).first():
                    errors.append(f'{line_no}行: ISBN {isbn} 已存在')
                    continue

                category_id = _import_cell(row, 'category_id')
                stock = _import_cell(row, 'stock')
                price = _import_cell(row, 'price')
                now = datetime.datetime.now()

                book = Book(
                    title=title,
                    author=author,
                    publisher=_import_text(row, 'publisher'),
                    category_id=int(category_id) if category_id is not None else None,
                    tags=_import_text(row, 'tags'),
                    isbn=isbn,
                    publish_year=_import_text(row, 'publish_year'),
                    description=_import_text(row, 'description'),
                    cover_url=_import_text(row, 'cover_url'),
                    stock=int(stock) if stock is not None else 0,
                    price=float(price) if price is not None else None,
                    status=1,
                    created_at=now,
                    updated_at=now,
                )
                db.session.add(book)
                # flush() assigns book.id so the inventory log can reference it.
                db.session.flush()

                if book.stock > 0:
                    db.session.add(InventoryLog(
                        book_id=book.id,
                        change_type='入库',
                        change_amount=book.stock,
                        after_stock=book.stock,
                        operator_id=g.user.id,
                        remark='批量导入图书',
                        changed_at=now,
                    ))

                db.session.commit()
                success_count += 1
            except Exception as e:
                # A failed flush leaves the session unusable until rollback();
                # without it every later row and the final commit would fail.
                db.session.rollback()
                errors.append(f'{line_no}行: {str(e)}')

        error_count = len(errors)
        flash(f'导入完成: 成功{success_count}条,失败{error_count}', 'info')
        if errors:
            flash('<br>'.join(errors[:10]) + (f'<br>...等共{len(errors)}个错误' if len(errors) > 10 else ''),
                  'warning')
        return redirect(url_for('book.book_list'))
    except Exception as e:
        db.session.rollback()
        flash(f'导入失败: {str(e)}', 'danger')
        return redirect(request.url)
# Export books
@book_bp.route('/export')
@login_required
@admin_required
def export_books():
    """Download the books matching the current filters as an Excel workbook.

    Query-string parameters, both optional:
        search: substring looked up in title, author and ISBN.
        category_id: only export books of this category.

    Books of every status are exported; the ``status`` column tells on-shelf
    and off-shelf books apart.

    The workbook is built in memory and sent as an attachment. Nothing is
    written to the public static folder, so exports neither pile up on disk
    nor become downloadable without going through this admin-only route.

    Returns:
        An ``.xlsx`` attachment named ``books_export_<timestamp>.xlsx``.
    """
    # Local imports: neither name is otherwise used by this module.
    import io
    from flask import send_file

    search = request.args.get('search', '')
    category_id = request.args.get('category_id', type=int)

    query = Book.query
    if search:
        query = query.filter(
            (Book.title.contains(search)) |
            (Book.author.contains(search)) |
            (Book.isbn.contains(search))
        )
    if category_id:
        query = query.filter_by(category_id=category_id)

    def stamp(moment):
        return moment.strftime('%Y-%m-%d %H:%M:%S') if moment else ''

    rows = [
        {
            'id': book.id,
            'title': book.title,
            'author': book.author,
            'publisher': book.publisher,
            'category': book.category.name if book.category else "",
            'tags': book.tags,
            'isbn': book.isbn,
            'publish_year': book.publish_year,
            'description': book.description,
            'stock': book.stock,
            'price': book.price,
            'status': '上架' if book.status == 1 else '下架',
            'created_at': stamp(book.created_at),
            'updated_at': stamp(book.updated_at),
        }
        for book in query.all()
    ]
    # Explicit columns keep the header row even when no book matches.
    columns = [
        'id', 'title', 'author', 'publisher', 'category', 'tags', 'isbn',
        'publish_year', 'description', 'stock', 'price', 'status',
        'created_at', 'updated_at',
    ]
    df = pd.DataFrame(rows, columns=columns)

    timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    filename = f'books_export_{timestamp}.xlsx'

    buffer = io.BytesIO()
    df.to_excel(buffer, index=False)
    buffer.seek(0)  # rewind so send_file streams from the start

    # NOTE(review): `download_name` is the Flask 2.0+ spelling; older releases
    # call this argument `attachment_filename` - confirm the deployed version.
    return send_file(
        buffer,
        as_attachment=True,
        download_name=filename,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    )
@book_bp.route('/test-permissions')
def test_permissions():
    """Show the current user's name, role id and admin flag as a small HTML page.

    Returns:
        The text ``未登录`` for an anonymous visitor, otherwise an HTML
        snippet. ``role_id == 1`` is reported as administrator. The values are
        HTML-escaped because the string is returned as raw markup.
    """
    import html  # local import: only this diagnostic view needs it

    if not current_user.is_authenticated:
        return "未登录"

    username = html.escape(str(current_user.username))
    role_id = html.escape(str(current_user.role_id))
    is_admin = '是' if current_user.role_id == 1 else '否'
    # Resolve the admin list URL through the blueprint instead of hard-coding it.
    admin_url = html.escape(url_for('book.admin_book_list'))

    return f"""
<h1>用户权限信息</h1>
<p>用户名: {username}</p>
<p>角色ID: {role_id}</p>
<p>是否管理员: {is_admin}</p>
<p><a href="{admin_url}">尝试访问管理页面</a></p>
"""
# (Added to app/controllers/book.py)
@book_bp.route('/browse')
@login_required
def browse_books():
    """Render the book browsing page, the reader-friendly view for ordinary users.

    Only books whose ``status`` is 1 (not taken off the shelf) are listed.

    Query-string parameters, all optional:
        page: page number, default 1.
        per_page: page size, default 12.
        search: substring looked up in title, author and ISBN.
        category_id: only list books of this category.
        sort: ``Book`` column to order by, default ``id``; a name that is not
            a column of the book table falls back to ``id``.
        order: ``desc`` (default) sorts descending, anything else ascending.

    Returns:
        The rendered ``book/browse.html`` template.
    """
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 12, type=int)  # larger page than the list views

    # Only books with status 1, i.e. books that have not been taken off the shelf.
    query = Book.query.filter_by(status=1)

    # Free-text search across title, author and ISBN.
    search = request.args.get('search', '')
    if search:
        query = query.filter(
            (Book.title.contains(search)) |
            (Book.author.contains(search)) |
            (Book.isbn.contains(search))
        )

    # Category filter.
    category_id = request.args.get('category_id', type=int)
    if category_id:
        query = query.filter_by(category_id=category_id)

    # Ordering. `sort` comes straight from the query string, so it is only
    # honoured when it names a real column of the book table; anything else
    # would raise AttributeError (HTTP 500) or reach unrelated attributes of
    # the model class.
    sort = request.args.get('sort', 'id')
    order = request.args.get('order', 'desc')
    sort_column = None
    if sort in Book.__table__.columns.keys():
        sort_column = getattr(Book, sort, None)
    if sort_column is None:
        sort, sort_column = 'id', Book.id
    query = query.order_by(sort_column.desc() if order == 'desc' else sort_column)

    pagination = query.paginate(page=page, per_page=per_page)

    return render_template(
        'book/browse.html',
        books=pagination.items,
        pagination=pagination,
        search=search,
        # Every category, used by the filter control.
        categories=Category.query.all(),
        category_id=category_id,
        sort=sort,
        order=order,
    )