Files
backend/backend/app/admin/service/file_service.py
2025-12-10 20:13:19 +08:00

920 lines
37 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import io
import imghdr
import json
import hashlib
from datetime import datetime
from typing import Optional, Dict, Any
from fastapi import UploadFile
from PIL import Image as PILImage, ExifTags
from backend.app.admin.crud.file_crud import file_dao
from backend.app.admin.model.file import File
from backend.app.admin.schema.file import AddFileParam, FileUploadResponse, UpdateFileParam, FileMetadata
from backend.app.ai.schema.image import ColorMode, ImageMetadata, ImageFormat, UpdateImageParam
from backend.app.ai.crud.image_curd import image_dao
from backend.middleware.cos_client import CosClient
from backend.common.exception import errors
from backend.core.conf import settings
from backend.database.db import async_db_session
class FileService:
@staticmethod
def is_image_file(content_type: str, file_content: bytes, file_name: str) -> bool:
    """Return True when the upload looks like an image.

    Three checks run in order: the filename extension against the configured
    allow-list, the client-declared content type, and finally a sniff of the
    raw bytes.
    """
    # 1. Extension appears in the configured image allow-list.
    if file_name:
        extension = file_name.split('.')[-1].lower()
        if extension in settings.UPLOAD_IMAGE_EXT_INCLUDE:
            return True
    # 2. Declared MIME type claims an image.
    if content_type and content_type.startswith('image/'):
        return True
    # 3. Last resort: sniff the payload itself.
    return imghdr.what(None, h=file_content) is not None
@staticmethod
def validate_image_file(file_name: str) -> None:
    """Raise ForbiddenError when the filename's extension is not an allowed image type.

    Files without a name are accepted silently (nothing to validate).
    """
    if not file_name:
        return
    extension = file_name.split('.')[-1].lower()
    if extension and extension not in settings.UPLOAD_IMAGE_EXT_INCLUDE:
        raise errors.ForbiddenError(msg=f'[{extension}] 此图片格式暂不支持')
@staticmethod
async def upload_file(
    file: UploadFile,
    metadata: Optional[dict] = None
) -> FileUploadResponse:
    """Upload a file with SHA-256 content deduplication.

    The whole payload is read into memory and hashed; if a file with the
    same hash already exists, the existing record is returned. Otherwise a
    new record is created and the bytes are stored either in COS object
    storage or inline in the database, per ``settings.DEFAULT_STORAGE_TYPE``.

    :param file: incoming FastAPI upload
    :param metadata: optional caller-supplied extra metadata
    :return: summary of the stored (or pre-existing) file
    """
    # NOTE(review): the whole upload is buffered in memory here — confirm
    # this is acceptable for the expected file sizes.
    content = await file.read()
    await file.seek(0)  # reset the stream so downstream consumers can re-read
    storage_type = settings.DEFAULT_STORAGE_TYPE
    # Content hash doubles as the deduplication key.
    file_hash = hashlib.sha256(content).hexdigest()
    # Dedup check: short-circuit when identical content is already stored.
    async with async_db_session() as db:
        existing_file = await file_dao.get_by_hash(db, file_hash)
        if existing_file:
            return FileUploadResponse(
                id=str(existing_file.id),
                file_hash=existing_file.file_hash,
                file_name=existing_file.file_name,
                content_type=existing_file.content_type,
                file_size=existing_file.file_size
            )
    # Create the file record inside a single transaction.
    async with async_db_session.begin() as db:
        # Initial metadata snapshot; storage details are filled in later.
        file_metadata_dict = {
            "file_name": file.filename,
            "content_type": file.content_type,
            "file_size": len(content),
            "created_at": datetime.now(),
            "updated_at": datetime.now(),
            "extra": metadata
        }
        file_metadata = FileMetadata(**file_metadata_dict)
        # Create the row first: the generated id is needed to build the COS key.
        file_params = AddFileParam(
            file_hash=file_hash,
            file_name=file.filename or "unnamed",
            content_type=file.content_type,
            file_size=len(content),
            storage_type=storage_type,
            storage_path=None,  # unknown until stored; updated below
            metadata_info=file_metadata
        )
        # Persist to obtain the generated primary key.
        db_file = await file_dao.create(db, file_params)
        file_id = db_file.id
        # Store the payload according to the configured backend.
        storage_path = None
        file_data = None  # NOTE(review): unused local — kept as-is
        if storage_type == "cos":
            cos_client = CosClient()
            key = cos_client.build_key(file_id)
            resp = cos_client.upload_object(key, content)
            # upload_object may return (headers, body) or a plain dict;
            # normalise either shape to a dict.
            data = None
            if isinstance(resp, (list, tuple)) and len(resp) >= 2:
                data = resp[1]
            elif isinstance(resp, dict):
                data = resp
            else:
                data = {}
            original_info = data.get("OriginalInfo", {}) or {}
            process_results = data.get("ProcessResults", {}) or {}
            # Image info is only present for image uploads.
            image_info = original_info.pop("ImageInfo", None)
            obj = process_results.get("Object", {}) or {}
            storage_path = obj.get("Key", key)
            meta = FileMetadata(
                file_name=file.filename,
                content_type=file.content_type,
                file_size=len(content),
                # NOTE(review): COS image info replaces the caller's metadata
                # here rather than merging with it — confirm intended.
                extra={"cos_image_info": image_info} if image_info else metadata,
            )
            update_params = UpdateFileParam(
                storage_path=storage_path,
                metadata_info=meta,
                details={
                    "key": key,
                    "original_info": original_info,
                    "process_results": process_results,
                }
            )
            await file_dao.update(db, file_id, update_params)
            db_file.storage_path = storage_path
        else:
            # Database storage: keep the raw bytes on the row itself.
            db_file.file_data = content
            await db.flush()  # ensure file_data is written before commit
        return FileUploadResponse(
            id=str(db_file.id),
            file_hash=db_file.file_hash,
            file_name=db_file.file_name,
            content_type=db_file.content_type,
            file_size=db_file.file_size
        )
@staticmethod
async def upload_file_with_content_type(
    file,
    content_type: str,
    metadata: Optional[dict] = None
) -> FileUploadResponse:
    """Upload a file with an explicitly supplied content type.

    Same dedup-then-store flow as ``upload_file``, except ``file`` may be any
    object exposing async ``read``/``seek`` (plus an optional ``filename``
    attribute), and the stored content type is the caller-provided one.

    :param file: file-like object with async read/seek
    :param content_type: MIME type to record for the file
    :param metadata: optional caller-supplied extra metadata
    :return: summary of the stored (or pre-existing) file
    """
    content = await file.read()
    await file.seek(0)  # reset the stream for downstream consumers
    storage_type = settings.DEFAULT_STORAGE_TYPE
    # Content hash doubles as the deduplication key.
    file_hash = hashlib.sha256(content).hexdigest()
    # Dedup check: short-circuit when identical content is already stored.
    async with async_db_session() as db:
        existing_file = await file_dao.get_by_hash(db, file_hash)
        if existing_file:
            return FileUploadResponse(
                id=str(existing_file.id),
                file_hash=existing_file.file_hash,
                file_name=existing_file.file_name,
                content_type=existing_file.content_type,
                file_size=existing_file.file_size
            )
    # Create the file record inside a single transaction.
    async with async_db_session.begin() as db:
        # Initial metadata snapshot.
        file_metadata_dict = {
            "file_name": getattr(file, 'filename', 'unnamed'),
            "content_type": content_type,
            "file_size": len(content),
            "created_at": datetime.now(),
            "updated_at": datetime.now(),
            "extra": metadata
        }
        file_name = getattr(file, 'filename', 'unnamed')
        file_metadata = FileMetadata(**file_metadata_dict)
        file_params = AddFileParam(
            file_hash=file_hash,
            file_name=file_name or "unnamed",
            content_type=content_type,
            file_size=len(content),
            storage_type=storage_type,
            storage_path=None,  # filled in once the object is stored
            metadata_info=file_metadata
        )
        # Persist to obtain the generated primary key.
        db_file = await file_dao.create(db, file_params)
        file_id = db_file.id
        # Store the payload according to the configured backend.
        storage_path = None
        file_data = None  # NOTE(review): unused local — kept as-is
        if storage_type == "database":
            db_file.file_data = content
            await db.flush()  # ensure file_data is written before commit
        else:
            cos_client = CosClient()
            key = cos_client.build_key(file_id)
            resp = cos_client.upload_object(key, content)
            # Normalise the COS response ((headers, body) or dict) to a dict.
            data = None
            if isinstance(resp, (list, tuple)) and len(resp) >= 2:
                data = resp[1]
            elif isinstance(resp, dict):
                data = resp
            else:
                data = {}
            original_info = data.get("OriginalInfo", {}) or {}
            process_results = data.get("ProcessResults", {}) or {}
            # Image info is only present for image uploads.
            image_info = original_info.pop("ImageInfo", None)
            obj = process_results.get("Object", {}) or {}
            storage_path = obj.get("Key") or key
            update_params = UpdateFileParam(
                storage_path=storage_path,
                metadata_info=FileMetadata(
                    file_name=file_name or "unnamed",
                    content_type=content_type,
                    file_size=len(content),
                    # NOTE(review): COS image info replaces caller metadata
                    # rather than merging — confirm intended.
                    extra={"cos_image_info": image_info} if image_info else metadata,
                ),
                details={
                    "key": key,
                    "original_info": original_info,
                    "process_results": process_results,
                }
            )
            await file_dao.update(db, file_id, update_params)
            db_file.storage_path = storage_path
        return FileUploadResponse(
            id=str(db_file.id),
            file_hash=db_file.file_hash,
            file_name=db_file.file_name,
            content_type=db_file.content_type,
            file_size=db_file.file_size
        )
@staticmethod
async def upload_image(
    file: UploadFile,
) -> FileUploadResponse:
    """Upload an image only: validate the type, dedup by hash, and (for COS)
    request an AVIF rendition. No image metadata is extracted here.

    :param file: incoming FastAPI upload
    :raises RequestError: when the upload is empty
    :raises ForbiddenError: when the payload is not recognisably an image
    :return: summary of the stored (or pre-existing) file
    """
    content = await file.read()
    await file.seek(0)
    if not content:
        raise errors.RequestError(msg="文件内容为空")
    # Accept when either the declared MIME type or a content sniff says image.
    content_type = file.content_type or ""
    is_image_by_type = content_type.startswith("image/")
    is_image_by_content = imghdr.what(None, h=content) is not None
    if not (is_image_by_type or is_image_by_content):
        raise errors.ForbiddenError(msg="仅支持图片文件上传")
    storage_type = settings.DEFAULT_STORAGE_TYPE
    file_hash = hashlib.sha256(content).hexdigest()
    # Dedup check: return the existing record for identical content.
    async with async_db_session() as db:
        existing_file = await file_dao.get_by_hash(db, file_hash)
        if existing_file:
            return FileUploadResponse(
                id=str(existing_file.id),
                file_hash=existing_file.file_hash,
                file_name=existing_file.file_name,
                content_type=existing_file.content_type,
                file_size=existing_file.file_size,
            )
    # Create the file record (deliberately without metadata extraction).
    async with async_db_session.begin() as db:
        meta = FileMetadata(
            file_name=file.filename,
            content_type=file.content_type,
            file_size=len(content),
            extra=None,
        )
        file_params = AddFileParam(
            file_hash=file_hash,
            file_name=file.filename or "unnamed",
            content_type=file.content_type,
            file_size=len(content),
            storage_type=storage_type,
            storage_path=None,
            metadata_info=meta,
        )
        db_file = await file_dao.create(db, file_params)
        file_id = db_file.id
        if storage_type == "database":
            db_file.file_data = content
            await db.flush()
        else:
            cos_client = CosClient()
            key = cos_client.build_key(file_id)
            # Ask COS to produce an AVIF rendition alongside the original.
            avif_key = f"{file_id}_avif"
            pic_ops = {
                "is_pic_info": 1,
                "rules": [
                    {
                        "fileid": avif_key,
                        "rule": "imageMogr2/format/avif"
                    }
                ]
            }
            resp = cos_client.upload_image(key, content, pic_ops)
            # Normalise the COS response ((headers, body) or dict) to a dict.
            data = None
            if isinstance(resp, (list, tuple)) and len(resp) >= 2:
                data = resp[1]
            elif isinstance(resp, dict):
                data = resp
            else:
                data = {}
            original_info = data.get("OriginalInfo", {}) or {}
            process_results = data.get("ProcessResults", {}) or {}
            image_info = original_info.pop("ImageInfo", None)
            obj = process_results.get("Object", {}) or {}
            # The AVIF rendition becomes the canonical storage path.
            storage_path = obj.get("Key") or avif_key
            update_params = UpdateFileParam(
                storage_path=storage_path,
                metadata_info=FileMetadata(
                    file_name=file.filename,
                    content_type=file.content_type,
                    file_size=len(content),
                    extra={"cos_image_info": image_info} if image_info else None,
                ),
                details={
                    "key": key,
                    "original_info": original_info,
                    "process_results": process_results,
                }
            )
            await file_dao.update(db, file_id, update_params)
            db_file.storage_path = storage_path
        return FileUploadResponse(
            id=str(db_file.id),
            file_hash=db_file.file_hash,
            file_name=db_file.file_name,
            content_type=db_file.content_type,
            file_size=db_file.file_size,
        )
@staticmethod
async def get_file(file_id: int) -> Optional[File]:
    """Fetch a file record by primary key; None when no such record exists."""
    async with async_db_session() as session:
        return await file_dao.get(session, file_id)
@staticmethod
async def download_file(file_id: int) -> tuple[bytes, str, str]:
    """Return ``(content, file_name, content_type)`` for a stored file.

    Database-backed files are read from the row; otherwise the object is
    fetched from COS using the recorded storage path (or the id-derived key).

    :raises NotFoundError: when no record exists for ``file_id``
    """
    async with async_db_session() as session:
        record = await file_dao.get(session, file_id)
        if not record:
            raise errors.NotFoundError(msg="文件不存在")
        if record.storage_type == "database":
            payload = record.file_data or b""
        else:
            client = CosClient()
            object_key = record.storage_path or client.build_key(file_id)
            payload = client.download_object(object_key)
        return payload, record.file_name, record.content_type or "application/octet-stream"
@staticmethod
async def download_image_file(file_id: int) -> tuple[bytes, str, str]:
    """Return ``(content, file_name, content_type)`` for an image file.

    Unlike ``download_file``, COS-backed content is fetched through the image
    download path, always using the key derived from the file id (not the
    recorded storage path).

    :raises NotFoundError: when no record exists for ``file_id``
    """
    async with async_db_session() as session:
        record = await file_dao.get(session, file_id)
        if not record:
            raise errors.NotFoundError(msg="文件不存在")
        if record.storage_type == "database":
            payload = record.file_data or b""
        else:
            client = CosClient()
            payload = client.download_image(client.build_key(file_id))
        return payload, record.file_name, record.content_type or "application/octet-stream"
@staticmethod
async def delete_file(file_id: int) -> bool:
    """Delete a file record and, best-effort, its COS object.

    :return: True when a database row was actually removed
    """
    async with async_db_session.begin() as session:
        record = await file_dao.get(session, file_id)
        if not record:
            return False
        if record.storage_type != "database":
            client = CosClient()
            object_key = record.storage_path or client.build_key(file_id)
            # Best-effort removal from object storage; the DB row is the
            # source of truth, so storage errors are deliberately swallowed.
            try:
                client.client.delete_object(Bucket=client.bucket, Key=object_key)
            except Exception:
                pass
        # Remove the database record itself.
        deleted = await file_dao.delete(session, file_id)
        return deleted > 0
@staticmethod
async def get_file_by_hash(db, file_hash: str) -> Optional[File]:
    """Look up a file record by its SHA-256 content hash using the given session."""
    return await file_dao.get_by_hash(db, file_hash)
@staticmethod
def detect_image_format(image_bytes: bytes) -> ImageFormat:
    """Sniff the image format from raw bytes and map it to ``ImageFormat``."""
    sniffed = imghdr.what(None, h=image_bytes)
    known_formats = {
        'jpeg': ImageFormat.JPEG,
        'jpg': ImageFormat.JPEG,
        'png': ImageFormat.PNG,
        'gif': ImageFormat.GIF,
        'bmp': ImageFormat.BMP,
        'webp': ImageFormat.WEBP,
        'tiff': ImageFormat.TIFF,
        'svg': ImageFormat.SVG,
    }
    # Anything imghdr cannot identify maps to UNKNOWN.
    return known_formats.get(sniffed, ImageFormat.UNKNOWN)
@staticmethod
def extract_metadata(image_bytes: bytes, additional_info: Optional[Dict[str, Any]] = None) -> ImageMetadata:
    """Extract image metadata (dimensions, color mode, EXIF, DPI) from raw bytes.

    Falls back to a minimal ``ImageMetadata`` carrying an ``error`` message
    when the bytes cannot be parsed as an image.

    :param image_bytes: raw image payload
    :param additional_info: optional fields to overlay onto the result
        (only attributes that already exist on ``ImageMetadata`` are set)
    """

    def _jsonable(value):
        # Make values JSON-serializable: EXIF rationals -> float, bytes -> str.
        if hasattr(value, 'numerator') and hasattr(value, 'denominator'):
            try:
                return float(value)
            except (TypeError, ValueError, ZeroDivisionError):
                return value
        if isinstance(value, bytes):
            try:
                return value.decode('utf-8')
            except UnicodeDecodeError:
                return base64.b64encode(value).decode('utf-8')
        return value

    try:
        with PILImage.open(io.BytesIO(image_bytes)) as img:
            width, height = img.size
            # NOTE(review): this membership test relies on ColorMode members
            # comparing equal to plain mode strings (str-enum) — confirm.
            color_mode = ColorMode(img.mode) if img.mode in ColorMode.__members__.values() else ColorMode.UNKNOWN
            # Collect EXIF tags, decoding numeric tag ids to names.
            exif_data = {}
            if hasattr(img, '_getexif') and img._getexif():
                for tag, value in img._getexif().items():
                    decoded_tag = ExifTags.TAGS.get(tag, tag)
                    # Normalise EXIF timestamps to ISO-8601.
                    if decoded_tag in ['DateTime', 'DateTimeOriginal', 'DateTimeDigitized']:
                        try:
                            value = datetime.strptime(value, "%Y:%m:%d %H:%M:%S").isoformat()
                        except (TypeError, ValueError):
                            pass
                    exif_data[decoded_tag] = _jsonable(value)
            # Number of color channels (e.g. 3 for RGB, 4 for RGBA).
            channels = len(img.getbands())
            dpi = img.info.get('dpi')
            # DPI may arrive as IFDRational values; coerce to a float tuple.
            if dpi:
                try:
                    dpi = tuple(float(d) for d in dpi)
                except (TypeError, ValueError):
                    pass
            metadata = ImageMetadata(
                # Resolve via the class rather than the module-level
                # ``file_service`` singleton (which is defined after this class).
                format=FileService.detect_image_format(image_bytes),
                width=width,
                height=height,
                color_mode=color_mode,
                file_size=len(image_bytes),
                channels=channels,
                dpi=dpi,
                exif=exif_data
            )
            # Overlay caller-supplied fields that exist on the model.
            if additional_info:
                for key, value in additional_info.items():
                    if hasattr(metadata, key):
                        setattr(metadata, key, _jsonable(value))
            return metadata
    except Exception as e:
        # Unparseable image: return a minimal record with the error noted.
        return ImageMetadata(
            format=FileService.detect_image_format(image_bytes),
            width=0,
            height=0,
            color_mode=ColorMode.UNKNOWN,
            file_size=len(image_bytes),
            error=f"Metadata extraction failed: {str(e)}"
        )
@staticmethod
async def generate_thumbnail(image_id: int, file_id: int) -> None:
    """Best-effort thumbnail generation for an image's source file.

    COS-backed files get a server-side 10%-scale rendition keyed off a new
    placeholder file row; database-backed files get a local 100x100 thumbnail
    stored as a new database-backed file. On success the image row's
    ``thumbnail_id`` is updated. Any failure is swallowed — thumbnails are
    optional.
    """
    try:
        db_file = await FileService.get_file(file_id)
        if not db_file:
            return
        if db_file.storage_type == "cos":
            cos_client = CosClient()
            src_key = db_file.storage_path or cos_client.build_key(file_id)
            # First transaction: create a placeholder row so its id can seed
            # the destination COS key.
            async with async_db_session.begin() as db:
                meta_init = FileMetadata(
                    file_name=f"thumbnail_{db_file.file_name}",
                    content_type=db_file.content_type,
                    file_size=0,
                    extra=None,
                )
                t_params = AddFileParam(
                    # Placeholder hash; replaced once the rendition exists.
                    file_hash=hashlib.sha256(f"cos:thumbnail_pending:{image_id}:{file_id}".encode()).hexdigest(),
                    file_name=f"thumbnail_{db_file.file_name}",
                    content_type=db_file.content_type,
                    file_size=0,
                    storage_type="cos",
                    storage_path=None,
                    metadata_info=meta_init,
                )
                t_file = await file_dao.create(db, t_params)
                await db.flush()
            dest_key = f"{t_file.id}_thumbnail"
            # Server-side scale to 10% of the original dimensions.
            pic_ops = {
                "is_pic_info": 1,
                "rules": [{
                    "fileid": dest_key,
                    "rule": "/thumbnail/!10p"
                }]
            }
            resp = cos_client.process_image(src_key, pic_ops)
            # Normalise the COS response ((headers, body) or dict) to a dict.
            data = None
            if isinstance(resp, (list, tuple)) and len(resp) >= 2:
                data = resp[1]
            elif isinstance(resp, dict):
                data = resp
            else:
                data = {}
            process_results = data.get("ProcessResults", {}) or {}
            obj = process_results.get("Object", {}) or {}
            final_key = obj.get("Key") or dest_key
            fmt = obj.get("Format")
            size_str = obj.get("Size")
            try:
                size_val = int(size_str) if isinstance(size_str, str) else (size_str or 0)
            except:  # noqa: E722 — tolerate any malformed size value
                size_val = 0
            content_type = f"image/{fmt.lower()}" if isinstance(fmt, str) else (db_file.content_type or "image/avif")
            meta = FileMetadata(
                file_name=f"thumbnail_{db_file.file_name}",
                content_type=content_type,
                file_size=size_val,
                extra=None,
            )
            # Second transaction: point the placeholder at the real rendition
            # and link it to the image row.
            async with async_db_session.begin() as db:
                update_params = UpdateFileParam(
                    file_hash=hashlib.sha256(f"cos:{final_key}".encode()).hexdigest(),
                    storage_path=final_key,
                    metadata_info=meta,
                    details={
                        "key": src_key,
                        "process_results": process_results,
                    }
                )
                await file_dao.update(db, t_file.id, update_params)
                await image_dao.update(db, image_id, UpdateImageParam(thumbnail_id=t_file.id))
        else:
            # Database-backed source: build the thumbnail locally with PIL.
            file_content, file_name, content_type = await FileService.download_file(file_id)
            thumbnail_content = await FileService._create_thumbnail(file_content)
            if not thumbnail_content:
                # Fall back to the original bytes when thumbnailing fails.
                thumbnail_content = file_content
            meta = FileMetadata(
                file_name=f"thumbnail_{file_name}",
                content_type=content_type,
                file_size=len(thumbnail_content),
                extra=None,
            )
            async with async_db_session.begin() as db:
                t_params = AddFileParam(
                    file_hash=hashlib.sha256(thumbnail_content).hexdigest(),
                    file_name=f"thumbnail_{file_name}",
                    content_type=content_type,
                    file_size=len(thumbnail_content),
                    storage_type="database",
                    storage_path=None,
                    metadata_info=meta,
                )
                t_file = await file_dao.create(db, t_params)
                t_file.file_data = thumbnail_content
                await db.flush()
                await image_dao.update(db, image_id, UpdateImageParam(thumbnail_id=t_file.id))
    except Exception:
        # Thumbnails are best-effort; never propagate failures to the caller.
        pass
@staticmethod
async def _create_thumbnail(image_bytes: bytes, size: tuple = (100, 100)) -> Optional[bytes]:
    """Build a square, center-cropped thumbnail of ``size``.

    Transparent images (RGBA/LA/P) are flattened onto a white background.

    :param image_bytes: source image payload
    :param size: output (width, height) in pixels
    :return: encoded thumbnail bytes, or None when the input is empty or
        cannot be processed (callers treat None as "use the original").
    """
    try:
        if not image_bytes:
            return None
        with PILImage.open(io.BytesIO(image_bytes)) as img:
            # Flatten transparency onto white so JPEG encoding works.
            if img.mode in ("RGBA", "LA", "P"):
                background = PILImage.new("RGB", img.size, (255, 255, 255))
                if img.mode == "P":
                    img = img.convert("RGBA")
                background.paste(img, mask=img.split()[-1] if img.mode in ("RGBA", "LA") else None)
                img = background
            # Center-crop to a square before scaling.
            width, height = img.size
            if width > height:
                left = (width - height) // 2
                right = left + height
                top = 0
                bottom = height
            else:
                left = 0
                right = width
                top = (height - width) // 2
                bottom = top + width
            img = img.crop((left, top, right, bottom))
            img = img.resize(size, PILImage.Resampling.LANCZOS)
            thumbnail_buffer = io.BytesIO()
            # crop()/resize() return derived images whose ``format`` is None,
            # so this effectively always encodes as JPEG.
            img.save(thumbnail_buffer, format=img.format or "JPEG")
            thumbnail_buffer.seek(0)
            return thumbnail_buffer.read()
    except Exception:
        # Best-effort helper: signal failure with None instead of raising.
        return None
@staticmethod
def _mime_to_ext(mime: str | None, filename: str | None) -> str:
    """Derive a file extension, preferring the filename's suffix over the MIME type.

    :param mime: MIME type (case-insensitive), may be None
    :param filename: original filename, may be None
    :return: lowercase extension without the dot; ``'dat'`` when neither
        source yields one
    """
    # A filename suffix wins over the declared MIME type.
    if filename and '.' in filename:
        ext = filename.rsplit('.', 1)[-1].lower()
        # Guard against trailing-dot names like "file." — fall through to
        # the MIME mapping instead of returning an empty extension.
        if ext:
            return ext
    mapping = {
        'image/jpeg': 'jpg',
        'image/jpg': 'jpg',
        'image/png': 'png',
        'image/gif': 'gif',
        'image/webp': 'webp',
        'image/bmp': 'bmp',
        'image/svg+xml': 'svg',
        'audio/mpeg': 'mp3',
        'audio/mp3': 'mp3',
        'audio/wav': 'wav',
        'video/mp4': 'mp4',
        'video/quicktime': 'mov',
    }
    return mapping.get((mime or '').lower(), 'dat')
@staticmethod
async def init_upload(filename: str, size: int, mime: str | None, biz_type: str | None, wx_user_id: int):
    """Create a pending file record for a client-side direct upload to COS.

    :param filename: client-reported file name
    :param size: client-reported size in bytes
    :param mime: client-reported MIME type
    :param biz_type: optional business category stored in metadata extra
    :param wx_user_id: owning WeChat user id, recorded in details
    :return: dict with the new ``file_id`` and the ``cloud_path`` (COS key)
        the client should upload to
    """
    storage_type = 'cos'
    async with async_db_session.begin() as db:
        # Temporary hash so the row can exist before the content does; the
        # real hash is written by complete_upload.
        # NOTE(review): this seed is not unique per request — two pending
        # uploads with the same name and size collide; confirm acceptable.
        temp_hash = hashlib.sha256(f"pending:{filename}:{size}".encode()).hexdigest()
        params = AddFileParam(
            file_hash=temp_hash,
            file_name=filename or 'unnamed',
            content_type=mime,
            file_size=size,
            storage_type=storage_type,
            storage_path=None,
            metadata_info=FileMetadata(
                file_name=filename or 'unnamed',
                content_type=mime,
                file_size=size,
                extra={"biz_type": biz_type} if biz_type else None,
            ),
        )
        db_file = await file_dao.create(db, params)
        await db.flush()  # obtain the generated id for the COS key
        cos_client = CosClient()
        cloud_path = cos_client.build_key(db_file.id)
        # Record the destination key and mark the upload as pending.
        await file_dao.update(
            db,
            db_file.id,
            UpdateFileParam(
                storage_path=cloud_path,
                details={"status": "pending", "cloud_path": cloud_path, "wx_user_id": wx_user_id},
            ),
        )
        db_file.storage_path = cloud_path
        return {
            "file_id": str(db_file.id),
            "cloud_path": cloud_path,
        }
@staticmethod
async def complete_upload(
    file_id: int,
    cloud_path: str,
    file_id_in_cos: str,
    sha256: str | None = None,
    size: int | None = None,
    mime: str | None = None,
    wx_user_id: int = 0,
):
    """Finalise a client-side direct upload to COS.

    Verifies the object exists, derives size/type/hash from the COS HEAD
    response (falling back to caller-supplied values), converts images to an
    AVIF rendition, and caches a 30-day presigned download URL on the row.

    :raises NotFoundError: when the object is missing from COS
    :return: dict with the file id, presigned URL and ``"stored"`` status
    """
    cos = CosClient()
    cos_key = cloud_path
    if not cos.object_exists(cos_key):
        raise errors.NotFoundError(msg="对象未找到或尚未上传")
    head = cos.head_object(cos_key) or {}
    # Header key casing differs between SDK versions; try both spellings.
    content_length = head.get('Content-Length') or head.get('ContentLength')
    try:
        content_length = int(content_length) if content_length is not None else (size or 0)
    except (TypeError, ValueError):
        content_length = size or 0
    content_type = mime or head.get('Content-Type') or head.get('ContentType')
    etag = (head.get('ETag') or '').strip('"')
    # Prefer a caller-computed sha256, then the ETag, then a synthetic hash.
    file_hash_val = sha256 or etag or hashlib.sha256(f"cos:{cloud_path}:{file_id_in_cos}".encode()).hexdigest()
    is_image = (mime or '').lower().startswith('image/') or (content_type or '').lower().startswith('image/')
    if is_image:
        # Convert to AVIF server-side; the rendition becomes canonical.
        avif_key = f"{file_id}_avif"
        pic_ops = {
            "is_pic_info": 1,
            "rules": [
                {
                    "fileid": avif_key,
                    "rule": "imageMogr2/format/avif",
                }
            ],
        }
        resp = cos.process_image(cos_key, pic_ops)
        # Normalise the COS response ((headers, body) or dict) to a dict.
        data = resp[1] if isinstance(resp, (list, tuple)) and len(resp) >= 2 else (resp if isinstance(resp, dict) else {})
        process_results = data.get("ProcessResults", {}) or {}
        obj = process_results.get("Object", {}) or {}
        final_key = obj.get("Key") or avif_key
        size_str = obj.get("Size")
        try:
            final_size = int(size_str) if isinstance(size_str, str) else (size_str or content_length or 0)
        except (TypeError, ValueError):
            final_size = content_length or 0
        final_content_type = "image/avif"
        expired_seconds = 30 * 24 * 60 * 60
        params = {
            'response-content-disposition': f'attachment; filename={file_id}.avif',
            'response-content-type': final_content_type,
        }
        url = cos.get_presigned_download_url(final_key, expired_seconds, params=params)
        from datetime import datetime, timezone as dt_tz
        now_ts = int(datetime.now(dt_tz.utc).timestamp())
        # Expire our cached copy one minute before the real URL does.
        expire_ts = now_ts + expired_seconds - 60
        async with async_db_session.begin() as db:
            meta = FileMetadata(
                file_name=None,
                content_type=final_content_type,
                file_size=final_size,
                extra=None,
            )
            await file_dao.update(
                db,
                file_id,
                UpdateFileParam(
                    file_hash=file_hash_val,
                    storage_path=final_key,
                    metadata_info=meta,
                    details={
                        "status": "stored",
                        "cloud_path": cloud_path,
                        "fileID": file_id_in_cos,
                        "download_url": url,
                        "download_url_expire_ts": expire_ts,
                        "wx_user_id": wx_user_id,
                    },
                ),
            )
        return {
            "id": str(file_id),
            "url": url,
            "status": "stored",
        }
    else:
        expired_seconds = 30 * 24 * 60 * 60
        ext = FileService._mime_to_ext(content_type, None)
        filename = f"{file_id}.{ext}"
        params = {
            # Bug fix: the computed filename was not interpolated into the
            # disposition header before.
            'response-content-disposition': f'attachment; filename={filename}',
            'response-content-type': content_type or 'application/octet-stream',
        }
        url = cos.get_presigned_download_url(cos_key, expired_seconds, params=params)
        from datetime import datetime, timezone as dt_tz
        now_ts = int(datetime.now(dt_tz.utc).timestamp())
        # Expire our cached copy one minute before the real URL does.
        expire_ts = now_ts + expired_seconds - 60
        async with async_db_session.begin() as db:
            meta = FileMetadata(
                file_name=None,
                content_type=content_type,
                file_size=content_length,
                extra=None,
            )
            await file_dao.update(
                db,
                file_id,
                UpdateFileParam(
                    file_hash=file_hash_val,
                    storage_path=cloud_path,
                    metadata_info=meta,
                    details={
                        "status": "stored",
                        "cloud_path": cloud_path,
                        "fileID": file_id_in_cos,
                        "download_url": url,
                        "download_url_expire_ts": expire_ts,
                        "wx_user_id": wx_user_id,
                    },
                ),
            )
        return {
            "id": str(file_id),
            "url": url,
            "status": "stored",
        }
@staticmethod
async def get_presigned_download_url(file_id: int, wx_user_id: int) -> str:
    """Return a presigned COS download URL for a file, enforcing ownership.

    A URL cached in the row's details is reused until its recorded expiry;
    otherwise a fresh 30-day URL is generated and written back.

    :raises NotFoundError: unknown file id
    :raises ForbiddenError: the file belongs to a different wx user
    :raises ServerError: the record has no storage path
    """
    async with async_db_session() as db:
        db_file = await file_dao.get(db, file_id)
        if not db_file:
            raise errors.NotFoundError(msg="文件不存在")
        details = db_file.details or {}
        owner_id = details.get("wx_user_id")
        # Ownership is only enforced when an owner was recorded.
        if owner_id is not None and str(owner_id) != str(wx_user_id):
            raise errors.ForbiddenError(msg="无权限访问该文件")
        cloud_path = db_file.storage_path or details.get("cloud_path")
        if not cloud_path:
            raise errors.ServerError(msg="文件路径缺失")
        cos = CosClient()
        cos_key = cloud_path
        url = details.get("download_url")
        expire_ts = int(details.get("download_url_expire_ts") or 0)
        from datetime import datetime, timezone as dt_tz
        now_ts = int(datetime.now(dt_tz.utc).timestamp())
        # Regenerate when there is no cached URL or it has expired.
        if (not url) or (now_ts >= expire_ts):
            expired_seconds = 30 * 24 * 60 * 60
            ctype = db_file.content_type or 'application/octet-stream'
            ext = FileService._mime_to_ext(ctype, None)
            filename = f"{file_id}.{ext}"
            params = {
                # Bug fix: the computed filename was not interpolated into
                # the disposition header before.
                'response-content-disposition': f'attachment; filename={filename}',
                'response-content-type': ctype,
            }
            url = cos.get_presigned_download_url(cos_key, expired_seconds, params=params)
            # Expire our cached copy one minute before the real URL does.
            expire_ts = now_ts + expired_seconds - 60
            # Write the refreshed URL back in a separate transactional session.
            async with async_db_session.begin() as wdb:
                await file_dao.update(
                    wdb,
                    file_id,
                    UpdateFileParam(details={
                        **details,
                        "download_url": url,
                        "download_url_expire_ts": expire_ts,
                    })
                )
        return url
# Module-level singleton; all methods are static, so this exists purely as a
# convenient shared entry point for importers.
file_service = FileService()