This commit is contained in:
felix
2025-11-09 19:14:34 +08:00
parent d85d1c089d
commit fd0118e5c1
7 changed files with 197 additions and 141 deletions

View File

@@ -54,108 +54,108 @@ async def get_standard_audio_file_id(
return response_base.success(data={'audio_id': str(file_id)})
@router.get("/{text_id}", summary="获取图片文本详情", dependencies=[DependsJwtAuth])
async def get_image_text(
text_id: int
) -> ResponseSchemaModel[ImageTextWithRecordingsSchema]:
"""
获取图片文本详情,包括关联的录音记录
参数:
- text_id: 图片文本ID
返回:
- 图片文本记录及其关联的录音记录
"""
text = await image_text_service.get_text_by_id(text_id)
if not text:
raise errors.NotFoundError(msg="图片文本不存在")
# 获取关联的录音记录
recordings = await recording_service.get_recordings_by_text_id(text_id)
# 构造返回数据
result = ImageTextWithRecordingsSchema(
id=text.id,
image_id=text.image_id,
content=text.content,
position=text.position,
dict_level=text.dict_level,
standard_audio_id=text.standard_audio_id,
info=text.info,
created_time=text.created_time,
recordings=recordings
)
return response_base.success(data=result)
# @router.get("/{text_id}", summary="获取图片文本详情", dependencies=[DependsJwtAuth])
# async def get_image_text(
# text_id: int
# ) -> ResponseSchemaModel[ImageTextWithRecordingsSchema]:
# """
# 获取图片文本详情,包括关联的录音记录
#
# 参数:
# - text_id: 图片文本ID
#
# 返回:
# - 图片文本记录及其关联的录音记录
# """
# text = await image_text_service.get_text_by_id(text_id)
# if not text:
# raise errors.NotFoundError(msg="图片文本不存在")
#
# # 获取关联的录音记录
# recordings = await recording_service.get_recordings_by_text_id(text_id)
#
# # 构造返回数据
# result = ImageTextWithRecordingsSchema(
# id=text.id,
# image_id=text.image_id or 0,
# content=text.content,
# position=text.position,
# dict_level=text.dict_level,
# standard_audio_id=text.standard_audio_id,
# info=text.info,
# created_time=text.created_time.isoformat() if text.created_time else None,
# recordings=recordings
# )
#
# return response_base.success(data=result)
@router.put("/{text_id}", summary="更新图片文本", dependencies=[DependsJwtAuth])
async def update_image_text(
text_id: int,
request: Request,
background_tasks: BackgroundTasks,
params: UpdateImageTextParam
) -> ResponseSchemaModel[ImageTextSchema]:
"""
更新图片文本记录
参数:
- text_id: 图片文本ID
请求体参数:
- image_id: 图片ID
- content: 文本内容
- position: 文本在图片中的位置信息(可选)
- dict_level: 词典等级(可选)
- standard_audio_id: 标准朗读音频文件ID可选
- info: 附加信息(可选)
返回:
- 更新后的图片文本记录
"""
success = await image_text_service.update_text(text_id, params)
if not success:
return response_base.fail(code=404, msg="图片文本不存在")
text = await image_text_service.get_text_by_id(text_id)
return response_base.success(data=text)
@router.delete("/{text_id}", summary="删除图片文本", dependencies=[DependsJwtAuth])
async def delete_image_text(
text_id: int,
request: Request,
background_tasks: BackgroundTasks
) -> ResponseSchemaModel[None]:
"""
删除图片文本记录
参数:
- text_id: 图片文本ID
返回:
- 无
"""
success = await image_text_service.delete_text(text_id)
if not success:
return response_base.fail(code=404, msg="图片文本不存在")
return response_base.success()
@router.get("", summary="获取图片的所有文本", dependencies=[DependsJwtAuth])
async def get_image_texts(
image_id: int = Query(..., description="图片ID")
) -> ResponseSchemaModel[List[ImageTextSchema]]:
"""
获取指定图片的所有文本记录
参数:
- image_id: 图片ID
返回:
- 图片的所有文本记录列表
"""
texts = await image_text_service.get_texts_by_image_id(image_id)
return response_base.success(data=texts)
# @router.put("/{text_id}", summary="更新图片文本", dependencies=[DependsJwtAuth])
# async def update_image_text(
# text_id: int,
# request: Request,
# background_tasks: BackgroundTasks,
# params: UpdateImageTextParam
# ) -> ResponseSchemaModel[ImageTextSchema]:
# """
# 更新图片文本记录
#
# 参数:
# - text_id: 图片文本ID
#
# 请求体参数:
# - image_id: 图片ID
# - content: 文本内容
# - position: 文本在图片中的位置信息(可选)
# - dict_level: 词典等级(可选)
# - standard_audio_id: 标准朗读音频文件ID可选
# - info: 附加信息(可选)
#
# 返回:
# - 更新后的图片文本记录
# """
# success = await image_text_service.update_text(text_id, params)
# if not success:
# return response_base.fail(res=CustomResponseCode.HTTP_404, data={"msg": "图片文本不存在"})
#
# text = await image_text_service.get_text_by_id(text_id)
# return response_base.success(data=text)
#
#
# @router.delete("/{text_id}", summary="删除图片文本", dependencies=[DependsJwtAuth])
# async def delete_image_text(
# text_id: int,
# request: Request,
# background_tasks: BackgroundTasks
# ) -> ResponseSchemaModel[None]:
# """
# 删除图片文本记录
#
# 参数:
# - text_id: 图片文本ID
#
# 返回:
# - 无
# """
# success = await image_text_service.delete_text(text_id)
# if not success:
# return response_base.fail(code=404, msg="图片文本不存在")
#
# return response_base.success()
#
#
# @router.get("", summary="获取图片的所有文本", dependencies=[DependsJwtAuth])
# async def get_image_texts(
# image_id: int = Query(..., description="图片ID")
# ) -> ResponseSchemaModel[List[ImageTextSchema]]:
# """
# 获取指定图片的所有文本记录
#
# 参数:
# - image_id: 图片ID
#
# 返回:
# - 图片的所有文本记录列表
# """
# texts = await image_text_service.get_texts_by_image_id(image_id)
# return response_base.success(data=texts)

View File

@@ -40,5 +40,9 @@ class ImageTextCRUD(CRUDPlus[ImageText]):
"""更新文本记录"""
return await self.update_model(db, id, obj_in)
async def delete(self, db: AsyncSession, id: int) -> int:
"""删除文本记录"""
return await self.delete_model(db, id)
image_text_dao: ImageTextCRUD = ImageTextCRUD(ImageText)

View File

@@ -572,6 +572,20 @@ class ImageService:
Returns:
美式音标如果不存在则返回None
"""
# 判断是否是拼接单词(包含连字符)
if '-' in word:
# 按照 - 分割单词
parts = word.split('-')
phonetics = []
# 分别查询每个部分的音标
for part in parts:
part_phonetic = await ImageService._get_word_us_phone(part)
phonetics.append(part_phonetic if part_phonetic else "*")
# 用 - 拼接音标
return " - ".join(phonetics)
# 标准化单词
normalized_word = ImageService._normalize_word(word)
@@ -687,6 +701,24 @@ class ImageService:
Returns:
(是否已处理, 美式音标)
"""
# 判断是否是拼接单词(包含连字符)
if '-' in word:
# 按照 - 分割单词
parts = word.split('-')
phonetics = []
is_processed = True
# 分别查询每个部分的音标
for part in parts:
part_processed, part_phonetic = await ImageService._is_word_already_processed(part)
if not part_processed:
is_processed = False
phonetics.append(part_phonetic if part_phonetic else "*")
# 用 - 拼接音标
us_phone = " - ".join(phonetics) if is_processed else None
return is_processed, us_phone
# 获取缓存键
cache_key, counter_key = ImageService._get_word_cache_keys(word)

View File

@@ -79,6 +79,18 @@ class ImageTextService:
UpdateImageTextParam(standard_audio_id=audio_file_id)
)
@staticmethod
async def delete_text(text_id: int) -> bool:
"""
删除图片文本记录
:param text_id: 图片文本ID
:return: 是否删除成功
"""
async with async_db_session.begin() as db:
result = await image_text_dao.delete(db, text_id)
return result > 0
@staticmethod
async def init_image_texts(user_id: int, image_id: int, dict_level: str, background_tasks=None) -> ImageTextInitResponseSchema:
"""

View File

@@ -363,7 +363,7 @@ class RecordingService:
if background_tasks:
self._log_audit(background_tasks, file_id, ref_text, result, duration, status_code, user_id, image_id, image_text_id)
return details
return result
except Exception as e:
# 如果评估失败,记录错误并返回错误信息

View File

@@ -33,44 +33,52 @@ class CustomHTTPBearer(HTTPBearer):
自定义 HTTPBearer 认证类
"""
def __init__(self, auto_error: bool = False):
super().__init__(auto_error=auto_error)
# def __init__(self, auto_error: bool = False):
# super().__init__(auto_error=auto_error)
#
# async def __call__(self, request: Request) -> HTTPAuthorizationCredentials | None:
#
# try:
# credentials: HTTPAuthorizationCredentials = await super().__call__(request)
#
# if credentials:
# token = credentials.credentials
# # 验证 token 是否有效
# if self._is_valid_jwt(token):
# return credentials
#
# # Header 失败,尝试从 Cookie 获取
# cookie_token = request.cookies.get("access_token")
# if cookie_token and self._is_valid_jwt(cookie_token):
# # 构造一个 fake credentials 对象
# return HTTPAuthorizationCredentials(scheme="Bearer", credentials=cookie_token)
#
# # 都失败
# if self.auto_error:
# raise TokenError()
# except HTTPException as e:
# if e.status_code == 403 or e.status_code == 401:
# raise TokenError()
# raise e
#
#
# def _is_valid_jwt(self, token: str) -> bool:
# try:
# # 这里用你的 SECRET_KEY 和 ALGORITHM
# jwt.decode(token, settings.TOKEN_SECRET_KEY, algorithms=[settings.TOKEN_ALGORITHM])
# return True
# except JWTError:
# return False
async def __call__(self, request: Request) -> HTTPAuthorizationCredentials | None:
try:
credentials: HTTPAuthorizationCredentials = await super().__call__(request)
if credentials:
token = credentials.credentials
# 验证 token 是否有效
if self._is_valid_jwt(token):
return credentials
# Header 失败,尝试从 Cookie 获取
cookie_token = request.cookies.get("access_token")
if cookie_token and self._is_valid_jwt(cookie_token):
# 构造一个 fake credentials 对象
return HTTPAuthorizationCredentials(scheme="Bearer", credentials=cookie_token)
# 都失败
if self.auto_error:
raise TokenError()
return await super().__call__(request)
except HTTPException as e:
if e.status_code == 403 or e.status_code == 401:
raise TokenError()
raise e
def _is_valid_jwt(self, token: str) -> bool:
try:
# 这里用你的 SECRET_KEY 和 ALGORITHM
jwt.decode(token, settings.TOKEN_SECRET_KEY, algorithms=[settings.TOKEN_ALGORITHM])
return True
except JWTError:
return False
# JWT authorizes dependency injection
DependsJwtAuth = Depends(CustomHTTPBearer())
@@ -164,12 +172,12 @@ async def create_access_token(user_id: int, multi_login: bool, **kwargs) -> Acce
)
# Token 附加信息单独存储
if kwargs:
await redis_client.setex(
f'{settings.TOKEN_EXTRA_INFO_REDIS_PREFIX}:{user_id}:{session_uuid}',
settings.TOKEN_EXPIRE_SECONDS,
json.dumps(kwargs, ensure_ascii=False),
)
# if kwargs:
# await redis_client.setex(
# f'{settings.TOKEN_EXTRA_INFO_REDIS_PREFIX}:{user_id}:{session_uuid}',
# settings.TOKEN_EXPIRE_SECONDS,
# json.dumps(kwargs, ensure_ascii=False),
# )
return AccessToken(access_token=access_token, access_token_expire_time=expire, session_uuid=session_uuid)

View File

@@ -65,8 +65,8 @@ class Qwen:
"Vision-to-English-Chinese education module. Analyze and describe the image in three levels: "
"LEVEL1 (simple vocabulary and basic grammar, ~10 words),"
"LEVEL2 (detailed and complex vocabulary, 15-20 words),"
"LEVEL3 (professional, uncommon words and complex grammar, ≤25 words). "
"For each level, provide 5-8 English sentences and Chinese translations. "
"LEVEL3 (professional, uncommon words and complex grammar, ≤25 words)."
"For each level, provide 6-8 English sentences and Chinese translations."
"Output JSON: {LEVEL1: {desc_en:[], desc_zh:[]}, LEVEL2: {}, LEVEL3: {}}."
"Ensure all description are unique - no repetition."
# "Focus: primary/central/artificial objects."