2025-07-04 19:07:35 +08:00

668 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
商品管理视图
"""
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, g
from werkzeug.utils import secure_filename
from app.models.product import Product, Category, ProductImage, SpecName, SpecValue, ProductInventory
from app.models.admin import AdminUser
from app.utils.decorators import admin_required, log_operation
from app.utils.cos_client import cos_client
from config.database import db
from sqlalchemy import func
import time
import uuid
import json
# Blueprint that groups the admin product views; routes registered on it are
# served under the /admin/products URL prefix.
product_bp = Blueprint('product', __name__, url_prefix='/admin/products')
@product_bp.route('/')
@admin_required
def index():
    """Render the paginated product list with optional search and filters.

    Query-string parameters:
        page: 1-based page number (defaults to 1); 20 products per page.
        search: substring matched against the product name.
        category_id: restrict the list to a single category.
        status: restrict the list to a single status value.

    Non-numeric ``category_id`` / ``status`` values are ignored rather than
    raising ``ValueError`` from ``int()``.
    """
    page = request.args.get('page', 1, type=int)
    per_page = 20
    query = Product.query.order_by(Product.created_at.desc())

    # Search by product name
    search = request.args.get('search', '').strip()
    if search:
        query = query.filter(Product.name.like(f'%{search}%'))

    # Category filter. The raw string is still handed to the template so the
    # filter form can re-select it; the parsed int (None when the value is
    # missing or not a number) is what drives the query.
    category_id = request.args.get('category_id', '', type=str)
    category_filter = request.args.get('category_id', type=int)
    if category_filter is not None:
        query = query.filter(Product.category_id == category_filter)

    # Status filter, handled the same way as the category filter
    status = request.args.get('status', '', type=str)
    status_filter = request.args.get('status', type=int)
    if status_filter is not None:
        query = query.filter(Product.status == status_filter)

    products = query.paginate(page=page, per_page=per_page, error_out=False)

    # Active categories populate the filter drop-down
    categories = Category.query.filter_by(is_active=1).order_by(Category.sort_order).all()

    return render_template('admin/products.html',
                           products=products,
                           categories=categories,
                           search=search,
                           category_id=category_id,
                           status=status)
@product_bp.route('/add')
@admin_required
def add():
    """Render the blank product form used to create a new product."""
    active_categories = (
        Category.query
        .filter_by(is_active=1)
        .order_by(Category.sort_order)
        .all()
    )
    all_spec_names = SpecName.query.order_by(SpecName.sort_order).all()

    # product=None tells the shared form template that this is a creation
    context = {
        'product': None,
        'categories': active_categories,
        'spec_names': all_spec_names,
    }
    return render_template('admin/product_form.html', **context)
@product_bp.route('/edit/<int:product_id>')
@admin_required
def edit(product_id):
    """Render the product form pre-filled with an existing product.

    Responds with 404 when no product has the given ``product_id``.
    """
    existing_product = Product.query.get_or_404(product_id)
    active_categories = (
        Category.query
        .filter_by(is_active=1)
        .order_by(Category.sort_order)
        .all()
    )
    all_spec_names = SpecName.query.order_by(SpecName.sort_order).all()

    context = {
        'product': existing_product,
        'categories': active_categories,
        'spec_names': all_spec_names,
    }
    return render_template('admin/product_form.html', **context)
@product_bp.route('/save', methods=['POST'])
@admin_required
@log_operation('保存商品信息', 'product')
def save():
    """Create or update a product and rebuild its inventory (SKU) rows.

    Reads the product fields and the inventory fields from the submitted
    form. When ``product_id`` is present the existing product is updated and
    its inventory rows are deleted and recreated; otherwise a new product is
    inserted. ``inventory_type`` selects between a single default SKU
    (``'single'``) and a list of SKUs built from the parallel ``*[]`` form
    lists.

    Returns:
        A redirect to the product's edit page on success, or a redirect back
        to the referring page (the product list when no referrer is
        available) with a flashed error message on failure.
    """
    # request.referrer is None when the client sends no Referer header;
    # fall back to the product list so the error redirect always has a target.
    back_url = request.referrer or url_for('product.index')

    try:
        product_id = request.form.get('product_id', type=int)

        # Basic information
        name = request.form.get('name', '').strip()
        category_id = request.form.get('category_id', type=int)
        brand = request.form.get('brand', '').strip()
        price = request.form.get('price', type=float)
        original_price = request.form.get('original_price', type=float)
        description = request.form.get('description', '').strip()
        weight = request.form.get('weight', type=float)
        status = request.form.get('status', 1, type=int)

        # Required fields
        if not name or not category_id or not price:
            flash('请填写完整的商品基本信息', 'error')
            return redirect(back_url)

        # Create or update the product
        if product_id:
            product = Product.query.get_or_404(product_id)
        else:
            product = Product()

        product.name = name
        product.category_id = category_id
        product.brand = brand
        product.price = price
        product.original_price = original_price
        product.description = description
        product.weight = weight
        product.status = status

        # Inventory type decides whether the product carries specifications
        inventory_type = request.form.get('inventory_type', 'single')
        product.has_specs = 0 if inventory_type == 'single' else 1

        if not product_id:
            db.session.add(product)
            db.session.flush()  # populate product.id for the inventory rows

        if inventory_type == 'single':
            # Single-SKU product
            single_stock = request.form.get('single_stock', 0, type=int)
            warning_stock = request.form.get('warning_stock', 10, type=int)

            # Replace any existing inventory rows when editing
            if product_id:
                ProductInventory.query.filter_by(product_id=product.id).delete()

            sku_code = f"{product.name[:3].upper()}-DEFAULT-{product.id}"
            inventory = ProductInventory(
                product_id=product.id,
                sku_code=sku_code,
                spec_combination=None,
                price_adjustment=0,
                stock=single_stock,
                warning_stock=warning_stock,
                is_default=1,
                status=1
            )
            db.session.add(inventory)
        else:
            # Multi-SKU product: the form posts parallel lists, one entry per SKU
            sku_codes = request.form.getlist('sku_codes[]')
            spec_combinations = request.form.getlist('spec_combinations[]')
            price_adjustments = request.form.getlist('price_adjustments[]')
            stocks = request.form.getlist('stocks[]')
            warning_stocks = request.form.getlist('warning_stocks[]')
            default_sku_index = request.form.get('default_sku', 0, type=int)

            if not sku_codes:
                # Discard the pending product changes (including a flushed
                # new product) before leaving the request.
                db.session.rollback()
                flash('请至少添加一个SKU', 'error')
                return redirect(back_url)

            # Replace any existing inventory rows when editing
            if product_id:
                ProductInventory.query.filter_by(product_id=product.id).delete()

            for i, sku_code in enumerate(sku_codes):
                # A missing or malformed spec payload is stored as "no
                # specification" instead of failing the whole save.
                try:
                    spec_combination = json.loads(spec_combinations[i]) if spec_combinations[i] else None
                except (ValueError, TypeError, IndexError):
                    spec_combination = None

                inventory = ProductInventory(
                    product_id=product.id,
                    sku_code=sku_code,
                    spec_combination=spec_combination,
                    price_adjustment=float(price_adjustments[i]) if price_adjustments[i] else 0,
                    stock=int(stocks[i]) if stocks[i] else 0,
                    warning_stock=int(warning_stocks[i]) if warning_stocks[i] else 10,
                    is_default=1 if i == default_sku_index else 0,
                    status=1
                )
                db.session.add(inventory)

        db.session.commit()
        flash('商品信息保存成功', 'success')
        return redirect(url_for('product.edit', product_id=product.id))

    except Exception as e:
        db.session.rollback()
        flash(f'保存失败: {str(e)}', 'error')
        return redirect(back_url)
@product_bp.route('/upload-images/<int:product_id>', methods=['POST'])
@admin_required
@log_operation('上传商品图片', 'product')
def upload_images(product_id):
    """Upload one or more images for a product and record them.

    Files arrive in the multipart field ``files``. Entries with an empty
    filename, a disallowed extension, or a failed upload are skipped. If the
    product has no images yet, the first stored image becomes the main image
    and is copied to ``product.main_image``.

    Returns:
        JSON with ``success``, ``message`` and, on success, ``images``: one
        dict per stored image with ``id``, ``url``, ``sort_order`` and
        ``is_main``.
    """
    allowed_extensions = {'jpg', 'jpeg', 'png', 'gif', 'webp'}

    try:
        product = Product.query.get_or_404(product_id)

        if 'files' not in request.files:
            return jsonify({'success': False, 'message': '没有选择文件'})

        # Count the existing images once and track additions locally, so the
        # sort order and "first image" decision do not depend on the session
        # autoflushing pending rows before each COUNT query.
        image_count = ProductImage.query.filter_by(product_id=product_id).count()
        stored = []  # (ProductImage, is_first_image) pairs, in upload order

        for file in request.files.getlist('files'):
            if file.filename == '':
                continue

            # Check the file type by extension
            file_ext = file.filename.rsplit('.', 1)[1].lower() if '.' in file.filename else ''
            if file_ext not in allowed_extensions:
                continue

            # Upload to COS
            result = cos_client.upload_file(file, 'product', file.filename)
            if not result['success']:
                continue

            is_first_image = (image_count == 0)
            image = ProductImage(
                product_id=product_id,
                image_url=result['url'],
                sort_order=image_count,
                is_main=1 if is_first_image else 0  # first image becomes the main image
            )
            db.session.add(image)

            # Keep the product's main image in sync with the first image
            if is_first_image:
                product.main_image = result['url']

            stored.append((image, is_first_image))
            image_count += 1

        # Flushing emits the INSERTs, so each new row has its primary key and
        # the ids can be read from the objects instead of re-querying by URL.
        db.session.flush()
        uploaded_images = [
            {
                'id': image.id,
                'url': image.image_url,
                'sort_order': image.sort_order,
                'is_main': is_first_image
            }
            for image, is_first_image in stored
        ]
        db.session.commit()

        return jsonify({
            'success': True,
            'message': f'成功上传 {len(uploaded_images)} 张图片',
            'images': uploaded_images
        })

    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'message': f'上传失败: {str(e)}'})
@product_bp.route('/delete-image/<int:image_id>', methods=['DELETE'])
@admin_required
@log_operation('删除商品图片', 'product_image')
def delete_image(image_id):
    """Delete a product image: first its stored file, then its database row.

    Returns JSON with ``success`` and ``message``; any exception is rolled
    back and reported in the message.
    """
    try:
        target = ProductImage.query.get_or_404(image_id)

        url = target.image_url
        if url:
            # The storage key is the last four path segments of the URL
            key_segments = url.split('/')[-4:]
            cos_client.delete_file('/'.join(key_segments))

        db.session.delete(target)
        db.session.commit()
        return jsonify({'success': True, 'message': '图片删除成功'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'message': f'删除失败: {str(e)}'})
@product_bp.route('/set-main-image/<int:image_id>', methods=['POST'])
@admin_required
@log_operation('设置主图', 'product')
def set_main_image(image_id):
    """Mark the given image as its product's main image.

    Clears the main flag on the product's current main image(s), sets it on
    the chosen image and copies the image URL to ``product.main_image``.
    Returns JSON with ``success`` and ``message``.
    """
    try:
        chosen = ProductImage.query.get_or_404(image_id)
        owner = chosen.product

        # Demote whichever image currently carries the main flag
        current_main = ProductImage.query.filter_by(product_id=owner.id, is_main=1)
        current_main.update({'is_main': 0})

        # Promote the chosen image
        chosen.is_main = 1
        owner.main_image = chosen.image_url

        db.session.commit()
        return jsonify({'success': True, 'message': '主图设置成功'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'message': f'设置失败: {str(e)}'})
@product_bp.route('/sort-images/<int:product_id>', methods=['POST'])
@admin_required
@log_operation('排序商品图片', 'product')
def sort_images(product_id):
    """Persist a new display order for a product's images.

    Expects a JSON body ``{"image_ids": [...]}``; each image's ``sort_order``
    becomes its position in that list. Only images that belong to
    ``product_id`` are updated, so ids of other products' images in the
    payload are ignored.

    Returns JSON with ``success`` and ``message``.
    """
    try:
        image_ids = request.json.get('image_ids', [])

        for index, image_id in enumerate(image_ids):
            # Scope the update to the product named in the URL
            ProductImage.query.filter_by(
                id=image_id,
                product_id=product_id
            ).update({'sort_order': index})

        db.session.commit()
        return jsonify({'success': True, 'message': '排序保存成功'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'message': f'排序失败: {str(e)}'})
@product_bp.route('/delete/<int:product_id>', methods=['DELETE'])
@admin_required
@log_operation('删除商品', 'product')
def delete(product_id):
    """Delete a product after removing the stored files of its images.

    Returns JSON with ``success`` and ``message``; any exception is rolled
    back and reported in the message.
    """
    try:
        doomed = Product.query.get_or_404(product_id)

        # Remove each image file; the storage key is the last four path
        # segments of the image URL.
        for picture in doomed.images:
            if not picture.image_url:
                continue
            storage_key = '/'.join(picture.image_url.split('/')[-4:])
            cos_client.delete_file(storage_key)

        # Remove the product row itself
        db.session.delete(doomed)
        db.session.commit()
        return jsonify({'success': True, 'message': '商品删除成功'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'message': f'删除失败: {str(e)}'})
# Category management routes
@product_bp.route('/categories')
@admin_required
def categories():
    """Render the category management page with all categories in sort order."""
    ordered_categories = Category.query.order_by(Category.sort_order).all()
    return render_template('admin/categories.html', categories=ordered_categories)
@product_bp.route('/categories/save', methods=['POST'])
@admin_required
@log_operation('保存商品分类', 'category')
def save_category():
    """Create or update a category from the submitted form.

    Validates that the name is present and unique among siblings, that an
    edited category is not made its own parent or a descendant of itself,
    and that the resulting level does not exceed 3. An uploaded ``icon``
    file replaces the category's icon.

    Always redirects to the category management page; the outcome is
    reported through flashed messages.
    """
    try:
        category_id = request.form.get('category_id', type=int)
        name = request.form.get('name', '').strip()
        parent_id = request.form.get('parent_id', 0, type=int)
        sort_order = request.form.get('sort_order', 0, type=int)
        is_active = request.form.get('is_active', 1, type=int)

        if not name:
            flash('分类名称不能为空', 'error')
            return redirect(url_for('product.categories'))

        # Names must be unique among categories sharing the same parent
        existing = Category.query.filter_by(name=name, parent_id=parent_id).first()
        if existing and (not category_id or existing.id != category_id):
            flash('同一层级下分类名称不能重复', 'error')
            return redirect(url_for('product.categories'))

        if category_id:
            category = Category.query.get_or_404(category_id)

            # A category cannot be its own parent
            if parent_id == category_id:
                flash('不能将分类设为自己的子分类', 'error')
                return redirect(url_for('product.categories'))

            # Walk up from the new parent; meeting this category on the way
            # means the new parent is one of its descendants.
            if parent_id != 0:
                ancestor = Category.query.get(parent_id)
                # The visited set stops the walk if stored data already
                # contains a cycle that does not involve this category.
                visited = set()
                while ancestor and ancestor.parent_id != 0 and ancestor.id not in visited:
                    visited.add(ancestor.id)
                    if ancestor.parent_id == category_id:
                        flash('不能创建循环引用的分类层级', 'error')
                        return redirect(url_for('product.categories'))
                    ancestor = Category.query.get(ancestor.parent_id)
        else:
            category = Category()

        category.name = name
        category.parent_id = parent_id
        category.sort_order = sort_order
        category.is_active = is_active

        # Derive the level from the parent
        if parent_id == 0:
            category.level = 1
        else:
            parent = Category.query.get(parent_id)
            if parent:
                category.level = parent.level + 1
                if category.level > 3:
                    # Discard the field changes made above before leaving
                    db.session.rollback()
                    flash('分类层级不能超过3级', 'error')
                    return redirect(url_for('product.categories'))
            else:
                category.level = 1

        # Icon upload
        icon_file = request.files.get('icon')
        if icon_file and icon_file.filename:
            previous_icon_url = category.icon_url

            # Upload the new icon first so that a failed upload leaves the
            # existing icon file in place.
            result = cos_client.upload_file(icon_file, 'category', icon_file.filename)
            if not result['success']:
                db.session.rollback()
                flash(f'图标上传失败: {result["error"]}', 'error')
                return redirect(url_for('product.categories'))

            category.icon_url = result['url']

            # Remove the old icon file, unless the upload reused the same URL
            if previous_icon_url and previous_icon_url != result['url']:
                old_file_key = '/'.join(previous_icon_url.split('/')[-4:])
                cos_client.delete_file(old_file_key)

        if not category_id:
            db.session.add(category)

        db.session.commit()
        flash('分类保存成功', 'success')

    except Exception as e:
        db.session.rollback()
        flash(f'保存失败: {str(e)}', 'error')

    return redirect(url_for('product.categories'))
@product_bp.route('/categories/<int:category_id>', methods=['GET'])
@admin_required
def get_category(category_id):
    """Return one category as JSON.

    On success the body is ``{"success": True, "category": <to_dict()>}``;
    any exception is reported as ``{"success": False, "message": ...}``.
    """
    try:
        found = Category.query.get_or_404(category_id)
        payload = {
            'success': True,
            'category': found.to_dict()
        }
        return jsonify(payload)
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)})
@product_bp.route('/categories/<int:category_id>', methods=['DELETE'])
@admin_required
@log_operation('删除商品分类', 'category')
def delete_category(category_id):
    """Delete a category that has no child categories and no products.

    The category's icon file, if any, is removed before the row is deleted.
    Returns JSON with ``success`` and ``message``.
    """
    try:
        doomed = Category.query.get_or_404(category_id)

        # Refuse while child categories exist
        child_count = Category.query.filter_by(parent_id=category_id).count()
        if child_count > 0:
            return jsonify({'success': False, 'message': '该分类下还有子分类,无法删除'})

        # Refuse while products still use this category
        product_count = Product.query.filter_by(category_id=category_id).count()
        if product_count > 0:
            return jsonify({'success': False, 'message': f'该分类下还有 {product_count} 个商品,无法删除'})

        # Remove the icon file; its key is the last four URL path segments
        icon_url = doomed.icon_url
        if icon_url:
            cos_client.delete_file('/'.join(icon_url.split('/')[-4:]))

        db.session.delete(doomed)
        db.session.commit()
        return jsonify({'success': True, 'message': '分类删除成功'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'message': f'删除失败: {str(e)}'})
@product_bp.route('/categories/sort', methods=['POST'])
@admin_required
@log_operation('分类排序', 'category')
def sort_categories():
    """Persist new sort orders for categories.

    Expects a JSON body ``{"orders": [{"id": ..., "sort_order": ...}, ...]}``
    and writes each entry's ``sort_order`` to the category with that ``id``.
    Returns JSON with ``success`` and ``message``.
    """
    try:
        for entry in request.json.get('orders', []):
            target_id = entry.get('id')
            new_position = entry.get('sort_order')
            matching = Category.query.filter_by(id=target_id)
            matching.update({'sort_order': new_position})

        db.session.commit()
        return jsonify({'success': True, 'message': '排序保存成功'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'message': f'排序失败: {str(e)}'})
# Inventory management routes
@product_bp.route('/inventory/<int:product_id>')
@admin_required
def get_inventory(product_id):
    """Return a product's inventory (SKU) rows as JSON.

    Rows are ordered with the default SKU first, then by id. On success the
    body holds ``inventory`` (a list of dicts) and ``product_name``; any
    exception is reported as ``{"success": False, "message": ...}``.
    """
    try:
        product = Product.query.get_or_404(product_id)

        ordering = (ProductInventory.is_default.desc(), ProductInventory.id)
        rows = (
            ProductInventory.query
            .filter_by(product_id=product_id)
            .order_by(*ordering)
            .all()
        )

        inventory_data = [
            {
                'id': row.id,
                'sku_code': row.sku_code,
                'spec_combination': row.spec_combination,
                'stock': row.stock,
                'warning_stock': row.warning_stock,
                'price_adjustment': float(row.price_adjustment) if row.price_adjustment else 0,
                'is_default': row.is_default,
                'status': row.status,
                'final_price': row.get_final_price()
            }
            for row in rows
        ]

        return jsonify({
            'success': True,
            'inventory': inventory_data,
            'product_name': product.name
        })
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)})
@product_bp.route('/inventory/update', methods=['POST'])
@admin_required
@log_operation('更新库存', 'inventory')
def update_inventory():
    """Apply a batch of manual stock adjustments.

    Expects a JSON body ``{"inventory_list": [{"id": ..., "stock": ...}]}``.
    Entries without an id, without a stock value, or whose id matches no
    inventory row are skipped. Each applied change is recorded through
    ``InventoryLog.create_log`` with change type 3. All changes are committed
    together; any exception rolls the whole batch back.

    Returns JSON with ``success`` and ``message``.
    """
    try:
        # Imported once per request rather than once per loop iteration
        from app.models.product import InventoryLog

        inventory_data = request.json.get('inventory_list', [])

        for item in inventory_data:
            inventory_id = item.get('id')
            new_stock = item.get('stock')
            if not inventory_id or new_stock is None:
                continue

            inventory = ProductInventory.query.get(inventory_id)
            if not inventory:
                continue

            old_stock = inventory.stock
            inventory.stock = new_stock

            # Record the stock change
            InventoryLog.create_log(
                product_id=inventory.product_id,
                sku_code=inventory.sku_code,
                change_type=3,  # adjustment
                change_quantity=new_stock - old_stock,
                before_stock=old_stock,
                after_stock=new_stock,
                remark='管理员手动调整'
            )

        db.session.commit()
        return jsonify({'success': True, 'message': '库存更新成功'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'message': f'更新失败: {str(e)}'})
@product_bp.route('/inventory/log/<int:product_id>')
@admin_required
def inventory_log(product_id):
    """Render the paginated stock-change log of a product, newest first.

    Responds with 404 when the product does not exist. The ``page`` query
    parameter selects the page (default 1); 20 entries per page.
    """
    product = Product.query.get_or_404(product_id)

    current_page = request.args.get('page', 1, type=int)
    page_size = 20

    from app.models.product import InventoryLog

    newest_first = (
        InventoryLog.query
        .filter_by(product_id=product_id)
        .order_by(InventoryLog.created_at.desc())
    )
    logs = newest_first.paginate(page=current_page, per_page=page_size, error_out=False)

    return render_template('admin/inventory_log.html', product=product, logs=logs)
@product_bp.route('/generate-sku-code', methods=['POST'])
@admin_required
def generate_sku_code():
    """Generate a SKU code suggestion from a product name and spec values.

    Expects a JSON body with ``product_name`` and ``spec_combination`` (a
    mapping of spec name to value). The code has the form
    ``<NAME>-<SPECS>-<TIME>``:

    * ``NAME``: first three characters of the product name, upper-cased,
      or ``PRD`` when the name is empty.
    * ``SPECS``: first two characters of every spec value, upper-cased and
      concatenated, or ``DEFAULT`` when there are no specs.
    * ``TIME``: last four digits of the current Unix timestamp.

    Returns JSON with ``success`` and ``sku_code``, or ``success: False``
    with a ``message`` on error.
    """
    try:
        payload = request.json
        product_name = payload.get('product_name', '')
        spec_combination = payload.get('spec_combination', {})

        short_name = product_name[:3].upper() if product_name else 'PRD'

        # Spec values may arrive as JSON numbers (e.g. a size of 42); convert
        # them to text before slicing so they do not raise TypeError.
        if spec_combination:
            spec_code = ''.join(str(v)[:2].upper() for v in spec_combination.values())
        else:
            spec_code = 'DEFAULT'

        timestamp = str(int(time.time()))[-4:]
        sku_code = f"{short_name}-{spec_code}-{timestamp}"

        return jsonify({'success': True, 'sku_code': sku_code})
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)})
@product_bp.route('/check-sku-code', methods=['POST'])
@admin_required
def check_sku_code():
    """Report whether a SKU code is already in use.

    Expects a JSON body with ``sku_code`` and an optional ``product_id``;
    when ``product_id`` is given, inventory rows of that product are left
    out of the check. Returns JSON with ``success``, ``exists`` and
    ``message``, or ``success: False`` with a ``message`` on error.
    """
    try:
        body = request.json
        sku_code = body.get('sku_code', '')
        product_id = body.get('product_id', None)

        candidates = ProductInventory.query.filter_by(sku_code=sku_code)
        if product_id:
            # Exclude the rows of the product being edited
            candidates = candidates.filter(ProductInventory.product_id != product_id)

        exists = candidates.first() is not None
        if exists:
            message = 'SKU编码已存在'
        else:
            message = 'SKU编码可用'

        return jsonify({
            'success': True,
            'exists': exists,
            'message': message
        })
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)})