taibai_shopping/test_cos_connection.py
2025-07-04 19:07:35 +08:00

219 lines
6.5 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
测试腾讯云COS连接 - 独立测试脚本
"""
import sys
import os
from datetime import datetime
# 添加项目根目录到Python路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# 直接导入COS相关模块避免导入Flask应用
from qcloud_cos import CosConfig, CosS3Client
from qcloud_cos.cos_exception import CosClientError, CosServiceError
from config.cos_config import COSConfig
class COSTestClient:
"""COS测试客户端"""
def __init__(self):
"""初始化COS客户端"""
try:
# 配置COS
config = CosConfig(
Region=COSConfig.REGION,
SecretId=COSConfig.SECRET_ID,
SecretKey=COSConfig.SECRET_KEY,
Token=None,
Scheme='https'
)
# 创建客户端
self.client = CosS3Client(config)
self.bucket = COSConfig.BUCKET_NAME
print("✅ COS客户端初始化成功")
except Exception as e:
print(f"❌ COS客户端初始化失败: {str(e)}")
raise
def test_connection(self):
"""测试COS连接"""
try:
# 尝试列出存储桶
response = self.client.list_objects(
Bucket=self.bucket,
MaxKeys=1
)
return {
'success': True,
'message': 'COS连接测试成功',
'bucket': self.bucket,
'region': COSConfig.REGION
}
except Exception as e:
return {
'success': False,
'message': f'COS连接测试失败: {str(e)}',
'bucket': self.bucket,
'region': COSConfig.REGION
}
def list_files(self, prefix='', max_keys=10):
"""列出文件"""
try:
response = self.client.list_objects(
Bucket=self.bucket,
Prefix=prefix,
MaxKeys=max_keys
)
files = []
if 'Contents' in response:
for obj in response['Contents']:
files.append({
'key': obj['Key'],
'size': obj['Size'],
'last_modified': obj['LastModified'],
'url': COSConfig.get_full_url(obj['Key'])
})
return files
except Exception as e:
print(f"❌ 列出文件失败: {str(e)}")
return []
def upload_test_file(self):
"""上传测试文件"""
test_content = f"COS上传测试文件\n创建时间: {datetime.now()}\n测试内容: Hello COS!"
test_file_key = f"test/test_upload_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
try:
# 上传文件
response = self.client.put_object(
Bucket=self.bucket,
Body=test_content.encode('utf-8'),
Key=test_file_key,
StorageClass='STANDARD'
)
file_url = COSConfig.get_full_url(test_file_key)
return {
'success': True,
'file_key': test_file_key,
'url': file_url,
'etag': response['ETag']
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
def delete_file(self, file_key):
"""删除文件"""
try:
response = self.client.delete_object(
Bucket=self.bucket,
Key=file_key
)
return {'success': True}
except Exception as e:
return {
'success': False,
'error': str(e)
}
def main():
"""主测试函数"""
print("=" * 60)
print("🚀 腾讯云COS连接测试")
print("=" * 60)
# 显示配置信息
print(f"📦 存储桶名称: {COSConfig.BUCKET_NAME}")
print(f"🌍 所属地域: {COSConfig.REGION}")
print(f"🔗 访问域名: {COSConfig.BUCKET_DOMAIN}")
print(f"🔑 SecretId: {COSConfig.SECRET_ID[:8]}***")
print("-" * 60)
try:
# 初始化测试客户端
cos_test = COSTestClient()
# 1. 测试连接
print("1⃣ 测试COS连接...")
result = cos_test.test_connection()
if result['success']:
print("✅ COS连接测试成功")
print(f" 存储桶: {result['bucket']}")
print(f" 地域: {result['region']}")
else:
print("❌ COS连接测试失败")
print(f" 错误信息: {result['message']}")
return False
print("-" * 60)
# 2. 测试文件列表
print("2⃣ 测试文件列表功能...")
files = cos_test.list_files(max_keys=5)
print(f"✅ 文件列表获取成功,共找到 {len(files)} 个文件")
if files:
print("📁 最近的文件:")
for i, file_info in enumerate(files[:3], 1):
size_mb = file_info['size'] / 1024 / 1024
print(f" {i}. {file_info['key']}")
print(f" 大小: {size_mb:.2f}MB")
print(f" 修改时间: {file_info['last_modified']}")
else:
print("📭 存储桶为空")
print("-" * 60)
# 3. 测试文件上传
print("3⃣ 测试文件上传功能...")
upload_result = cos_test.upload_test_file()
if upload_result['success']:
print("✅ 文件上传测试成功!")
print(f" 文件路径: {upload_result['file_key']}")
print(f" 访问URL: {upload_result['url']}")
print(f" ETag: {upload_result['etag']}")
# 4. 测试文件删除
print("-" * 60)
print("4⃣ 测试文件删除功能...")
delete_result = cos_test.delete_file(upload_result['file_key'])
if delete_result['success']:
print("✅ 文件删除测试成功!")
else:
print(f"❌ 文件删除测试失败: {delete_result['error']}")
else:
print(f"❌ 文件上传测试失败: {upload_result['error']}")
print("=" * 60)
print("🎉 COS功能测试完成")
print("=" * 60)
return True
except Exception as e:
print(f"❌ 测试过程中发生异常: {str(e)}")
return False
if __name__ == '__main__':
main()