275 lines
7.7 KiB
Python
275 lines
7.7 KiB
Python
"""
|
||
腾讯云COS客户端工具
|
||
"""
|
||
import sys
|
||
import os
|
||
import uuid
|
||
import logging
|
||
from datetime import datetime
|
||
from qcloud_cos import CosConfig, CosS3Client
|
||
from qcloud_cos.cos_exception import CosClientError, CosServiceError
|
||
from config.cos_config import COSConfig
|
||
|
||
# 配置日志
|
||
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class COSClient:
|
||
"""腾讯云COS客户端"""
|
||
|
||
def __init__(self):
|
||
"""初始化COS客户端"""
|
||
try:
|
||
# 配置COS
|
||
config = CosConfig(
|
||
Region=COSConfig.REGION,
|
||
SecretId=COSConfig.SECRET_ID,
|
||
SecretKey=COSConfig.SECRET_KEY,
|
||
Token=None, # 临时密钥需要传入Token,永久密钥不需要
|
||
Scheme='https' # 指定使用 http/https 协议来访问COS,默认为https
|
||
)
|
||
|
||
# 创建客户端
|
||
self.client = CosS3Client(config)
|
||
self.bucket = COSConfig.BUCKET_NAME
|
||
|
||
logger.info("COS客户端初始化成功")
|
||
|
||
except Exception as e:
|
||
logger.error(f"COS客户端初始化失败: {str(e)}")
|
||
raise
|
||
|
||
def generate_file_key(self, folder_type, original_filename):
|
||
"""
|
||
生成文件存储路径
|
||
|
||
Args:
|
||
folder_type: 文件夹类型 (avatar, product, review, temp)
|
||
original_filename: 原始文件名
|
||
|
||
Returns:
|
||
str: 生成的文件路径
|
||
"""
|
||
# 获取文件扩展名
|
||
file_ext = original_filename.rsplit('.', 1)[1].lower() if '.' in original_filename else ''
|
||
|
||
# 生成唯一文件名
|
||
unique_filename = f"{uuid.uuid4().hex}.{file_ext}" if file_ext else uuid.uuid4().hex
|
||
|
||
# 按日期分组
|
||
date_folder = datetime.now().strftime('%Y/%m/%d')
|
||
|
||
# 获取存储路径前缀
|
||
folder_prefix = COSConfig.UPLOAD_FOLDERS.get(folder_type, COSConfig.UPLOAD_FOLDERS['temp'])
|
||
|
||
# 组合完整路径
|
||
file_key = f"{folder_prefix}{date_folder}/{unique_filename}"
|
||
|
||
return file_key
|
||
|
||
def upload_file(self, file_obj, folder_type='temp', original_filename=None):
|
||
"""
|
||
上传文件到COS
|
||
|
||
Args:
|
||
file_obj: 文件对象或文件路径
|
||
folder_type: 文件夹类型
|
||
original_filename: 原始文件名
|
||
|
||
Returns:
|
||
dict: 上传结果 {'success': bool, 'file_key': str, 'url': str, 'error': str}
|
||
"""
|
||
try:
|
||
# 生成文件路径
|
||
if original_filename is None:
|
||
if hasattr(file_obj, 'filename'):
|
||
original_filename = file_obj.filename
|
||
else:
|
||
original_filename = 'unknown'
|
||
|
||
file_key = self.generate_file_key(folder_type, original_filename)
|
||
|
||
# 上传文件
|
||
if hasattr(file_obj, 'read'):
|
||
# 文件对象
|
||
response = self.client.put_object(
|
||
Bucket=self.bucket,
|
||
Body=file_obj,
|
||
Key=file_key,
|
||
StorageClass='STANDARD',
|
||
EnableMD5=False
|
||
)
|
||
else:
|
||
# 文件路径
|
||
response = self.client.put_object_from_local_file(
|
||
Bucket=self.bucket,
|
||
LocalFilePath=file_obj,
|
||
Key=file_key,
|
||
EnableMD5=False
|
||
)
|
||
|
||
# 生成访问URL
|
||
file_url = COSConfig.get_full_url(file_key)
|
||
|
||
logger.info(f"文件上传成功: {file_key}")
|
||
|
||
return {
|
||
'success': True,
|
||
'file_key': file_key,
|
||
'url': file_url,
|
||
'etag': response['ETag'],
|
||
'error': None
|
||
}
|
||
|
||
except CosClientError as e:
|
||
error_msg = f"COS客户端错误: {str(e)}"
|
||
logger.error(error_msg)
|
||
return {
|
||
'success': False,
|
||
'file_key': None,
|
||
'url': None,
|
||
'error': error_msg
|
||
}
|
||
|
||
except CosServiceError as e:
|
||
error_msg = f"COS服务错误: {e.get_error_code()} - {e.get_error_msg()}"
|
||
logger.error(error_msg)
|
||
return {
|
||
'success': False,
|
||
'file_key': None,
|
||
'url': None,
|
||
'error': error_msg
|
||
}
|
||
|
||
except Exception as e:
|
||
error_msg = f"上传失败: {str(e)}"
|
||
logger.error(error_msg)
|
||
return {
|
||
'success': False,
|
||
'file_key': None,
|
||
'url': None,
|
||
'error': error_msg
|
||
}
|
||
|
||
def delete_file(self, file_key):
|
||
"""
|
||
删除COS中的文件
|
||
|
||
Args:
|
||
file_key: 文件路径
|
||
|
||
Returns:
|
||
dict: 删除结果
|
||
"""
|
||
try:
|
||
response = self.client.delete_object(
|
||
Bucket=self.bucket,
|
||
Key=file_key
|
||
)
|
||
|
||
logger.info(f"文件删除成功: {file_key}")
|
||
|
||
return {
|
||
'success': True,
|
||
'error': None
|
||
}
|
||
|
||
except Exception as e:
|
||
error_msg = f"删除文件失败: {str(e)}"
|
||
logger.error(error_msg)
|
||
return {
|
||
'success': False,
|
||
'error': error_msg
|
||
}
|
||
|
||
def get_file_url(self, file_key, expires=3600):
|
||
"""
|
||
获取文件访问URL(用于私有文件)
|
||
|
||
Args:
|
||
file_key: 文件路径
|
||
expires: 过期时间(秒)
|
||
|
||
Returns:
|
||
str: 预签名URL
|
||
"""
|
||
try:
|
||
response = self.client.get_presigned_download_url(
|
||
Bucket=self.bucket,
|
||
Key=file_key,
|
||
Expired=expires
|
||
)
|
||
return response
|
||
|
||
except Exception as e:
|
||
logger.error(f"生成预签名URL失败: {str(e)}")
|
||
return None
|
||
|
||
def list_files(self, prefix='', max_keys=100):
|
||
"""
|
||
列出存储桶中的文件
|
||
|
||
Args:
|
||
prefix: 文件路径前缀
|
||
max_keys: 最大返回数量
|
||
|
||
Returns:
|
||
list: 文件列表
|
||
"""
|
||
try:
|
||
response = self.client.list_objects(
|
||
Bucket=self.bucket,
|
||
Prefix=prefix,
|
||
MaxKeys=max_keys
|
||
)
|
||
|
||
files = []
|
||
if 'Contents' in response:
|
||
for obj in response['Contents']:
|
||
files.append({
|
||
'key': obj['Key'],
|
||
'size': obj['Size'],
|
||
'last_modified': obj['LastModified'],
|
||
'url': COSConfig.get_full_url(obj['Key'])
|
||
})
|
||
|
||
return files
|
||
|
||
except Exception as e:
|
||
logger.error(f"列出文件失败: {str(e)}")
|
||
return []
|
||
|
||
def test_connection(self):
|
||
"""
|
||
测试COS连接
|
||
|
||
Returns:
|
||
dict: 测试结果
|
||
"""
|
||
try:
|
||
# 尝试列出存储桶
|
||
response = self.client.list_objects(
|
||
Bucket=self.bucket,
|
||
MaxKeys=1
|
||
)
|
||
|
||
return {
|
||
'success': True,
|
||
'message': 'COS连接测试成功',
|
||
'bucket': self.bucket,
|
||
'region': COSConfig.REGION
|
||
}
|
||
|
||
except Exception as e:
|
||
return {
|
||
'success': False,
|
||
'message': f'COS连接测试失败: {str(e)}',
|
||
'bucket': self.bucket,
|
||
'region': COSConfig.REGION
|
||
}
|
||
|
||
|
||
# 创建全局COS客户端实例
|
||
cos_client = COSClient()
|