429 lines
12 KiB
Python
429 lines
12 KiB
Python
"""
|
||
文件上传处理工具
|
||
"""
|
||
import os
|
||
import magic
|
||
from PIL import Image
|
||
from io import BytesIO
|
||
from werkzeug.utils import secure_filename
|
||
from config.cos_config import COSConfig
|
||
from .cos_client import cos_client
|
||
|
||
|
||
class FileUploadHandler:
|
||
"""文件上传处理器"""
|
||
|
||
@staticmethod
|
||
def validate_file(file_obj, file_type='image'):
|
||
"""
|
||
验证文件
|
||
|
||
Args:
|
||
file_obj: 文件对象
|
||
file_type: 文件类型 (image, file)
|
||
|
||
Returns:
|
||
dict: 验证结果
|
||
"""
|
||
if not file_obj or not file_obj.filename:
|
||
return {'valid': False, 'error': '请选择文件'}
|
||
|
||
# 检查文件扩展名
|
||
filename = secure_filename(file_obj.filename)
|
||
if '.' not in filename:
|
||
return {'valid': False, 'error': '文件格式不正确'}
|
||
|
||
file_ext = filename.rsplit('.', 1)[1].lower()
|
||
|
||
if file_type == 'image':
|
||
allowed_extensions = COSConfig.ALLOWED_IMAGE_EXTENSIONS
|
||
max_size = COSConfig.MAX_IMAGE_SIZE
|
||
else:
|
||
allowed_extensions = COSConfig.ALLOWED_FILE_EXTENSIONS
|
||
max_size = COSConfig.MAX_FILE_SIZE
|
||
|
||
if file_ext not in allowed_extensions:
|
||
return {
|
||
'valid': False,
|
||
'error': f'不支持的文件格式,支持格式: {", ".join(allowed_extensions)}'
|
||
}
|
||
|
||
# 检查文件大小
|
||
file_obj.seek(0, 2) # 移动到文件末尾
|
||
file_size = file_obj.tell()
|
||
file_obj.seek(0) # 重置文件指针
|
||
|
||
if file_size > max_size:
|
||
max_size_mb = max_size / (1024 * 1024)
|
||
return {'valid': False, 'error': f'文件大小不能超过 {max_size_mb:.1f}MB'}
|
||
|
||
# 验证文件内容类型(防止恶意文件)
|
||
try:
|
||
file_content = file_obj.read(1024) # 读取前1KB用于检测
|
||
file_obj.seek(0) # 重置文件指针
|
||
|
||
mime_type = magic.from_buffer(file_content, mime=True)
|
||
|
||
if file_type == 'image' and not mime_type.startswith('image/'):
|
||
return {'valid': False, 'error': '文件内容不是有效的图片格式'}
|
||
|
||
except Exception:
|
||
# 如果magic检测失败,继续处理(某些环境可能没有libmagic)
|
||
pass
|
||
|
||
return {'valid': True, 'filename': filename, 'size': file_size}
|
||
|
||
@staticmethod
|
||
def process_image(file_obj, max_width=1200, max_height=1200, quality=None):
|
||
"""
|
||
处理图片(压缩、调整尺寸)
|
||
|
||
Args:
|
||
file_obj: 图片文件对象
|
||
max_width: 最大宽度
|
||
max_height: 最大高度
|
||
quality: 压缩质量
|
||
|
||
Returns:
|
||
BytesIO: 处理后的图片数据
|
||
"""
|
||
try:
|
||
# 打开图片
|
||
image = Image.open(file_obj)
|
||
|
||
# 转换RGBA到RGB(处理PNG透明背景)
|
||
if image.mode in ('RGBA', 'LA', 'P'):
|
||
background = Image.new('RGB', image.size, (255, 255, 255))
|
||
if image.mode == 'P':
|
||
image = image.convert('RGBA')
|
||
background.paste(image, mask=image.split()[-1] if image.mode == 'RGBA' else None)
|
||
image = background
|
||
|
||
# 调整图片尺寸
|
||
if image.width > max_width or image.height > max_height:
|
||
image.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)
|
||
|
||
# 保存处理后的图片
|
||
output = BytesIO()
|
||
image.save(
|
||
output,
|
||
format='JPEG',
|
||
quality=quality or COSConfig.IMAGE_QUALITY,
|
||
optimize=True
|
||
)
|
||
output.seek(0)
|
||
|
||
return output
|
||
|
||
except Exception as e:
|
||
raise Exception(f"图片处理失败: {str(e)}")
|
||
|
||
@staticmethod
|
||
def upload_image(file_obj, folder_type='temp', process_image=True):
|
||
"""
|
||
上传图片到COS
|
||
|
||
Args:
|
||
file_obj: 图片文件对象
|
||
folder_type: 存储文件夹类型
|
||
process_image: 是否处理图片
|
||
|
||
Returns:
|
||
dict: 上传结果
|
||
"""
|
||
# 验证文件
|
||
validation = FileUploadHandler.validate_file(file_obj, 'image')
|
||
if not validation['valid']:
|
||
return {
|
||
'success': False,
|
||
'error': validation['error'],
|
||
'url': None,
|
||
'file_key': None
|
||
}
|
||
|
||
try:
|
||
# 处理图片
|
||
if process_image:
|
||
processed_file = FileUploadHandler.process_image(file_obj)
|
||
upload_file = processed_file
|
||
else:
|
||
file_obj.seek(0)
|
||
upload_file = file_obj
|
||
|
||
# 上传到COS
|
||
result = cos_client.upload_file(
|
||
upload_file,
|
||
folder_type,
|
||
validation['filename']
|
||
)
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
return {
|
||
'success': False,
|
||
'error': f"上传失败: {str(e)}",
|
||
'url': None,
|
||
'file_key': None
|
||
}
|
||
|
||
@staticmethod
|
||
def upload_file(file_obj, folder_type='temp'):
|
||
"""
|
||
上传普通文件到COS
|
||
|
||
Args:
|
||
file_obj: 文件对象
|
||
folder_type: 存储文件夹类型
|
||
|
||
Returns:
|
||
dict: 上传结果
|
||
"""
|
||
# 验证文件
|
||
validation = FileUploadHandler.validate_file(file_obj, 'file')
|
||
if not validation['valid']:
|
||
return {
|
||
'success': False,
|
||
'error': validation['error'],
|
||
'url': None,
|
||
'file_key': None
|
||
}
|
||
|
||
try:
|
||
file_obj.seek(0)
|
||
|
||
# 上传到COS
|
||
result = cos_client.upload_file(
|
||
file_obj,
|
||
folder_type,
|
||
validation['filename']
|
||
)
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
return {
|
||
'success': False,
|
||
'error': f"上传失败: {str(e)}",
|
||
'url': None,
|
||
'file_key': None
|
||
}
|
||
|
||
|
||
# 创建全局文件上传处理器实例
|
||
file_upload_handler = FileUploadHandler()
|
||
"""
|
||
文件上传处理工具
|
||
"""
|
||
import os
|
||
import magic
|
||
from PIL import Image
|
||
from io import BytesIO
|
||
from werkzeug.utils import secure_filename
|
||
from config.cos_config import COSConfig
|
||
from .cos_client import cos_client
|
||
|
||
|
||
class FileUploadHandler:
|
||
"""文件上传处理器"""
|
||
|
||
@staticmethod
|
||
def validate_file(file_obj, file_type='image'):
|
||
"""
|
||
验证文件
|
||
|
||
Args:
|
||
file_obj: 文件对象
|
||
file_type: 文件类型 (image, file)
|
||
|
||
Returns:
|
||
dict: 验证结果
|
||
"""
|
||
if not file_obj or not file_obj.filename:
|
||
return {'valid': False, 'error': '请选择文件'}
|
||
|
||
# 检查文件扩展名
|
||
filename = secure_filename(file_obj.filename)
|
||
if '.' not in filename:
|
||
return {'valid': False, 'error': '文件格式不正确'}
|
||
|
||
file_ext = filename.rsplit('.', 1)[1].lower()
|
||
|
||
if file_type == 'image':
|
||
allowed_extensions = COSConfig.ALLOWED_IMAGE_EXTENSIONS
|
||
max_size = COSConfig.MAX_IMAGE_SIZE
|
||
else:
|
||
allowed_extensions = COSConfig.ALLOWED_FILE_EXTENSIONS
|
||
max_size = COSConfig.MAX_FILE_SIZE
|
||
|
||
if file_ext not in allowed_extensions:
|
||
return {
|
||
'valid': False,
|
||
'error': f'不支持的文件格式,支持格式: {", ".join(allowed_extensions)}'
|
||
}
|
||
|
||
# 检查文件大小
|
||
file_obj.seek(0, 2) # 移动到文件末尾
|
||
file_size = file_obj.tell()
|
||
file_obj.seek(0) # 重置文件指针
|
||
|
||
if file_size > max_size:
|
||
max_size_mb = max_size / (1024 * 1024)
|
||
return {'valid': False, 'error': f'文件大小不能超过 {max_size_mb:.1f}MB'}
|
||
|
||
# 验证文件内容类型(防止恶意文件)
|
||
try:
|
||
file_content = file_obj.read(1024) # 读取前1KB用于检测
|
||
file_obj.seek(0) # 重置文件指针
|
||
|
||
mime_type = magic.from_buffer(file_content, mime=True)
|
||
|
||
if file_type == 'image' and not mime_type.startswith('image/'):
|
||
return {'valid': False, 'error': '文件内容不是有效的图片格式'}
|
||
|
||
except Exception:
|
||
# 如果magic检测失败,继续处理(某些环境可能没有libmagic)
|
||
pass
|
||
|
||
return {'valid': True, 'filename': filename, 'size': file_size}
|
||
|
||
@staticmethod
|
||
def process_image(file_obj, max_width=1200, max_height=1200, quality=None):
|
||
"""
|
||
处理图片(压缩、调整尺寸)
|
||
|
||
Args:
|
||
file_obj: 图片文件对象
|
||
max_width: 最大宽度
|
||
max_height: 最大高度
|
||
quality: 压缩质量
|
||
|
||
Returns:
|
||
BytesIO: 处理后的图片数据
|
||
"""
|
||
try:
|
||
# 打开图片
|
||
image = Image.open(file_obj)
|
||
|
||
# 转换RGBA到RGB(处理PNG透明背景)
|
||
if image.mode in ('RGBA', 'LA', 'P'):
|
||
background = Image.new('RGB', image.size, (255, 255, 255))
|
||
if image.mode == 'P':
|
||
image = image.convert('RGBA')
|
||
background.paste(image, mask=image.split()[-1] if image.mode == 'RGBA' else None)
|
||
image = background
|
||
|
||
# 调整图片尺寸
|
||
if image.width > max_width or image.height > max_height:
|
||
image.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)
|
||
|
||
# 保存处理后的图片
|
||
output = BytesIO()
|
||
image.save(
|
||
output,
|
||
format='JPEG',
|
||
quality=quality or COSConfig.IMAGE_QUALITY,
|
||
optimize=True
|
||
)
|
||
output.seek(0)
|
||
|
||
return output
|
||
|
||
except Exception as e:
|
||
raise Exception(f"图片处理失败: {str(e)}")
|
||
|
||
@staticmethod
|
||
def upload_image(file_obj, folder_type='temp', process_image=True):
|
||
"""
|
||
上传图片到COS
|
||
|
||
Args:
|
||
file_obj: 图片文件对象
|
||
folder_type: 存储文件夹类型
|
||
process_image: 是否处理图片
|
||
|
||
Returns:
|
||
dict: 上传结果
|
||
"""
|
||
# 验证文件
|
||
validation = FileUploadHandler.validate_file(file_obj, 'image')
|
||
if not validation['valid']:
|
||
return {
|
||
'success': False,
|
||
'error': validation['error'],
|
||
'url': None,
|
||
'file_key': None
|
||
}
|
||
|
||
try:
|
||
# 处理图片
|
||
if process_image:
|
||
processed_file = FileUploadHandler.process_image(file_obj)
|
||
upload_file = processed_file
|
||
else:
|
||
file_obj.seek(0)
|
||
upload_file = file_obj
|
||
|
||
# 上传到COS
|
||
result = cos_client.upload_file(
|
||
upload_file,
|
||
folder_type,
|
||
validation['filename']
|
||
)
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
return {
|
||
'success': False,
|
||
'error': f"上传失败: {str(e)}",
|
||
'url': None,
|
||
'file_key': None
|
||
}
|
||
|
||
@staticmethod
|
||
def upload_file(file_obj, folder_type='temp'):
|
||
"""
|
||
上传普通文件到COS
|
||
|
||
Args:
|
||
file_obj: 文件对象
|
||
folder_type: 存储文件夹类型
|
||
|
||
Returns:
|
||
dict: 上传结果
|
||
"""
|
||
# 验证文件
|
||
validation = FileUploadHandler.validate_file(file_obj, 'file')
|
||
if not validation['valid']:
|
||
return {
|
||
'success': False,
|
||
'error': validation['error'],
|
||
'url': None,
|
||
'file_key': None
|
||
}
|
||
|
||
try:
|
||
file_obj.seek(0)
|
||
|
||
# 上传到COS
|
||
result = cos_client.upload_file(
|
||
file_obj,
|
||
folder_type,
|
||
validation['filename']
|
||
)
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
return {
|
||
'success': False,
|
||
'error': f"上传失败: {str(e)}",
|
||
'url': None,
|
||
'file_key': None
|
||
}
|
||
|
||
|
||
# 创建全局文件上传处理器实例
|
||
file_upload_handler = FileUploadHandler()
|