668 lines
23 KiB
Python
668 lines
23 KiB
Python
"""
|
||
商品管理视图
|
||
"""
|
||
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, g
|
||
from werkzeug.utils import secure_filename
|
||
from app.models.product import Product, Category, ProductImage, SpecName, SpecValue, ProductInventory
|
||
from app.models.admin import AdminUser
|
||
from app.utils.decorators import admin_required, log_operation
|
||
from app.utils.cos_client import cos_client
|
||
from config.database import db
|
||
from sqlalchemy import func
|
||
import time
|
||
import uuid
|
||
import json
|
||
|
||
product_bp = Blueprint('product', __name__, url_prefix='/admin/products')
|
||
|
||
|
||
@product_bp.route('/')
|
||
@admin_required
|
||
def index():
|
||
"""商品列表"""
|
||
page = request.args.get('page', 1, type=int)
|
||
per_page = 20
|
||
|
||
query = Product.query.order_by(Product.created_at.desc())
|
||
|
||
# 搜索功能
|
||
search = request.args.get('search', '').strip()
|
||
if search:
|
||
query = query.filter(Product.name.like(f'%{search}%'))
|
||
|
||
# 分类筛选
|
||
category_id = request.args.get('category_id', '', type=str)
|
||
if category_id:
|
||
query = query.filter(Product.category_id == int(category_id))
|
||
|
||
# 状态筛选
|
||
status = request.args.get('status', '', type=str)
|
||
if status:
|
||
query = query.filter(Product.status == int(status))
|
||
|
||
products = query.paginate(page=page, per_page=per_page, error_out=False)
|
||
|
||
# 获取所有分类用于筛选
|
||
categories = Category.query.filter_by(is_active=1).order_by(Category.sort_order).all()
|
||
|
||
return render_template('admin/products.html',
|
||
products=products,
|
||
categories=categories,
|
||
search=search,
|
||
category_id=category_id,
|
||
status=status)
|
||
|
||
|
||
|
||
|
||
@product_bp.route('/add')
|
||
@admin_required
|
||
def add():
|
||
"""添加商品页面"""
|
||
categories = Category.query.filter_by(is_active=1).order_by(Category.sort_order).all()
|
||
spec_names = SpecName.query.order_by(SpecName.sort_order).all()
|
||
|
||
return render_template('admin/product_form.html',
|
||
product=None,
|
||
categories=categories,
|
||
spec_names=spec_names)
|
||
|
||
|
||
@product_bp.route('/edit/<int:product_id>')
|
||
@admin_required
|
||
def edit(product_id):
|
||
"""编辑商品页面"""
|
||
product = Product.query.get_or_404(product_id)
|
||
categories = Category.query.filter_by(is_active=1).order_by(Category.sort_order).all()
|
||
spec_names = SpecName.query.order_by(SpecName.sort_order).all()
|
||
|
||
return render_template('admin/product_form.html',
|
||
product=product,
|
||
categories=categories,
|
||
spec_names=spec_names)
|
||
|
||
|
||
@product_bp.route('/save', methods=['POST'])
|
||
@admin_required
|
||
@log_operation('保存商品信息', 'product')
|
||
def save():
|
||
"""保存商品信息"""
|
||
try:
|
||
product_id = request.form.get('product_id', type=int)
|
||
|
||
# 基本信息
|
||
name = request.form.get('name', '').strip()
|
||
category_id = request.form.get('category_id', type=int)
|
||
brand = request.form.get('brand', '').strip()
|
||
price = request.form.get('price', type=float)
|
||
original_price = request.form.get('original_price', type=float)
|
||
description = request.form.get('description', '').strip()
|
||
weight = request.form.get('weight', type=float)
|
||
status = request.form.get('status', 1, type=int)
|
||
|
||
# 验证必填字段
|
||
if not name or not category_id or not price:
|
||
flash('请填写完整的商品基本信息', 'error')
|
||
return redirect(request.referrer)
|
||
|
||
# 创建或更新商品
|
||
if product_id:
|
||
product = Product.query.get_or_404(product_id)
|
||
else:
|
||
product = Product()
|
||
|
||
product.name = name
|
||
product.category_id = category_id
|
||
product.brand = brand
|
||
product.price = price
|
||
product.original_price = original_price
|
||
product.description = description
|
||
product.weight = weight
|
||
product.status = status
|
||
|
||
# 处理库存类型
|
||
inventory_type = request.form.get('inventory_type', 'single')
|
||
|
||
if inventory_type == 'single':
|
||
product.has_specs = 0
|
||
else:
|
||
product.has_specs = 1
|
||
|
||
if not product_id:
|
||
db.session.add(product)
|
||
db.session.flush() # 获取product.id
|
||
|
||
# 处理库存信息
|
||
if inventory_type == 'single':
|
||
# 单规格处理
|
||
single_stock = request.form.get('single_stock', 0, type=int)
|
||
warning_stock = request.form.get('warning_stock', 10, type=int)
|
||
|
||
# 删除现有库存记录(如果是编辑模式)
|
||
if product_id:
|
||
ProductInventory.query.filter_by(product_id=product.id).delete()
|
||
|
||
# 创建单个SKU
|
||
sku_code = f"{product.name[:3].upper()}-DEFAULT-{product.id}"
|
||
inventory = ProductInventory(
|
||
product_id=product.id,
|
||
sku_code=sku_code,
|
||
spec_combination=None,
|
||
price_adjustment=0,
|
||
stock=single_stock,
|
||
warning_stock=warning_stock,
|
||
is_default=1,
|
||
status=1
|
||
)
|
||
db.session.add(inventory)
|
||
|
||
else:
|
||
# 多规格处理
|
||
sku_codes = request.form.getlist('sku_codes[]')
|
||
spec_combinations = request.form.getlist('spec_combinations[]')
|
||
price_adjustments = request.form.getlist('price_adjustments[]')
|
||
stocks = request.form.getlist('stocks[]')
|
||
warning_stocks = request.form.getlist('warning_stocks[]')
|
||
default_sku_index = request.form.get('default_sku', 0, type=int)
|
||
|
||
if not sku_codes:
|
||
flash('请至少添加一个SKU', 'error')
|
||
return redirect(request.referrer)
|
||
|
||
# 删除现有库存记录(如果是编辑模式)
|
||
if product_id:
|
||
ProductInventory.query.filter_by(product_id=product.id).delete()
|
||
|
||
# 创建多个SKU
|
||
for i, sku_code in enumerate(sku_codes):
|
||
try:
|
||
spec_combination = json.loads(spec_combinations[i]) if spec_combinations[i] else None
|
||
except:
|
||
spec_combination = None
|
||
|
||
inventory = ProductInventory(
|
||
product_id=product.id,
|
||
sku_code=sku_code,
|
||
spec_combination=spec_combination,
|
||
price_adjustment=float(price_adjustments[i]) if price_adjustments[i] else 0,
|
||
stock=int(stocks[i]) if stocks[i] else 0,
|
||
warning_stock=int(warning_stocks[i]) if warning_stocks[i] else 10,
|
||
is_default=1 if i == default_sku_index else 0,
|
||
status=1
|
||
)
|
||
db.session.add(inventory)
|
||
|
||
db.session.commit()
|
||
|
||
flash('商品信息保存成功', 'success')
|
||
return redirect(url_for('product.edit', product_id=product.id))
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
flash(f'保存失败: {str(e)}', 'error')
|
||
return redirect(request.referrer)
|
||
|
||
|
||
@product_bp.route('/upload-images/<int:product_id>', methods=['POST'])
|
||
@admin_required
|
||
@log_operation('上传商品图片', 'productdef index():')
|
||
def upload_images(product_id):
|
||
"""上传商品图片"""
|
||
try:
|
||
product = Product.query.get_or_404(product_id)
|
||
if 'files' not in request.files:
|
||
return jsonify({'success': False, 'message': '没有选择文件'})
|
||
files = request.files.getlist('files')
|
||
uploaded_images = []
|
||
for file in files:
|
||
if file.filename == '':
|
||
continue
|
||
# 检查文件类型
|
||
allowed_extensions = {'jpg', 'jpeg', 'png', 'gif', 'webp'}
|
||
file_ext = file.filename.rsplit('.', 1)[1].lower() if '.' in file.filename else ''
|
||
if file_ext not in allowed_extensions:
|
||
continue
|
||
# 上传到COS
|
||
result = cos_client.upload_file(file, 'product', file.filename)
|
||
if result['success']:
|
||
# 检查是否是第一张图片
|
||
existing_images_count = ProductImage.query.filter_by(product_id=product_id).count()
|
||
is_first_image = (existing_images_count == 0)
|
||
|
||
# 保存图片记录
|
||
image = ProductImage(
|
||
product_id=product_id,
|
||
image_url=result['url'],
|
||
sort_order=existing_images_count,
|
||
is_main=1 if is_first_image else 0 # 第一张图片自动设为主图
|
||
)
|
||
db.session.add(image)
|
||
|
||
# 如果是第一张图片,同时更新商品主图
|
||
if is_first_image:
|
||
product.main_image = result['url']
|
||
uploaded_images.append({
|
||
'id': None, # 临时ID,提交后会更新
|
||
'url': result['url'],
|
||
'sort_order': image.sort_order,
|
||
'is_main': is_first_image
|
||
})
|
||
db.session.commit()
|
||
# 更新图片ID
|
||
for i, uploaded_image in enumerate(uploaded_images):
|
||
image = ProductImage.query.filter_by(
|
||
product_id=product_id,
|
||
image_url=uploaded_image['url']
|
||
).first()
|
||
if image:
|
||
uploaded_images[i]['id'] = image.id
|
||
return jsonify({
|
||
'success': True,
|
||
'message': f'成功上传 {len(uploaded_images)} 张图片',
|
||
'images': uploaded_images
|
||
})
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'上传失败: {str(e)}'})
|
||
|
||
|
||
@product_bp.route('/delete-image/<int:image_id>', methods=['DELETE'])
|
||
@admin_required
|
||
@log_operation('删除商品图片', 'product_image')
|
||
def delete_image(image_id):
|
||
"""删除商品图片"""
|
||
try:
|
||
image = ProductImage.query.get_or_404(image_id)
|
||
|
||
# 从COS删除文件
|
||
if image.image_url:
|
||
file_key = image.image_url.split('/')[-4:] # 提取文件路径
|
||
file_key = '/'.join(file_key)
|
||
cos_client.delete_file(file_key)
|
||
|
||
db.session.delete(image)
|
||
db.session.commit()
|
||
|
||
return jsonify({'success': True, 'message': '图片删除成功'})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'删除失败: {str(e)}'})
|
||
|
||
|
||
@product_bp.route('/set-main-image/<int:image_id>', methods=['POST'])
|
||
@admin_required
|
||
@log_operation('设置主图', 'product')
|
||
def set_main_image(image_id):
|
||
"""设置主图"""
|
||
try:
|
||
image = ProductImage.query.get_or_404(image_id)
|
||
product = image.product
|
||
|
||
# 清除当前主图
|
||
ProductImage.query.filter_by(product_id=product.id, is_main=1).update({'is_main': 0})
|
||
|
||
# 设置新主图
|
||
image.is_main = 1
|
||
product.main_image = image.image_url
|
||
|
||
db.session.commit()
|
||
|
||
return jsonify({'success': True, 'message': '主图设置成功'})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'设置失败: {str(e)}'})
|
||
|
||
|
||
@product_bp.route('/sort-images/<int:product_id>', methods=['POST'])
|
||
@admin_required
|
||
@log_operation('排序商品图片', 'product')
|
||
def sort_images(product_id):
|
||
"""图片排序"""
|
||
try:
|
||
image_ids = request.json.get('image_ids', [])
|
||
|
||
for index, image_id in enumerate(image_ids):
|
||
ProductImage.query.filter_by(id=image_id).update({'sort_order': index})
|
||
|
||
db.session.commit()
|
||
|
||
return jsonify({'success': True, 'message': '排序保存成功'})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'排序失败: {str(e)}'})
|
||
|
||
|
||
@product_bp.route('/delete/<int:product_id>', methods=['DELETE'])
|
||
@admin_required
|
||
@log_operation('删除商品', 'product')
|
||
def delete(product_id):
|
||
"""删除商品"""
|
||
try:
|
||
product = Product.query.get_or_404(product_id)
|
||
|
||
# 删除商品图片
|
||
for image in product.images:
|
||
if image.image_url:
|
||
file_key = image.image_url.split('/')[-4:]
|
||
file_key = '/'.join(file_key)
|
||
cos_client.delete_file(file_key)
|
||
|
||
# 删除商品记录
|
||
db.session.delete(product)
|
||
db.session.commit()
|
||
|
||
return jsonify({'success': True, 'message': '商品删除成功'})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'删除失败: {str(e)}'})
|
||
|
||
|
||
# 分类管理相关路由
|
||
@product_bp.route('/categories')
|
||
@admin_required
|
||
def categories():
|
||
"""分类管理"""
|
||
categories = Category.query.order_by(Category.sort_order).all()
|
||
return render_template('admin/categories.html', categories=categories)
|
||
|
||
|
||
@product_bp.route('/categories/save', methods=['POST'])
|
||
@admin_required
|
||
@log_operation('保存商品分类', 'category')
|
||
def save_category():
|
||
"""保存分类"""
|
||
try:
|
||
category_id = request.form.get('category_id', type=int)
|
||
name = request.form.get('name', '').strip()
|
||
parent_id = request.form.get('parent_id', 0, type=int)
|
||
sort_order = request.form.get('sort_order', 0, type=int)
|
||
is_active = request.form.get('is_active', 1, type=int)
|
||
|
||
if not name:
|
||
flash('分类名称不能为空', 'error')
|
||
return redirect(url_for('product.categories'))
|
||
|
||
# 检查分类名称是否重复
|
||
existing = Category.query.filter_by(name=name, parent_id=parent_id).first()
|
||
if existing and (not category_id or existing.id != category_id):
|
||
flash('同一层级下分类名称不能重复', 'error')
|
||
return redirect(url_for('product.categories'))
|
||
|
||
if category_id:
|
||
category = Category.query.get_or_404(category_id)
|
||
|
||
# 防止将分类设为自己的子分类
|
||
if parent_id == category_id:
|
||
flash('不能将分类设为自己的子分类', 'error')
|
||
return redirect(url_for('product.categories'))
|
||
|
||
# 防止循环引用
|
||
if parent_id != 0:
|
||
parent = Category.query.get(parent_id)
|
||
temp_parent = parent
|
||
while temp_parent and temp_parent.parent_id != 0:
|
||
if temp_parent.parent_id == category_id:
|
||
flash('不能创建循环引用的分类层级', 'error')
|
||
return redirect(url_for('product.categories'))
|
||
temp_parent = Category.query.get(temp_parent.parent_id)
|
||
else:
|
||
category = Category()
|
||
|
||
category.name = name
|
||
category.parent_id = parent_id
|
||
category.sort_order = sort_order
|
||
category.is_active = is_active
|
||
|
||
# 设置层级
|
||
if parent_id == 0:
|
||
category.level = 1
|
||
else:
|
||
parent = Category.query.get(parent_id)
|
||
if parent:
|
||
category.level = parent.level + 1
|
||
if category.level > 3:
|
||
flash('分类层级不能超过3级', 'error')
|
||
return redirect(url_for('product.categories'))
|
||
else:
|
||
category.level = 1
|
||
|
||
# 处理图标上传
|
||
if 'icon' in request.files:
|
||
icon_file = request.files['icon']
|
||
if icon_file and icon_file.filename:
|
||
# 删除旧图标
|
||
if category.icon_url:
|
||
old_file_key = category.icon_url.split('/')[-4:]
|
||
old_file_key = '/'.join(old_file_key)
|
||
cos_client.delete_file(old_file_key)
|
||
|
||
# 上传新图标
|
||
result = cos_client.upload_file(icon_file, 'category', icon_file.filename)
|
||
if result['success']:
|
||
category.icon_url = result['url']
|
||
else:
|
||
flash(f'图标上传失败: {result["error"]}', 'error')
|
||
return redirect(url_for('product.categories'))
|
||
|
||
if not category_id:
|
||
db.session.add(category)
|
||
|
||
db.session.commit()
|
||
flash('分类保存成功', 'success')
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
flash(f'保存失败: {str(e)}', 'error')
|
||
|
||
return redirect(url_for('product.categories'))
|
||
|
||
|
||
@product_bp.route('/categories/<int:category_id>', methods=['GET'])
|
||
@admin_required
|
||
def get_category(category_id):
|
||
"""获取分类详情"""
|
||
try:
|
||
category = Category.query.get_or_404(category_id)
|
||
return jsonify({
|
||
'success': True,
|
||
'category': category.to_dict()
|
||
})
|
||
except Exception as e:
|
||
return jsonify({'success': False, 'message': str(e)})
|
||
|
||
|
||
@product_bp.route('/categories/<int:category_id>', methods=['DELETE'])
|
||
@admin_required
|
||
@log_operation('删除商品分类', 'category')
|
||
def delete_category(category_id):
|
||
"""删除分类"""
|
||
try:
|
||
category = Category.query.get_or_404(category_id)
|
||
|
||
# 检查是否有子分类
|
||
children = Category.query.filter_by(parent_id=category_id).count()
|
||
if children > 0:
|
||
return jsonify({'success': False, 'message': '该分类下还有子分类,无法删除'})
|
||
|
||
# 检查是否有商品使用此分类
|
||
products = Product.query.filter_by(category_id=category_id).count()
|
||
if products > 0:
|
||
return jsonify({'success': False, 'message': f'该分类下还有 {products} 个商品,无法删除'})
|
||
|
||
# 删除分类图标
|
||
if category.icon_url:
|
||
file_key = category.icon_url.split('/')[-4:]
|
||
file_key = '/'.join(file_key)
|
||
cos_client.delete_file(file_key)
|
||
|
||
db.session.delete(category)
|
||
db.session.commit()
|
||
|
||
return jsonify({'success': True, 'message': '分类删除成功'})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'删除失败: {str(e)}'})
|
||
|
||
|
||
@product_bp.route('/categories/sort', methods=['POST'])
|
||
@admin_required
|
||
@log_operation('分类排序', 'category')
|
||
def sort_categories():
|
||
"""分类排序"""
|
||
try:
|
||
category_orders = request.json.get('orders', [])
|
||
|
||
for item in category_orders:
|
||
category_id = item.get('id')
|
||
sort_order = item.get('sort_order')
|
||
|
||
Category.query.filter_by(id=category_id).update({'sort_order': sort_order})
|
||
|
||
db.session.commit()
|
||
|
||
return jsonify({'success': True, 'message': '排序保存成功'})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'排序失败: {str(e)}'})
|
||
|
||
|
||
# 库存管理相关路由
|
||
@product_bp.route('/inventory/<int:product_id>')
|
||
@admin_required
|
||
def get_inventory(product_id):
|
||
"""获取商品库存详情"""
|
||
try:
|
||
product = Product.query.get_or_404(product_id)
|
||
inventory_list = ProductInventory.query.filter_by(product_id=product_id) \
|
||
.order_by(ProductInventory.is_default.desc(), ProductInventory.id).all()
|
||
|
||
inventory_data = []
|
||
for inventory in inventory_list:
|
||
inventory_data.append({
|
||
'id': inventory.id,
|
||
'sku_code': inventory.sku_code,
|
||
'spec_combination': inventory.spec_combination,
|
||
'stock': inventory.stock,
|
||
'warning_stock': inventory.warning_stock,
|
||
'price_adjustment': float(inventory.price_adjustment) if inventory.price_adjustment else 0,
|
||
'is_default': inventory.is_default,
|
||
'status': inventory.status,
|
||
'final_price': inventory.get_final_price()
|
||
})
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'inventory': inventory_data,
|
||
'product_name': product.name
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({'success': False, 'message': str(e)})
|
||
|
||
|
||
@product_bp.route('/inventory/update', methods=['POST'])
|
||
@admin_required
|
||
@log_operation('更新库存', 'inventory')
|
||
def update_inventory():
|
||
"""批量更新库存"""
|
||
try:
|
||
inventory_data = request.json.get('inventory_list', [])
|
||
|
||
for item in inventory_data:
|
||
inventory_id = item.get('id')
|
||
new_stock = item.get('stock')
|
||
|
||
if inventory_id and new_stock is not None:
|
||
inventory = ProductInventory.query.get(inventory_id)
|
||
if inventory:
|
||
old_stock = inventory.stock
|
||
inventory.stock = new_stock
|
||
|
||
# 记录库存变更日志
|
||
from app.models.product import InventoryLog
|
||
InventoryLog.create_log(
|
||
product_id=inventory.product_id,
|
||
sku_code=inventory.sku_code,
|
||
change_type=3, # 调整
|
||
change_quantity=new_stock - old_stock,
|
||
before_stock=old_stock,
|
||
after_stock=new_stock,
|
||
remark='管理员手动调整'
|
||
)
|
||
|
||
db.session.commit()
|
||
return jsonify({'success': True, 'message': '库存更新成功'})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'更新失败: {str(e)}'})
|
||
|
||
|
||
@product_bp.route('/inventory/log/<int:product_id>')
|
||
@admin_required
|
||
def inventory_log(product_id):
|
||
"""查看库存变更日志"""
|
||
product = Product.query.get_or_404(product_id)
|
||
page = request.args.get('page', 1, type=int)
|
||
per_page = 20
|
||
|
||
from app.models.product import InventoryLog
|
||
logs = InventoryLog.query.filter_by(product_id=product_id) \
|
||
.order_by(InventoryLog.created_at.desc()) \
|
||
.paginate(page=page, per_page=per_page, error_out=False)
|
||
|
||
return render_template('admin/inventory_log.html',
|
||
product=product,
|
||
logs=logs)
|
||
|
||
|
||
@product_bp.route('/generate-sku-code', methods=['POST'])
|
||
@admin_required
|
||
def generate_sku_code():
|
||
"""生成SKU编码"""
|
||
try:
|
||
product_name = request.json.get('product_name', '')
|
||
spec_combination = request.json.get('spec_combination', {})
|
||
|
||
# 生成SKU编码逻辑
|
||
short_name = product_name[:3].upper() if product_name else 'PRD'
|
||
spec_code = ''.join([v[:2].upper() for v in spec_combination.values()]) if spec_combination else 'DEFAULT'
|
||
timestamp = str(int(time.time()))[-4:]
|
||
|
||
sku_code = f"{short_name}-{spec_code}-{timestamp}"
|
||
|
||
return jsonify({'success': True, 'sku_code': sku_code})
|
||
|
||
except Exception as e:
|
||
return jsonify({'success': False, 'message': str(e)})
|
||
|
||
|
||
@product_bp.route('/check-sku-code', methods=['POST'])
|
||
@admin_required
|
||
def check_sku_code():
|
||
"""检查SKU编码是否重复"""
|
||
try:
|
||
sku_code = request.json.get('sku_code', '')
|
||
product_id = request.json.get('product_id', None)
|
||
|
||
query = ProductInventory.query.filter_by(sku_code=sku_code)
|
||
if product_id:
|
||
query = query.filter(ProductInventory.product_id != product_id)
|
||
|
||
exists = query.first() is not None
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'exists': exists,
|
||
'message': 'SKU编码已存在' if exists else 'SKU编码可用'
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({'success': False, 'message': str(e)})
|