""" 商品管理视图 """ from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, g from werkzeug.utils import secure_filename from app.models.product import Product, Category, ProductImage, SpecName, SpecValue, ProductInventory from app.models.admin import AdminUser from app.utils.decorators import admin_required, log_operation from app.utils.cos_client import cos_client from config.database import db from sqlalchemy import func import time import uuid import json product_bp = Blueprint('product', __name__, url_prefix='/admin/products') @product_bp.route('/') @admin_required def index(): """商品列表""" page = request.args.get('page', 1, type=int) per_page = 20 query = Product.query.order_by(Product.created_at.desc()) # 搜索功能 search = request.args.get('search', '').strip() if search: query = query.filter(Product.name.like(f'%{search}%')) # 分类筛选 category_id = request.args.get('category_id', '', type=str) if category_id: query = query.filter(Product.category_id == int(category_id)) # 状态筛选 status = request.args.get('status', '', type=str) if status: query = query.filter(Product.status == int(status)) products = query.paginate(page=page, per_page=per_page, error_out=False) # 获取所有分类用于筛选 categories = Category.query.filter_by(is_active=1).order_by(Category.sort_order).all() return render_template('admin/products.html', products=products, categories=categories, search=search, category_id=category_id, status=status) @product_bp.route('/add') @admin_required def add(): """添加商品页面""" categories = Category.query.filter_by(is_active=1).order_by(Category.sort_order).all() spec_names = SpecName.query.order_by(SpecName.sort_order).all() return render_template('admin/product_form.html', product=None, categories=categories, spec_names=spec_names) @product_bp.route('/edit/') @admin_required def edit(product_id): """编辑商品页面""" product = Product.query.get_or_404(product_id) categories = Category.query.filter_by(is_active=1).order_by(Category.sort_order).all() spec_names = SpecName.query.order_by(SpecName.sort_order).all() return render_template('admin/product_form.html', product=product, categories=categories, spec_names=spec_names) @product_bp.route('/save', methods=['POST']) @admin_required @log_operation('保存商品信息', 'product') def save(): """保存商品信息""" try: product_id = request.form.get('product_id', type=int) # 基本信息 name = request.form.get('name', '').strip() category_id = request.form.get('category_id', type=int) brand = request.form.get('brand', '').strip() price = request.form.get('price', type=float) original_price = request.form.get('original_price', type=float) description = request.form.get('description', '').strip() weight = request.form.get('weight', type=float) status = request.form.get('status', 1, type=int) # 验证必填字段 if not name or not category_id or not price: flash('请填写完整的商品基本信息', 'error') return redirect(request.referrer) # 创建或更新商品 if product_id: product = Product.query.get_or_404(product_id) else: product = Product() product.name = name product.category_id = category_id product.brand = brand product.price = price product.original_price = original_price product.description = description product.weight = weight product.status = status # 处理库存类型 inventory_type = request.form.get('inventory_type', 'single') if inventory_type == 'single': product.has_specs = 0 else: product.has_specs = 1 if not product_id: db.session.add(product) db.session.flush() # 获取product.id # 处理库存信息 if inventory_type == 'single': # 单规格处理 single_stock = request.form.get('single_stock', 0, type=int) warning_stock = request.form.get('warning_stock', 10, type=int) # 删除现有库存记录(如果是编辑模式) if product_id: ProductInventory.query.filter_by(product_id=product.id).delete() # 创建单个SKU sku_code = f"{product.name[:3].upper()}-DEFAULT-{product.id}" inventory = ProductInventory( product_id=product.id, sku_code=sku_code, spec_combination=None, price_adjustment=0, stock=single_stock, warning_stock=warning_stock, is_default=1, status=1 ) db.session.add(inventory) else: # 多规格处理 sku_codes = request.form.getlist('sku_codes[]') spec_combinations = request.form.getlist('spec_combinations[]') price_adjustments = request.form.getlist('price_adjustments[]') stocks = request.form.getlist('stocks[]') warning_stocks = request.form.getlist('warning_stocks[]') default_sku_index = request.form.get('default_sku', 0, type=int) if not sku_codes: flash('请至少添加一个SKU', 'error') return redirect(request.referrer) # 删除现有库存记录(如果是编辑模式) if product_id: ProductInventory.query.filter_by(product_id=product.id).delete() # 创建多个SKU for i, sku_code in enumerate(sku_codes): try: spec_combination = json.loads(spec_combinations[i]) if spec_combinations[i] else None except: spec_combination = None inventory = ProductInventory( product_id=product.id, sku_code=sku_code, spec_combination=spec_combination, price_adjustment=float(price_adjustments[i]) if price_adjustments[i] else 0, stock=int(stocks[i]) if stocks[i] else 0, warning_stock=int(warning_stocks[i]) if warning_stocks[i] else 10, is_default=1 if i == default_sku_index else 0, status=1 ) db.session.add(inventory) db.session.commit() flash('商品信息保存成功', 'success') return redirect(url_for('product.edit', product_id=product.id)) except Exception as e: db.session.rollback() flash(f'保存失败: {str(e)}', 'error') return redirect(request.referrer) @product_bp.route('/upload-images/', methods=['POST']) @admin_required @log_operation('上传商品图片', 'productdef index():') def upload_images(product_id): """上传商品图片""" try: product = Product.query.get_or_404(product_id) if 'files' not in request.files: return jsonify({'success': False, 'message': '没有选择文件'}) files = request.files.getlist('files') uploaded_images = [] for file in files: if file.filename == '': continue # 检查文件类型 allowed_extensions = {'jpg', 'jpeg', 'png', 'gif', 'webp'} file_ext = file.filename.rsplit('.', 1)[1].lower() if '.' in file.filename else '' if file_ext not in allowed_extensions: continue # 上传到COS result = cos_client.upload_file(file, 'product', file.filename) if result['success']: # 检查是否是第一张图片 existing_images_count = ProductImage.query.filter_by(product_id=product_id).count() is_first_image = (existing_images_count == 0) # 保存图片记录 image = ProductImage( product_id=product_id, image_url=result['url'], sort_order=existing_images_count, is_main=1 if is_first_image else 0 # 第一张图片自动设为主图 ) db.session.add(image) # 如果是第一张图片,同时更新商品主图 if is_first_image: product.main_image = result['url'] uploaded_images.append({ 'id': None, # 临时ID,提交后会更新 'url': result['url'], 'sort_order': image.sort_order, 'is_main': is_first_image }) db.session.commit() # 更新图片ID for i, uploaded_image in enumerate(uploaded_images): image = ProductImage.query.filter_by( product_id=product_id, image_url=uploaded_image['url'] ).first() if image: uploaded_images[i]['id'] = image.id return jsonify({ 'success': True, 'message': f'成功上传 {len(uploaded_images)} 张图片', 'images': uploaded_images }) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'上传失败: {str(e)}'}) @product_bp.route('/delete-image/', methods=['DELETE']) @admin_required @log_operation('删除商品图片', 'product_image') def delete_image(image_id): """删除商品图片""" try: image = ProductImage.query.get_or_404(image_id) # 从COS删除文件 if image.image_url: file_key = image.image_url.split('/')[-4:] # 提取文件路径 file_key = '/'.join(file_key) cos_client.delete_file(file_key) db.session.delete(image) db.session.commit() return jsonify({'success': True, 'message': '图片删除成功'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'删除失败: {str(e)}'}) @product_bp.route('/set-main-image/', methods=['POST']) @admin_required @log_operation('设置主图', 'product') def set_main_image(image_id): """设置主图""" try: image = ProductImage.query.get_or_404(image_id) product = image.product # 清除当前主图 ProductImage.query.filter_by(product_id=product.id, is_main=1).update({'is_main': 0}) # 设置新主图 image.is_main = 1 product.main_image = image.image_url db.session.commit() return jsonify({'success': True, 'message': '主图设置成功'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'设置失败: {str(e)}'}) @product_bp.route('/sort-images/', methods=['POST']) @admin_required @log_operation('排序商品图片', 'product') def sort_images(product_id): """图片排序""" try: image_ids = request.json.get('image_ids', []) for index, image_id in enumerate(image_ids): ProductImage.query.filter_by(id=image_id).update({'sort_order': index}) db.session.commit() return jsonify({'success': True, 'message': '排序保存成功'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'排序失败: {str(e)}'}) @product_bp.route('/delete/', methods=['DELETE']) @admin_required @log_operation('删除商品', 'product') def delete(product_id): """删除商品""" try: product = Product.query.get_or_404(product_id) # 删除商品图片 for image in product.images: if image.image_url: file_key = image.image_url.split('/')[-4:] file_key = '/'.join(file_key) cos_client.delete_file(file_key) # 删除商品记录 db.session.delete(product) db.session.commit() return jsonify({'success': True, 'message': '商品删除成功'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'删除失败: {str(e)}'}) # 分类管理相关路由 @product_bp.route('/categories') @admin_required def categories(): """分类管理""" categories = Category.query.order_by(Category.sort_order).all() return render_template('admin/categories.html', categories=categories) @product_bp.route('/categories/save', methods=['POST']) @admin_required @log_operation('保存商品分类', 'category') def save_category(): """保存分类""" try: category_id = request.form.get('category_id', type=int) name = request.form.get('name', '').strip() parent_id = request.form.get('parent_id', 0, type=int) sort_order = request.form.get('sort_order', 0, type=int) is_active = request.form.get('is_active', 1, type=int) if not name: flash('分类名称不能为空', 'error') return redirect(url_for('product.categories')) # 检查分类名称是否重复 existing = Category.query.filter_by(name=name, parent_id=parent_id).first() if existing and (not category_id or existing.id != category_id): flash('同一层级下分类名称不能重复', 'error') return redirect(url_for('product.categories')) if category_id: category = Category.query.get_or_404(category_id) # 防止将分类设为自己的子分类 if parent_id == category_id: flash('不能将分类设为自己的子分类', 'error') return redirect(url_for('product.categories')) # 防止循环引用 if parent_id != 0: parent = Category.query.get(parent_id) temp_parent = parent while temp_parent and temp_parent.parent_id != 0: if temp_parent.parent_id == category_id: flash('不能创建循环引用的分类层级', 'error') return redirect(url_for('product.categories')) temp_parent = Category.query.get(temp_parent.parent_id) else: category = Category() category.name = name category.parent_id = parent_id category.sort_order = sort_order category.is_active = is_active # 设置层级 if parent_id == 0: category.level = 1 else: parent = Category.query.get(parent_id) if parent: category.level = parent.level + 1 if category.level > 3: flash('分类层级不能超过3级', 'error') return redirect(url_for('product.categories')) else: category.level = 1 # 处理图标上传 if 'icon' in request.files: icon_file = request.files['icon'] if icon_file and icon_file.filename: # 删除旧图标 if category.icon_url: old_file_key = category.icon_url.split('/')[-4:] old_file_key = '/'.join(old_file_key) cos_client.delete_file(old_file_key) # 上传新图标 result = cos_client.upload_file(icon_file, 'category', icon_file.filename) if result['success']: category.icon_url = result['url'] else: flash(f'图标上传失败: {result["error"]}', 'error') return redirect(url_for('product.categories')) if not category_id: db.session.add(category) db.session.commit() flash('分类保存成功', 'success') except Exception as e: db.session.rollback() flash(f'保存失败: {str(e)}', 'error') return redirect(url_for('product.categories')) @product_bp.route('/categories/', methods=['GET']) @admin_required def get_category(category_id): """获取分类详情""" try: category = Category.query.get_or_404(category_id) return jsonify({ 'success': True, 'category': category.to_dict() }) except Exception as e: return jsonify({'success': False, 'message': str(e)}) @product_bp.route('/categories/', methods=['DELETE']) @admin_required @log_operation('删除商品分类', 'category') def delete_category(category_id): """删除分类""" try: category = Category.query.get_or_404(category_id) # 检查是否有子分类 children = Category.query.filter_by(parent_id=category_id).count() if children > 0: return jsonify({'success': False, 'message': '该分类下还有子分类,无法删除'}) # 检查是否有商品使用此分类 products = Product.query.filter_by(category_id=category_id).count() if products > 0: return jsonify({'success': False, 'message': f'该分类下还有 {products} 个商品,无法删除'}) # 删除分类图标 if category.icon_url: file_key = category.icon_url.split('/')[-4:] file_key = '/'.join(file_key) cos_client.delete_file(file_key) db.session.delete(category) db.session.commit() return jsonify({'success': True, 'message': '分类删除成功'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'删除失败: {str(e)}'}) @product_bp.route('/categories/sort', methods=['POST']) @admin_required @log_operation('分类排序', 'category') def sort_categories(): """分类排序""" try: category_orders = request.json.get('orders', []) for item in category_orders: category_id = item.get('id') sort_order = item.get('sort_order') Category.query.filter_by(id=category_id).update({'sort_order': sort_order}) db.session.commit() return jsonify({'success': True, 'message': '排序保存成功'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'排序失败: {str(e)}'}) # 库存管理相关路由 @product_bp.route('/inventory/') @admin_required def get_inventory(product_id): """获取商品库存详情""" try: product = Product.query.get_or_404(product_id) inventory_list = ProductInventory.query.filter_by(product_id=product_id) \ .order_by(ProductInventory.is_default.desc(), ProductInventory.id).all() inventory_data = [] for inventory in inventory_list: inventory_data.append({ 'id': inventory.id, 'sku_code': inventory.sku_code, 'spec_combination': inventory.spec_combination, 'stock': inventory.stock, 'warning_stock': inventory.warning_stock, 'price_adjustment': float(inventory.price_adjustment) if inventory.price_adjustment else 0, 'is_default': inventory.is_default, 'status': inventory.status, 'final_price': inventory.get_final_price() }) return jsonify({ 'success': True, 'inventory': inventory_data, 'product_name': product.name }) except Exception as e: return jsonify({'success': False, 'message': str(e)}) @product_bp.route('/inventory/update', methods=['POST']) @admin_required @log_operation('更新库存', 'inventory') def update_inventory(): """批量更新库存""" try: inventory_data = request.json.get('inventory_list', []) for item in inventory_data: inventory_id = item.get('id') new_stock = item.get('stock') if inventory_id and new_stock is not None: inventory = ProductInventory.query.get(inventory_id) if inventory: old_stock = inventory.stock inventory.stock = new_stock # 记录库存变更日志 from app.models.product import InventoryLog InventoryLog.create_log( product_id=inventory.product_id, sku_code=inventory.sku_code, change_type=3, # 调整 change_quantity=new_stock - old_stock, before_stock=old_stock, after_stock=new_stock, remark='管理员手动调整' ) db.session.commit() return jsonify({'success': True, 'message': '库存更新成功'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'更新失败: {str(e)}'}) @product_bp.route('/inventory/log/') @admin_required def inventory_log(product_id): """查看库存变更日志""" product = Product.query.get_or_404(product_id) page = request.args.get('page', 1, type=int) per_page = 20 from app.models.product import InventoryLog logs = InventoryLog.query.filter_by(product_id=product_id) \ .order_by(InventoryLog.created_at.desc()) \ .paginate(page=page, per_page=per_page, error_out=False) return render_template('admin/inventory_log.html', product=product, logs=logs) @product_bp.route('/generate-sku-code', methods=['POST']) @admin_required def generate_sku_code(): """生成SKU编码""" try: product_name = request.json.get('product_name', '') spec_combination = request.json.get('spec_combination', {}) # 生成SKU编码逻辑 short_name = product_name[:3].upper() if product_name else 'PRD' spec_code = ''.join([v[:2].upper() for v in spec_combination.values()]) if spec_combination else 'DEFAULT' timestamp = str(int(time.time()))[-4:] sku_code = f"{short_name}-{spec_code}-{timestamp}" return jsonify({'success': True, 'sku_code': sku_code}) except Exception as e: return jsonify({'success': False, 'message': str(e)}) @product_bp.route('/check-sku-code', methods=['POST']) @admin_required def check_sku_code(): """检查SKU编码是否重复""" try: sku_code = request.json.get('sku_code', '') product_id = request.json.get('product_id', None) query = ProductInventory.query.filter_by(sku_code=sku_code) if product_id: query = query.filter(ProductInventory.product_id != product_id) exists = query.first() is not None return jsonify({ 'success': True, 'exists': exists, 'message': 'SKU编码已存在' if exists else 'SKU编码可用' }) except Exception as e: return jsonify({'success': False, 'message': str(e)})