add variation

This commit is contained in:
Felix
2026-01-13 20:50:31 +08:00
parent d868f17c2e
commit 5ea20bed3b
20 changed files with 1151 additions and 413 deletions

View File

@@ -0,0 +1,26 @@
"""rename_qa_exercise_title_to_type
Revision ID: 0004
Revises: 0003
Create Date: 2026-01-10 10:00:00
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = '0004'
down_revision = '0003'
branch_labels = None
depends_on = None
def upgrade():
    """Rename ``qa_exercise.title`` to ``type`` and change it from String(100) to String(20)."""
    # NOTE(review): the column is narrowed from 100 to 20 characters. Existing
    # title values longer than 20 characters may be rejected or truncated by the
    # database -- confirm the data (or clean it up) before running this.
    with op.batch_alter_table('qa_exercise', schema=None) as batch_op:
        batch_op.alter_column(
            'title',
            new_column_name='type',
            existing_type=sa.String(length=100),
            type_=sa.String(length=20),
            # MySQL re-states the whole column definition on ALTER; spell out the
            # existing nullability so the rename cannot silently change it.
            existing_nullable=True,
        )
def downgrade():
    """Rename ``qa_exercise.type`` back to ``title`` and restore the String(100) width."""
    with op.batch_alter_table('qa_exercise', schema=None) as batch_op:
        batch_op.alter_column(
            'type',
            new_column_name='title',
            existing_type=sa.String(length=20),
            type_=sa.String(length=100),
            # MySQL re-states the whole column definition on ALTER; spell out the
            # existing nullability so the rename cannot silently change it.
            existing_nullable=True,
        )

View File

@@ -455,7 +455,7 @@ class FileService:
# 映射到枚举类型
format_mapping = {
'jpeg': ImageFormat.JPEG,
'jpg': ImageFormat.JPEG,
'jpg': ImageFormat.JPG,
'png': ImageFormat.PNG,
'gif': ImageFormat.GIF,
'bmp': ImageFormat.BMP,
@@ -875,7 +875,7 @@ class FileService:
}
@staticmethod
async def get_presigned_download_url(file_id: int, wx_user_id: int) -> str:
async def get_presigned_download_url(file_id: int, wx_user_id: int, original: bool = False) -> str:
async with async_db_session() as db:
db_file = await file_dao.get(db, file_id)
if not db_file:
@@ -888,32 +888,61 @@ class FileService:
if not cloud_path:
raise errors.ServerError(msg="文件路径缺失")
cos = CosClient()
cos_key = cloud_path
url = details.get("download_url")
expire_ts = int(details.get("download_url_expire_ts") or 0)
from datetime import datetime, timezone as dt_tz
now_ts = int(datetime.now(dt_tz.utc).timestamp())
if (not url) or (now_ts >= expire_ts):
expired_seconds = 30 * 24 * 60 * 60
ctype = db_file.content_type or 'application/octet-stream'
ext = FileService._mime_to_ext(ctype, None)
filename = f"{file_id}.{ext}"
params = {
'response-content-disposition': f'attachment; filename={filename}',
'response-content-type': ctype,
}
url = cos.get_presigned_download_url(cos_key, expired_seconds, params=params)
expire_ts = now_ts + expired_seconds - 60
async with async_db_session.begin() as wdb:
await file_dao.update(
wdb,
file_id,
UpdateFileParam(details={
**details,
"download_url": url,
"download_url_expire_ts": expire_ts,
})
)
return url
if original:
cos_key = details.get("key")
url = details.get("download_origin_url")
expire_ts = int(details.get("download_origin_url_expire_ts") or 0)
from datetime import datetime, timezone as dt_tz
now_ts = int(datetime.now(dt_tz.utc).timestamp())
if (not url) or (now_ts >= expire_ts):
expired_seconds = 30 * 24 * 60 * 60
ctype = db_file.content_type or 'application/octet-stream'
ext = FileService._mime_to_ext(ctype, None)
filename = f"{file_id}.{ext}"
params = {
'response-content-disposition': f'attachment; filename={filename}',
'response-content-type': ctype,
}
url = cos.get_presigned_download_url(cos_key, expired_seconds, params=params)
expire_ts = now_ts + expired_seconds - 60
async with async_db_session.begin() as wdb:
await file_dao.update(
wdb,
file_id,
UpdateFileParam(details={
**details,
"download_origin_url": url,
"download_origin_url_expire_ts": expire_ts,
})
)
return url
else:
cos_key = cloud_path
url = details.get("download_url")
expire_ts = int(details.get("download_url_expire_ts") or 0)
from datetime import datetime, timezone as dt_tz
now_ts = int(datetime.now(dt_tz.utc).timestamp())
if (not url) or (now_ts >= expire_ts):
expired_seconds = 30 * 24 * 60 * 60
ctype = db_file.content_type or 'application/octet-stream'
ext = FileService._mime_to_ext(ctype, None)
filename = f"{file_id}.{ext}"
params = {
'response-content-disposition': f'attachment; filename={filename}',
'response-content-type': ctype,
}
url = cos.get_presigned_download_url(cos_key, expired_seconds, params=params)
expire_ts = now_ts + expired_seconds - 60
async with async_db_session.begin() as wdb:
await file_dao.update(
wdb,
file_id,
UpdateFileParam(details={
**details,
"download_url": url,
"download_url_expire_ts": expire_ts,
})
)
return url
file_service = FileService()

View File

@@ -11,7 +11,7 @@ router = APIRouter()
@router.post('/exercises/tasks', summary='创建练习任务', dependencies=[DependsJwtAuth])
async def create_exercise_task(request: Request, obj: CreateQaExerciseRequest) -> ResponseSchemaModel[CreateQaExerciseTaskResponse]:
res = await qa_service.create_exercise_task(image_id=obj.image_id, user_id=request.user.id, title=obj.title, description=obj.description)
res = await qa_service.create_exercise_task(image_id=obj.image_id, user_id=request.user.id, type=obj.type)
return response_base.success(data=CreateQaExerciseTaskResponse(**res))
@@ -22,8 +22,8 @@ async def get_exercise_task_status(task_id: int) -> ResponseSchemaModel[TaskStat
@router.get('/{image_id}/exercises', summary='根据图片获取练习', dependencies=[DependsJwtAuth])
async def list_exercises(request: Request, image_id: int) -> ResponseSchemaModel[QaExerciseWithQuestionsSchema | None]:
item = await qa_service.list_exercises_by_image(image_id, user_id=request.user.id)
async def list_exercises(request: Request, image_id: int, type: str = Query(None)) -> ResponseSchemaModel[QaExerciseWithQuestionsSchema | None]:
item = await qa_service.list_exercises_by_image(image_id, user_id=request.user.id, type=type)
data = None if not item else QaExerciseWithQuestionsSchema(**item)
return response_base.success(data=data)
@@ -38,7 +38,6 @@ async def submit_attempt(request: Request, question_id: int, obj: CreateAttemptR
selected_options=obj.selected_options,
input_text=obj.input_text,
cloze_options=obj.cloze_options,
file_id=obj.file_id,
session_id=obj.session_id,
is_trial=obj.is_trial,
)

View File

@@ -22,13 +22,11 @@ class QaExerciseCRUD(CRUDPlus[QaExercise]):
result = await db.execute(stmt)
return list(result.scalars().all())
async def get_latest_by_image_id(self, db: AsyncSession, image_id: int) -> Optional[QaExercise]:
stmt = (
select(self.model)
.where(self.model.image_id == image_id)
.order_by(self.model.created_time.desc(), self.model.id.desc())
.limit(1)
)
async def get_latest_by_image_id(self, db: AsyncSession, image_id: int, type: Optional[str] = None) -> Optional[QaExercise]:
    """Return the newest exercise for an image, or None when there is none.

    :param db: session used to run the query
    :param image_id: id of the image the exercise belongs to
    :param type: when truthy, only exercises of this type are considered
    """
    criteria = [self.model.image_id == image_id]
    if type:
        criteria.append(self.model.type == type)
    # Newest first; the id breaks ties between rows created at the same time.
    newest_first = (self.model.created_time.desc(), self.model.id.desc())
    query = select(self.model).where(*criteria).order_by(*newest_first).limit(1)
    rows = await db.execute(query)
    return rows.scalars().first()

View File

@@ -13,7 +13,7 @@ class QaExercise(Base):
id: Mapped[snowflake_id_key] = mapped_column(BigInteger, init=False, primary_key=True)
image_id: Mapped[int] = mapped_column(BigInteger, ForeignKey('image.id'), nullable=False)
created_by: Mapped[int] = mapped_column(BigInteger, ForeignKey('wx_user.id'), nullable=False)
title: Mapped[Optional[str]] = mapped_column(String(100), default=None)
type: Mapped[Optional[str]] = mapped_column(String(20), default=None)
description: Mapped[Optional[str]] = mapped_column(Text, default=None)
status: Mapped[str] = mapped_column(String(20), default='draft')
question_count: Mapped[int] = mapped_column(Integer, default=0)

View File

@@ -10,6 +10,7 @@ from backend.app.admin.schema.wx import DictLevel
class ImageFormat(str, Enum):
JPEG = "jpeg"
JPG = "jpg"
PNG = "png"
GIF = "gif"
BMP = "bmp"

View File

@@ -7,8 +7,8 @@ from backend.common.schema import SchemaBase
class CreateQaExerciseRequest(SchemaBase):
image_id: int
title: Optional[str] = None
description: Optional[str] = None
type: Optional[str] = None
class CreateQaExerciseTaskResponse(SchemaBase):
@@ -19,7 +19,7 @@ class CreateQaExerciseTaskResponse(SchemaBase):
class QaExerciseSchema(SchemaBase):
id: str
image_id: str
title: Optional[str] = None
type: Optional[str] = None
description: Optional[str] = None
status: str
question_count: int
@@ -43,7 +43,6 @@ class CreateAttemptRequest(SchemaBase):
selected_options: Optional[List[str]] = None
input_text: Optional[str] = None
cloze_options: Optional[List[str]] = None
file_id: Optional[int] = None
session_id: Optional[int] = None
is_trial: bool = False
@@ -103,6 +102,12 @@ class AudioNode(SchemaBase):
stt_text: Optional[str] = None
evaluation: 'EvaluationSchema'
# Result node for a 'variation' answer; used as the `variation` field of
# QuestionLatestResultResponse.
class VariationNode(SchemaBase):
    # File id the user selected (serialized as a string); None when nothing was selected.
    file_id: Optional[str] = None
    # Evaluation of the selection; forward reference resolved by model_rebuild().
    evaluation: 'EvaluationSchema'
class QuestionLatestResultResponse(SchemaBase):
session_id: Optional[str] = None
type: Optional[str] = None
@@ -110,6 +115,7 @@ class QuestionLatestResultResponse(SchemaBase):
cloze: Optional[ClozeNode] = None
free_text: Optional[FreeTextNode] = None
audio: Optional[AudioNode] = None
variation: Optional[VariationNode] = None
class IncorrectSelectionItem(SchemaBase):
content: str
error_type: Optional[str] = None
@@ -132,3 +138,4 @@ CreateAttemptTaskResponse.model_rebuild()
AttemptResultResponse.model_rebuild()
QuestionEvaluationResponse.model_rebuild()
QuestionLatestResultResponse.model_rebuild()
VariationNode.model_rebuild()

View File

@@ -443,7 +443,7 @@ class ImageService:
raise
@staticmethod
async def _process_image_recognition(task_id: int, proc_type: str) -> None:
async def _process_image_recognition(task_id: int, proc_type: str = "word") -> None:
"""后台处理图片识别任务 - compatible version for task processor"""
# This is maintained for backward compatibility with the task processor
# It creates its own database connection like the original implementation

View File

@@ -58,28 +58,40 @@ class ImageTaskService:
# Calculate and deduct points
total_tokens = 0
extra_points = 0
extra_details = {}
if isinstance(token_usage, dict):
# Check if token_usage is nested (legacy structure) or direct
if "total_tokens" in token_usage:
total_tokens = int(token_usage.get("total_tokens") or 0)
else:
total_tokens = int((token_usage.get("token_usage") or {}).get("total_tokens") or 0)
# Handle extra points from processor
extra_points = int(token_usage.get("extra_points") or 0)
extra_details = token_usage.get("extra_details") or {}
deduct_amount = LLM_CHAT_COST
token_cost = LLM_CHAT_COST
if total_tokens > 0:
units = math.ceil(max(total_tokens, 1) / 1000)
deduct_amount = units * LLM_CHAT_COST
token_cost = units * LLM_CHAT_COST
total_deduct = token_cost + extra_points
# Use ref_id as the related_id for points record
points_deducted = await points_service.deduct_points_with_db(
user_id=task.user_id,
amount=deduct_amount,
amount=total_deduct,
db=db,
related_id=task.ref_id,
details={
"task_id": task_id,
"ref_type": task.ref_type,
"token_usage": total_tokens
"token_usage": total_tokens,
"token_cost": token_cost,
"extra_points": extra_points,
**extra_details
},
action=task.ref_type
)

View File

@@ -3,6 +3,14 @@
import asyncio
import json
import math
import aiohttp
import io
import hashlib
from fastapi import UploadFile
from backend.app.admin.service.file_service import file_service
from backend.app.admin.schema.file import AddFileParam, FileMetadata, UpdateFileParam
from backend.app.admin.crud.file_crud import file_dao
from backend.middleware.cos_client import CosClient
from typing import Optional, List, Dict, Any, Tuple
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
@@ -15,16 +23,19 @@ from backend.app.ai.schema.image_task import CreateImageTaskParam
from backend.app.admin.service.points_service import points_service
from backend.app.ai.service.rate_limit_service import rate_limit_service
from backend.common.exception import errors
from backend.middleware.qwen import Qwen
from backend.middleware.tencent_hunyuan import Hunyuan
from backend.core.llm import LLMFactory, AuditLogCallbackHandler
from langchain_core.messages import SystemMessage, HumanMessage
from backend.core.conf import settings
from backend.app.ai.service.recording_service import recording_service
from backend.common.const import EXERCISE_TYPE_CHOICE, EXERCISE_TYPE_CLOZE, EXERCISE_TYPE_FREE_TEXT, LLM_CHAT_COST
from backend.common.const import EXERCISE_TYPE_CHOICE, EXERCISE_TYPE_CLOZE, EXERCISE_TYPE_FREE_TEXT, LLM_CHAT_COST, POINTS_ACTION_SPEND, IMAGE_GENERATION_COST
from backend.app.admin.schema.wx import DictLevel
from backend.app.ai.service.image_task_service import TaskProcessor, image_task_service
from backend.app.ai.model.image_task import ImageProcessingTask
from backend.app.ai.model.qa import QaQuestion
from backend.core.prompts.qa_exercise import get_qa_exercise_prompt
from backend.app.ai.tools.qa_tool import SceneVariationGenerator, Illustrator
class QaExerciseProcessor(TaskProcessor):
async def process(self, db: AsyncSession, task: ImageProcessingTask) -> Tuple[Dict[str, Any], Dict[str, Any]]:
image = await image_dao.get(db, task.image_id)
@@ -41,31 +52,7 @@ class QaExerciseProcessor(TaskProcessor):
except Exception:
description = ''
payload = {'description': description}
prompt = (
'### 任务目标\n'
'请基于给定的图片英语描述生成【3-4个细节类半开放问题】返回包含**问题、多版本回答、正确/错误选项、填词模式**的结构化JSON数据用于英语口语练习程序自动化调用。\n'
'### 图片描述\n'
+ json.dumps(payload, ensure_ascii=False) + '\n'
'### 生成要求\n'
'1. 问题规则细节类半开放特殊疑问句覆盖至少2个维度主体特征/动作行为/场景环境), 每个问题的维度不能重复,题干和选项都是英文;\n'
'2. JSON数据规则\n'
' - 根节点:`qa_list`数组3-4个问答对象\n'
' - 每个问答对象字段:\n'
' 1. `question`:问题内容;\n'
' 2. `dimension`:考察维度;\n'
' 3. `key_pronunciation_words`核心发音单词2-3个\n'
' 4. `answers`多版本回答spoken/written/friendly\n'
' 5. `correct_options`:正确选项数组(含`content`/`type`字段),每个选项都是一个陈述句;\n'
' 6. `incorrect_options`:错误选项数组(含`content`/`error_type`/`error_reason`字段),无语法类干扰;\n'
' 7. `cloze`:填词模式专项字段:\n'
' - `correct_word`:填空处原词,一个正确选项;\n'
' - `sentence`:含 correct_word 的完整句子;\n'
' - `distractor_words`近义词干扰项数组3-4个无语法类干扰\n'
'3. 输出限制仅返回JSON字符串无其他解释文字确保可被`JSON.parse`直接解析。\n'
'输入图片描述:' + json.dumps(payload, ensure_ascii=False) + '\n'
'### 输出JSON格式\n'
'{ "qa_list": [ { "question": "", "dimension": "", "key_pronunciation_words": [], "answers": { "spoken": "", "written": "", "friendly": "", "lively": "" }, "correct_options": [ { "content": "", "type": "core" } ], "incorrect_options": [ { "content": "", "error_type": "词汇混淆", "error_reason": "" } ], "cloze": { "sentence": "", "correct_word": "", "distractor_words": [] } } ] }'
)
prompt = get_qa_exercise_prompt(payload)
res = await self._call_llm_chat(prompt=prompt, image_id=image.id, user_id=task.user_id, chat_type='qa_exercise')
if not res.get('success'):
raise Exception(res.get('error') or "LLM call failed")
@@ -127,32 +114,66 @@ class QaExerciseProcessor(TaskProcessor):
return result, token_usage
async def _call_llm_chat(self, prompt: str, image_id: int, user_id: int, chat_type: str) -> Dict[str, Any]:
model_type = (settings.LLM_MODEL_TYPE or "").lower()
messages = [{"role": "system", "content": "You are a helpful assistant."}, {'role': 'user', 'content': prompt}]
if model_type == 'qwen':
try:
qres = await Qwen.chat(messages=[{'role': 'user', 'content': prompt}], image_id=image_id, user_id=user_id, api_type=chat_type)
if qres and qres.get('success'):
return {"success": True, "result": qres.get("result"), "token_usage": qres.get("token_usage") or {}}
except Exception as e:
return {"success": False, "error": str(e)}
return {"success": False, "error": "LLM call failed"}
else:
try:
res = await Hunyuan.chat(messages=messages, image_id=image_id, user_id=user_id, system_prompt=None, chat_type=chat_type)
if res and res.get('success'):
return res
except Exception as e:
return {"success": False, "error": str(e)}
return {"success": False, "error": "LLM call failed"}
messages = [
SystemMessage(content="You are a helpful assistant."),
HumanMessage(content=prompt)
]
metadata = {
"image_id": image_id,
"user_id": user_id,
"api_type": chat_type,
"model_name": settings.LLM_MODEL_TYPE
}
try:
llm = LLMFactory.create_llm(settings.LLM_MODEL_TYPE)
res = await llm.ainvoke(
messages,
config={"callbacks": [AuditLogCallbackHandler(metadata=metadata)]}
)
content = res.content
if not isinstance(content, str):
content = str(content)
token_usage = {}
if res.response_metadata:
token_usage = res.response_metadata.get("token_usage") or res.response_metadata.get("usage") or {}
return {
"success": True,
"result": content,
"token_usage": token_usage
}
except Exception as e:
return {"success": False, "error": str(e)}
class SceneVariationProcessor(TaskProcessor):
    """Task processor that runs scene-variation generation and reports its image cost."""

    async def process(self, db: AsyncSession, task: ImageProcessingTask) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Generate variations for the exercise referenced by ``task.ref_id``.

        :param db: session handed to ``generate_scene_variations``
        :param task: task whose ``ref_id`` is the exercise id and ``user_id`` the owner
        :return: ``(result, token_usage)``; ``token_usage`` additionally carries
            ``extra_points`` (count * IMAGE_GENERATION_COST) and ``extra_details``
        """
        count, raw_usage = await qa_service.generate_scene_variations(task.ref_id, task.user_id, db=db)

        # Work on a copy (tolerating a missing usage dict) so the billing keys
        # are not written into the object owned by the generator.
        token_usage = dict(raw_usage or {})

        # Calculate extra points for generated images
        token_usage['extra_points'] = count * IMAGE_GENERATION_COST
        token_usage['extra_details'] = {
            'image_count': count,
            'image_unit_price': IMAGE_GENERATION_COST,
            'source': 'scene_variation_generation'
        }
        return {'count': count, 'token_usage': token_usage}, token_usage
class QaService:
async def create_exercise_task(self, image_id: int, user_id: int, title: Optional[str] = None, description: Optional[str] = None) -> Dict[str, Any]:
async def create_exercise_task(self, image_id: int, user_id: int, type: Optional[str] = "scene_basic") -> Dict[str, Any]:
async with async_db_session.begin() as db:
# Check for existing active task
latest_task = await image_task_dao.get_latest_active_task(db, user_id, image_id, 'qa_exercise')
if latest_task:
# existing_exercise = await qa_exercise_dao.get(db, latest_task.ref_id)
# if existing_exercise and existing_exercise.type != type:
# raise errors.ForbiddenError(msg='当前正在进行其他类型的任务,请等待完成后再试')
return {'task_id': str(latest_task.id), 'status': latest_task.status}
if not await points_service.check_sufficient_points(user_id, LLM_CHAT_COST):
@@ -169,9 +190,10 @@ class QaService:
exercise = await qa_exercise_dao.create(db, {
'image_id': image_id,
'created_by': user_id,
'title': title,
'description': description,
'type': type,
'description': None,
'status': 'draft',
'ext': None
})
await db.flush()
task = await image_task_dao.create_task(db, CreateImageTaskParam(
@@ -185,7 +207,12 @@ class QaService:
await db.flush()
task_id = task.id
await db.commit()
processor = QaExerciseProcessor()
if type == 'scene_variation':
processor = SceneVariationProcessor()
else:
processor = QaExerciseProcessor()
asyncio.create_task(image_task_service.process_task(task_id, user_id, processor))
return {'task_id': str(task_id), 'status': 'accepted'}
@@ -203,12 +230,12 @@ class QaService:
'error_message': task.error_message,
}
async def list_exercises_by_image(self, image_id: int, user_id: Optional[int] = None) -> Optional[Dict[str, Any]]:
async def list_exercises_by_image(self, image_id: int, user_id: Optional[int] = None, type: Optional[str] = "scene_basic") -> Optional[Dict[str, Any]]:
async with async_db_session() as db:
image = await image_dao.get(db, image_id)
if not image:
return None
i = await qa_exercise_dao.get_latest_by_image_id(db, image_id)
i = await qa_exercise_dao.get_latest_by_image_id(db, image_id, type=type)
if not i:
return None
qs = await qa_question_dao.get_by_exercise_id(db, i.id)
@@ -225,7 +252,7 @@ class QaService:
'exercise': {
'id': str(i.id),
'image_id': str(i.image_id),
'title': i.title,
'type': i.type,
'description': i.description,
'status': i.status,
'question_count': i.question_count,
@@ -346,7 +373,7 @@ class QaService:
evaluation = {'type': 'cloze', 'result': result_text, 'detail': is_correct, 'selected': {'correct': [], 'incorrect': user_incorrect}, 'missing_correct': [cw for cw in correct_candidates]}
return evaluation, is_correct, input_str
async def submit_attempt(self, question_id: int, exercise_id: int, user_id: int, mode: str, selected_options: Optional[List[str]] = None, input_text: Optional[str] = None, cloze_options: Optional[List[str]] = None, file_id: Optional[int] = None, session_id: Optional[int] = None, is_trial: bool = False) -> Dict[str, Any]:
async def submit_attempt(self, question_id: int, exercise_id: int, user_id: int, mode: str, selected_options: Optional[List[str]] = None, input_text: Optional[str] = None, cloze_options: Optional[List[str]] = None, session_id: Optional[int] = None, is_trial: bool = False) -> Dict[str, Any]:
async with async_db_session.begin() as db:
q = await qa_question_dao.get(db, question_id)
if not q or q.exercise_id != exercise_id:
@@ -468,10 +495,9 @@ class QaService:
'evaluation': None
}
}
# Synchronous evaluation for choice/cloze
# Synchronous evaluation for choice/cloze/variation
if mode == EXERCISE_TYPE_CHOICE:
evaluation, is_correct, selected_list = self._evaluate_choice(q, attempt.choice_options)
# update ext with choice details
attempt.ext = {**(attempt.ext or {}), 'type': 'choice', 'choice': {'options': selected_list, 'evaluation': evaluation}}
await db.flush()
merged_eval = dict(attempt.evaluation or {})
@@ -497,7 +523,6 @@ class QaService:
s.progress = prog
await db.flush()
await db.commit()
# return latest result structure
session_id_val = (attempt.ext or {}).get('session_id')
return {
'session_id': str(session_id_val) if session_id_val is not None else None,
@@ -520,8 +545,6 @@ class QaService:
c_opts = cloze_options
evaluation, is_correct, input_str = self._evaluate_cloze(q, c_opts)
# update ext with cloze details
attempt.ext = {**(attempt.ext or {}), 'type': 'cloze', 'cloze': {'input': input_str, 'evaluation': evaluation}}
await db.flush()
merged_eval = dict(attempt.evaluation or {})
@@ -547,7 +570,6 @@ class QaService:
s.progress = prog
await db.flush()
await db.commit()
# return latest result structure
session_id_val = (attempt.ext or {}).get('session_id')
return {
'session_id': str(session_id_val) if session_id_val is not None else None,
@@ -558,6 +580,58 @@ class QaService:
}
}
if mode == 'variation':
ext_q = q.ext or {}
correct_file_id = ext_q.get('file_id')
# Get user selected file_id from selected_options
user_file_id = None
if selected_options and len(selected_options) > 0:
try:
user_file_id = selected_options[0]
except (ValueError, TypeError):
user_file_id = None
is_correct = 'incorrect'
if user_file_id is not None and correct_file_id is not None and int(user_file_id) == int(correct_file_id):
is_correct = 'correct'
evaluation = {'type': 'variation', 'detail':is_correct, 'result': is_correct, 'correct_file_id': correct_file_id, 'user_file_id': user_file_id}
attempt.ext = {**(attempt.ext or {}), 'type': 'variation', 'variation': {'file_id': user_file_id, 'evaluation': evaluation}}
await db.flush()
merged_eval = dict(attempt.evaluation or {})
merged_eval['variation'] = {'file_id': user_file_id, 'evaluation': evaluation}
await qa_attempt_dao.update_status(db, attempt.id, 'completed', merged_eval)
if not is_trial:
s = await qa_session_dao.get_latest_by_user_exercise(db, user_id, exercise_id)
if s and s.exercise_id == attempt.exercise_id:
prog = dict(s.progress or {})
attempts = list(prog.get('attempts') or [])
prev = None
for a in attempts:
if a.get('attempt_id') == attempt.id:
prev = a.get('is_correct')
a['is_correct'] = is_correct
break
prev_correct = 1 if prev == 'correct' else 0
new_correct = 1 if is_correct == 'correct' else 0
correct_inc = new_correct - prev_correct
prog['attempts'] = attempts
prog['correct'] = int(prog.get('correct') or 0) + correct_inc
s.progress = prog
await db.flush()
await db.commit()
session_id_val = (attempt.ext or {}).get('session_id')
return {
'session_id': str(session_id_val) if session_id_val is not None else None,
'type': 'variation',
'variation': {
'file_id': user_file_id,
'evaluation': evaluation
}
}
async def _process_attempt_evaluation(self, task_id: int, user_id: int):
async with background_db_session() as db:
task = await image_task_dao.get(db, task_id)
@@ -675,7 +749,204 @@ class QaService:
'text': ft.get('text') or '',
'evaluation': ft.get('evaluation') or None,
}
if 'variation' in evalution:
va = evalution.get('variation') or {}
ret['variation'] = {
'file_id': va.get('file_id'),
'evaluation': va.get('evaluation') or None,
}
return ret
async def persist_image_from_url(self, image_url: str, user_id: int, filename: str = "generated_variation.png") -> int:
    """Download an image from a URL, upload it to COS and register it as a file record.

    :param image_url: URL the image bytes are fetched from
    :param user_id: stored in the file record's ``details`` as the owner
    :param filename: file name recorded for the file and used in the COS key
    :return: id of the created file record
    :raises Exception: when the download does not answer 200 or returns no data
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(image_url) as response:
            if response.status != 200:
                raise Exception(f"Failed to download image: {response.status}")
            content = await response.read()

    # Never register and upload a zero-byte "image".
    if not content:
        raise Exception("Failed to download image: empty response body")

    file_hash = hashlib.sha256(content).hexdigest()
    content_type = "image/png"  # Default to png as per filename default

    # 1. Create DB record first (Pending state)
    async with async_db_session.begin() as db:
        meta_init = FileMetadata(
            file_name=filename,
            content_type=content_type,
            file_size=0,
            extra=None,
        )
        t_params = AddFileParam(
            file_hash=file_hash,
            file_name=filename,
            content_type=content_type,
            file_size=0,
            storage_type="cos",
            storage_path=None,
            metadata_info=meta_init,
        )
        t_file = await file_dao.create(db, t_params)
        await db.flush()
        # Capture ID for use outside transaction
        file_id = t_file.id

    # 2. Upload to COS
    # The image is downloaded first because the upload call takes the object
    # body as bytes. The call is made without `await` in the original code, so
    # it is treated as a blocking call and run in a worker thread to keep the
    # event loop responsive.
    # NOTE(review): if the upload raises, the record created in step 1 stays
    # behind with file_size=0 and no storage_path -- confirm this is acceptable.
    cos_client = CosClient()
    key = f"{file_id}_{filename}"
    await asyncio.to_thread(cos_client.upload_object, key, content)

    # 3. Update DB record (Completed state)
    async with async_db_session.begin() as db:
        meta = FileMetadata(
            file_name=filename,
            content_type=content_type,
            file_size=len(content),
            extra=None,
        )
        update_params = UpdateFileParam(
            file_hash=file_hash,
            storage_path=key,
            metadata_info=meta,
            details={
                "key": key,
                "source": "ai_generation",
                "user_id": user_id
            }
        )
        await file_dao.update(db, file_id, update_params)
    return int(file_id)
async def generate_scene_variations(self, exercise_id: int, user_id: int, db: Optional[AsyncSession] = None) -> Tuple[int, Dict[str, Any]]:
    """
    Execute the advanced workflow:
    1. Generate variations text
    2. Generate images
    3. Persist images
    4. Update exercise

    :param exercise_id: exercise that receives the generated questions
    :param user_id: user the questions, files and session are created for
    :param db: session to reuse; when omitted, a new transaction is opened
        for each database step
    :return: ``(number of variations returned by the illustrator, token usage of
        the text generation step)``
    :raises errors.NotFoundError: when the exercise or its image does not exist
    :raises Exception: when text generation fails or yields no variations
    """
    from contextlib import asynccontextmanager
    from sqlalchemy.orm.attributes import flag_modified

    @asynccontextmanager
    async def get_db():
        # Reuse the caller's session when one was supplied; otherwise open a
        # dedicated transaction for this step.
        if db is not None:
            yield db
        else:
            async with async_db_session.begin() as new_db:
                yield new_db

    async with get_db() as session:
        exercise = await qa_exercise_dao.get(session, exercise_id)
        if not exercise:
            raise errors.NotFoundError(msg='Exercise not found')
        image = await image_dao.get(session, exercise.image_id)
        if not image:
            raise errors.NotFoundError(msg='Image not found')

        # Read everything needed from the ORM object while its session is
        # still open, so nothing is loaded from it after the transaction ends.
        source_image_id = image.id
        source_file_id = image.file_id

        # Prepare payload from image details
        rr = (image.details or {}).get('recognition_result') or {}
        payload = {
            'description': rr.get('description'),
            'core_vocab': rr.get('core_vocab'),
            'collocations': rr.get('collocations'),
            'scene_tag': rr.get('scene_tag')
        }

    # Step 1: run the AI text generation outside the lookup transaction so a
    # locally opened transaction is not held during the call.
    gen_res = await SceneVariationGenerator.generate(payload, source_image_id, user_id)
    if not gen_res.get('success'):
        raise Exception(f"Variation generation failed: {gen_res.get('error')}")

    # `or {}` / `or []` also cover keys that are present but set to None.
    variations = (gen_res.get('result') or {}).get('new_descriptions') or []
    token_usage = gen_res.get('token_usage') or {}

    if not variations:
        raise Exception("No variations generated")

    # Step 2: Generate images (Parallel)
    variations_with_images = await Illustrator.process_variations(source_file_id, user_id, variations)

    # Step 3: Persist images and update data
    for i, v in enumerate(variations_with_images):
        if v.get('success') and v.get('generated_image_url'):
            try:
                # Construct filename: exercise_{exercise_id}_variation_{image_id}.png
                img_id = v.get('image_id', i + 1)
                filename = f"exercise_{exercise_id}_variation_{img_id}.png"
                file_id = await self.persist_image_from_url(v['generated_image_url'], user_id, filename=filename)
                v['file_id'] = file_id
            except Exception as e:
                # Best effort: one failed image must not abort the rest; the
                # error is kept on the variation and stored with the exercise.
                v['persist_error'] = str(e)

    # Step 4: Update exercise
    async with get_db() as session:
        exercise = await qa_exercise_dao.get(session, exercise_id)
        if not exercise:
            # Should not happen given previous check, but good for safety
            raise errors.NotFoundError(msg='Exercise not found')

        # Create questions from variations that have a persisted image
        created = 0
        for v in variations_with_images:
            if v.get('success') and v.get('file_id'):
                await qa_question_dao.create(session, {
                    'exercise_id': exercise.id,
                    'image_id': exercise.image_id,
                    'question': v.get('desc_en') or '',
                    'user_id': user_id,
                    'ext': {
                        'file_id': str(v.get('file_id')),
                        'desc_zh': v.get('desc_zh'),
                        'modification_type': v.get('modification_type'),
                        'modification_point': v.get('modification_point'),
                        'core_vocab': v.get('core_vocab'),
                        'collocation': v.get('collocation'),
                        'learning_note': v.get('learning_note'),
                    },
                })
                created += 1

        ext = dict(exercise.ext or {})
        ext['new_descriptions'] = variations_with_images
        exercise.ext = ext
        # Explicitly mark the JSON column as changed so the update is written.
        flag_modified(exercise, "ext")
        exercise.question_count = created
        exercise.status = 'published' if created > 0 else 'draft'
        await session.flush()

        if created > 0:
            existing_session = await qa_session_dao.get_latest_by_user_exercise(session, user_id, exercise.id)
            if not existing_session:
                prog = {'current_index': 0, 'answered': 0, 'correct': 0, 'attempts': [], 'total_questions': created}
                await qa_session_dao.create(session, {
                    'exercise_id': exercise.id,
                    'starter_user_id': user_id,
                    'share_id': None,
                    'status': 'ongoing',
                    'started_at': datetime.now(),
                    'completed_at': None,
                    'progress': prog,
                    'score': None,
                    'ext': None,
                })
        await session.flush()

    # NOTE(review): this counts every variation returned by the illustrator,
    # including ones whose image failed to generate or persist, and the caller
    # bills per counted image -- confirm whether `created` was intended.
    return len(variations_with_images), token_usage
qa_service = QaService()

View File

@@ -13,7 +13,9 @@ from backend.app.ai.service.image_chat_service import image_chat_service
from backend.app.ai.crud.image_curd import image_dao
from backend.database.db import async_db_session, background_db_session
from backend.core.conf import settings
from backend.middleware.qwen import Qwen
from backend.core.llm import LLMFactory, AuditLogCallbackHandler
from langchain_core.messages import SystemMessage, HumanMessage
from backend.core.prompts.sentence_analysis import get_sentence_analysis_prompt
from backend.middleware.tencent_hunyuan import Hunyuan
from backend.app.admin.schema.wx import DictLevel
from backend.app.ai.service.scene_sentence_service import scene_sentence_service
@@ -72,118 +74,7 @@ class SceneSentenceProcessor(TaskProcessor):
class SentenceService:
@staticmethod
def _compose_prompt(payload: dict, mode: str) -> str:
    """Build the sentence-analysis prompt for ``payload`` in the given ``mode``.

    The prompt text is owned by
    ``backend.core.prompts.sentence_analysis.get_sentence_analysis_prompt``;
    this method only delegates to it so existing callers keep working.
    The inline copy of the prompt that used to live here was identical to
    that function and made the delegation unreachable, so it was removed.

    Args:
        payload: Scene data that the prompt builder serialises into the prompt.
        mode: Sentence type selector, passed through unchanged.

    Returns:
        The prompt string produced by ``get_sentence_analysis_prompt``.
    """
    return get_sentence_analysis_prompt(payload, mode)
@staticmethod
async def generate_scene_sentence(image_id: int, user_id: int, payload: dict) -> dict:
@@ -305,34 +196,38 @@ class SentenceService:
@staticmethod
async def _call_scene_llm(prompt: str, image_id: int, user_id: int, chat_type: str) -> Dict[str, Any]:
    """Send ``prompt`` to the configured chat model and normalise the reply.

    The model is created through ``LLMFactory`` from ``settings.LLM_MODEL_TYPE``
    and invoked with an ``AuditLogCallbackHandler`` carrying the request
    metadata. The legacy direct Qwen/Hunyuan branch returned on every path
    (and swallowed its exceptions), which left this code unreachable; it has
    been removed.

    Args:
        prompt: Full user prompt text.
        image_id: Image identifier, forwarded as audit metadata.
        user_id: User identifier, forwarded as audit metadata.
        chat_type: Forwarded as the ``api_type`` audit metadata value.

    Returns:
        On success: ``{"success": True, "result": <str>, "image_chat_id": None,
        "token_usage": <dict>}``. On any exception:
        ``{"success": False, "error": <message>}``.
    """
    model_type = settings.LLM_MODEL_TYPE
    messages = [
        SystemMessage(content="You are a helpful assistant."),
        HumanMessage(content=prompt),
    ]
    metadata = {
        "image_id": image_id,
        "user_id": user_id,
        "api_type": chat_type,
        "model_name": model_type,
    }
    try:
        llm = LLMFactory.create_llm(model_type)
        res = await llm.ainvoke(
            messages,
            config={"callbacks": [AuditLogCallbackHandler(metadata=metadata)]},
        )
        content = res.content
        if not isinstance(content, str):
            # Non-string content is stringified so callers always get text.
            content = str(content)
        token_usage = {}
        if res.response_metadata:
            # The usage key differs between providers; accept either name.
            token_usage = (
                res.response_metadata.get("token_usage")
                or res.response_metadata.get("usage")
                or {}
            )
        return {
            "success": True,
            "result": content,
            "image_chat_id": None,
            "token_usage": token_usage,
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
@staticmethod
async def generate_sentence_exercise_card(image_id: int, user_id: int, scene_tag: str, desc_en: List[str], desc_zh: List[str], core_vocab: List[str], collocations: List[str]) -> Dict[str, Any]:

View File

@@ -0,0 +1,199 @@
import asyncio
from typing import Dict, Any, List
import json
import os
from dashscope import MultiModalConversation
from backend.app.admin.service.file_service import file_service
from langchain_core.messages import SystemMessage, HumanMessage
from backend.core.llm import LLMFactory, AuditLogCallbackHandler
from backend.core.conf import settings
from backend.core.prompts.scene_variation import get_scene_variation_prompt
class SceneVariationGenerator:
    """
    Component for generating scene variations text (Step 1 of the advanced workflow).
    Using LangChain for LLM interaction.
    """

    @staticmethod
    def _strip_code_fence(content: str) -> str:
        """Return the text inside the first Markdown code fence, or ``content`` unchanged.

        A ```` ```json ```` fence is preferred over a bare ```` ``` ```` fence.
        The extracted text is stripped of surrounding whitespace; unfenced
        input is returned exactly as given.
        """
        for fence in ("```json", "```"):
            _, found, rest = content.partition(fence)
            if found:
                return rest.partition("```")[0].strip()
        return content

    @staticmethod
    async def generate(
        payload: Dict[str, Any],
        image_id: int,
        user_id: int,
        model_name: str = None
    ) -> Dict[str, Any]:
        """
        Generate scene variations based on image payload.

        Args:
            payload: Dict containing description, core_vocab, collocations, scene_tag
            image_id: ID of the source image
            user_id: ID of the requesting user
            model_name: Optional model override; falls back to settings.LLM_MODEL_TYPE

        Returns:
            On success: ``{"success": True, "result": <parsed JSON>, "token_usage": <dict>}``.
            If the reply is not valid JSON: ``{"success": False, "error": ...,
            "raw_content": <text>, "token_usage": <dict>}`` - the usage is still
            reported because the tokens were consumed.
            On any other exception: ``{"success": False, "error": <message>}``.
        """
        resolved_model = model_name or settings.LLM_MODEL_TYPE
        prompt = get_scene_variation_prompt(payload)
        messages = [
            SystemMessage(content="You are a helpful assistant specialized in creating educational content variations."),
            HumanMessage(content=prompt)
        ]
        metadata = {
            "image_id": image_id,
            "user_id": user_id,
            "api_type": "scene_variation",
            "model_name": resolved_model
        }
        try:
            llm = LLMFactory.create_llm(resolved_model)
            res = await llm.ainvoke(
                messages,
                config={"callbacks": [AuditLogCallbackHandler(metadata=metadata)]}
            )
            content = res.content
            if not isinstance(content, str):
                content = str(content)
            # Models often wrap JSON in a Markdown code block; unwrap it first.
            content = SceneVariationGenerator._strip_code_fence(content)

            token_usage = {}
            if res.response_metadata:
                # The usage key differs between providers; accept either name.
                token_usage = res.response_metadata.get("token_usage") or res.response_metadata.get("usage") or {}

            try:
                parsed_result = json.loads(content)
            except json.JSONDecodeError:
                return {
                    "success": False,
                    "error": "Failed to parse LLM response as JSON",
                    "raw_content": content,
                    "token_usage": token_usage
                }
            return {
                "success": True,
                "result": parsed_result,
                "token_usage": token_usage
            }
        except Exception as e:
            return {"success": False, "error": str(e)}
class Illustrator:
    """
    Component for generating edited images based on text descriptions (Step 2 of the advanced workflow).
    Uses Dashscope MultiModalConversation API.
    """

    @staticmethod
    async def generate_image(
        original_image_url: str,
        edit_prompt: str,
        api_key: str = None
    ) -> Dict[str, Any]:
        """
        Call Dashscope API to edit an image based on the prompt.

        The SDK call is blocking, so it is run in a worker thread.

        Args:
            original_image_url: URL of the image to edit.
            edit_prompt: Text instruction sent alongside the image.
            api_key: Optional key; falls back to the DASHSCOPE_API_KEY
                environment variable, then settings.QWEN_API_KEY.

        Returns:
            ``{"success": True, "image_url": ...}`` on HTTP 200, otherwise
            ``{"success": False, "error": ..., "status_code": ...}``; any
            exception is reported as ``{"success": False, "error": <message>}``.
        """
        import dashscope
        dashscope.base_http_api_url = 'https://dashscope.aliyuncs.com/api/v1'
        messages = [
            {
                "role": "user",
                "content": [
                    {"image": original_image_url},
                    {"text": edit_prompt}
                ]
            }
        ]
        try:
            # Wrap the blocking SDK call in asyncio.to_thread
            response = await asyncio.to_thread(
                MultiModalConversation.call,
                api_key=api_key or os.getenv("DASHSCOPE_API_KEY") or settings.QWEN_API_KEY,
                model="qwen-image-edit-plus",  # Assuming this is the model name for image editing
                messages=messages,
                stream=False,
                n=1,
                watermark=False,
                negative_prompt="低质量, 模糊, 扭曲",
                prompt_extend=True,
            )
            if response.status_code == 200:
                image_url = response.output.choices[0].message.content[0]['image']
                return {"success": True, "image_url": image_url}
            return {
                "success": False,
                "error": f"API Error {response.code}: {response.message}",
                "status_code": response.status_code
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    @staticmethod
    async def process_variations(
        original_file_id: int,
        user_id: int,
        variations: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Process multiple variations in parallel.

        The dicts in ``variations`` are updated in place and the same list is
        returned.

        Args:
            original_file_id: The file ID of the original image
            user_id: The user ID for permission check
            variations: List of variation dicts (from SceneVariationGenerator)

        Returns:
            The variations, each with ``success`` set and either
            ``generated_image_url`` or ``error`` added.
        """
        # Nothing to illustrate: skip the presigned-URL lookup entirely.
        if not variations:
            return variations

        # 1. Get original image URL
        try:
            original_url = await file_service.get_presigned_download_url(original_file_id, user_id, True)
            if not original_url:
                raise Exception("Failed to get download URL for original image")
        except Exception as e:
            # If we can't get the original image, fail all
            for v in variations:
                v['error'] = f"Original image access failed: {str(e)}"
                v['success'] = False
            return variations

        # 2. Create tasks for parallel execution.
        # The modification point and the description are combined to give
        # the model better context.
        tasks = [
            Illustrator.generate_image(
                original_image_url=original_url,
                edit_prompt=(
                    f"{variation.get('modification_point', '')}. "
                    f"Describe the image with the following detail: {variation.get('desc_en', '')}"
                )
            )
            for variation in variations
        ]

        # 3. Execute in parallel (generate_image reports failures as dicts, it does not raise)
        results = await asyncio.gather(*tasks)

        # 4. Merge results back into variations
        for variation, res in zip(variations, results):
            if res.get('success'):
                variation['generated_image_url'] = res.get('image_url')
                variation['success'] = True
            else:
                variation['error'] = res.get('error')
                variation['success'] = False
        return variations

View File

@@ -4,6 +4,7 @@
IMAGE_RECOGNITION_COST = 1 # 1000 / 1
SPEECH_ASSESSMENT_COST = 1
LLM_CHAT_COST = 1
IMAGE_GENERATION_COST = 20
QWEN_TOKEN_COST = 0.002
# Points action types

119
backend/core/llm.py Normal file
View File

@@ -0,0 +1,119 @@
import time
from typing import Any, Dict, List, Optional
from datetime import datetime
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from langchain_core.messages import BaseMessage
from langchain_community.chat_models import ChatTongyi, ChatHunyuan
from backend.app.admin.schema.audit_log import CreateAuditLogParam
from backend.app.admin.service.audit_log_service import audit_log_service
from backend.core.conf import settings
from backend.common.log import log as logger
class AuditLogCallbackHandler(BaseCallbackHandler):
    """LangChain callback handler that writes one audit log entry per chat-model call.

    ``metadata`` may carry ``api_type``, ``model_name``, ``image_id`` and
    ``user_id``; the request messages are captured when the call starts.
    Failures while writing the audit log are logged and never propagated.
    """

    def __init__(self, metadata: Optional[Dict[str, Any]] = None):
        super().__init__()
        # Copy: this handler adds keys (e.g. 'messages') and must not mutate
        # the dict owned by the caller.
        self.metadata = dict(metadata or {})
        self.start_time = 0.0

    async def on_chat_model_start(
        self, serialized: Dict[str, Any], messages: List[List[BaseMessage]], **kwargs: Any
    ) -> Any:
        """Record the start time and capture the first batch of request messages."""
        self.start_time = time.time()
        extra_metadata = kwargs.get('metadata')
        if extra_metadata:  # may be absent or None
            self.metadata.update(extra_metadata)
        # Capture messages for audit log
        try:
            first_batch = messages[0] if messages else []
            self.metadata['messages'] = [
                {"role": m.type, "content": m.content} for m in first_batch
            ]
        except Exception as e:
            # Best effort only: the LLM call must not fail because of auditing.
            logger.warning(f"Failed to capture messages for audit log: {e}")

    async def _write_audit_log(
        self,
        *,
        model_name: str,
        request_data: Dict[str, Any],
        response_data: Dict[str, Any],
        token_usage: Dict[str, Any],
        duration: float,
        status_code: int,
        error_message: str,
    ) -> None:
        """Build a CreateAuditLogParam from the handler metadata and persist it."""
        log_param = CreateAuditLogParam(
            api_type=self.metadata.get("api_type", "chat"),
            model_name=model_name,
            request_data=request_data,
            response_data=response_data,
            token_usage=token_usage,
            cost=0.0,
            duration=duration,
            status_code=status_code,
            called_at=datetime.now(),
            image_id=self.metadata.get("image_id", 0),
            user_id=self.metadata.get("user_id", 0),
            api_version=settings.FASTAPI_API_V1_PATH,
            error_message=error_message
        )
        await audit_log_service.create(obj=log_param)

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
        """Write a status-200 audit entry for a completed call."""
        duration = time.time() - (self.start_time or time.time())
        try:
            # Extract info from the first generation
            generation = response.generations[0][0]
            message = generation.message
            content = message.content
            # llm_output is optional on LLMResult; treat None as empty so the
            # audit entry is still written.
            llm_output = response.llm_output or {}
            response_metadata = message.response_metadata or {}
            # Token usage: prefer llm_output, then the message metadata
            # (the key name differs between providers).
            token_usage = llm_output.get("token_usage") or {}
            if not token_usage:
                token_usage = response_metadata.get("token_usage") or response_metadata.get("usage") or {}
            model_name = llm_output.get("model_name") or self.metadata.get("model_name") or "unknown"
            await self._write_audit_log(
                model_name=model_name,
                request_data={"messages": self.metadata.get("messages")},
                response_data={"content": content, "metadata": message.response_metadata},
                token_usage=token_usage,
                duration=duration,
                status_code=200,
                error_message=""
            )
        except Exception as e:
            logger.error(f"Failed to write audit log: {e}")

    async def on_llm_error(self, error: BaseException, **kwargs: Any) -> Any:
        """Write a status-500 audit entry for a failed call."""
        duration = time.time() - (self.start_time or time.time())
        try:
            await self._write_audit_log(
                model_name=self.metadata.get("model_name", "unknown"),
                request_data={"metadata": self.metadata},
                response_data={"error": str(error)},
                token_usage={},
                duration=duration,
                status_code=500,
                error_message=str(error)
            )
        except Exception as e:
            logger.error(f"Failed to write audit log on error: {e}")
class LLMFactory:
    """Factory for the LangChain chat models used by the backend."""

    @staticmethod
    def create_llm(model_type: str = None, **kwargs):
        """Create a chat model for ``model_type``.

        ``model_type`` falls back to ``settings.LLM_MODEL_TYPE`` and then to
        ``"qwen"``; matching is case-insensitive. ``"hunyuan"`` yields a
        ``ChatHunyuan``; ``"qwen"`` and any unrecognised value yield a
        ``ChatTongyi`` (a warning is logged for unrecognised values).
        Extra keyword arguments are forwarded to the model constructor.
        """
        resolved = (model_type or settings.LLM_MODEL_TYPE or "qwen").lower()

        if resolved == 'hunyuan':
            return ChatHunyuan(
                hunyuan_secret_id=settings.HUNYUAN_SECRET_ID,
                hunyuan_secret_key=settings.HUNYUAN_SECRET_KEY,
                **kwargs
            )

        if resolved != 'qwen':
            # Default to Qwen if unknown
            logger.warning(f"Unknown model type {resolved}, defaulting to Qwen")

        return ChatTongyi(
            api_key=settings.QWEN_API_KEY,
            model_name=settings.QWEN_TEXT_MODEL,
            **kwargs
        )

View File

@@ -0,0 +1,28 @@
import json
def get_qa_exercise_prompt(payload: dict) -> str:
return (
'### 任务目标\n'
'请基于给定的图片英语描述生成【3-4个细节类半开放问题】返回包含**问题、多版本回答、正确/错误选项、填词模式**的结构化JSON数据用于英语口语练习程序自动化调用。\n'
'### 图片描述\n'
+ json.dumps(payload, ensure_ascii=False) + '\n'
'### 生成要求\n'
'1. 问题规则细节类半开放特殊疑问句覆盖至少2个维度主体特征/动作行为/场景环境), 每个问题的维度不能重复,题干和选项都是英文;\n'
'2. JSON数据规则\n'
' - 根节点:`qa_list`数组3-4个问答对象\n'
' - 每个问答对象字段:\n'
' 1. `question`:问题内容;\n'
' 2. `dimension`:考察维度;\n'
' 3. `key_pronunciation_words`核心发音单词2-3个\n'
' 4. `answers`多版本回答spoken/written/friendly\n'
' 5. `correct_options`:正确选项数组(含`content`/`type`字段),每个选项都是一个陈述句;\n'
' 6. `incorrect_options`:错误选项数组(含`content`/`error_type`/`error_reason`字段),无语法类干扰;\n'
' 7. `cloze`:填词模式专项字段:\n'
' - `correct_word`:填空处原词,一个正确选项;\n'
' - `sentence`:含 correct_word 的完整句子;\n'
' - `distractor_words`近义词干扰项数组3-4个无语法类干扰\n'
'3. 输出限制仅返回JSON字符串无其他解释文字确保可被`JSON.parse`直接解析。\n'
'输入图片描述:' + json.dumps(payload, ensure_ascii=False) + '\n'
'### 输出JSON格式\n'
'{ "qa_list": [ { "question": "", "dimension": "", "key_pronunciation_words": [], "answers": { "spoken": "", "written": "", "friendly": "", "lively": "" }, "correct_options": [ { "content": "", "type": "core" } ], "incorrect_options": [ { "content": "", "error_type": "词汇混淆", "error_reason": "" } ], "cloze": { "sentence": "", "correct_word": "", "distractor_words": [] } } ] }'
)

View File

@@ -0,0 +1,130 @@
from typing import List
def get_recognition_prompt(type: str, exclude_words: List[str] | None = None) -> str:
"""获取图像识别提示词"""
if type == 'word':
prompt = (
"""
Vision-to-English-Chinese education module.
Core objective: Analyze the image based on its PRIMARY SCENE (e.g., office, restaurant, subway, kitchen) and CENTRAL OBJECTS, generate English-Chinese sentence pairs for three learning levels (matching primary/intermediate/advanced English learners), with sentences focused on PRACTICAL, REUSABLE communication (not just grammatical complexity).
// LEVEL Definition (Binding learning goals + functions + complexity)
level1 (Beginner):
- Learning goal: Recognize core vocabulary + use basic functional sentences (describe objects/scenes, simple requests)
- Vocab: High-frequency daily words (no uncommon words)
- Grammar: Present continuous, modal verbs (can/could/would), simple clauses
- Word count per sentence: ≤15 words
- Sentence type: 6 unique functional types (detailed description, polite request, ask for information, suggest action, state need, confirm fact, express feeling)
- The sentence structure of the described object: quantity + name + feature + purpose.
level2 (Intermediate):
- Learning goal: Master scene-specific collocations + practical communication sentences (daily/office interaction)
- Vocab: Scene-specific common words + fixed collocations (e.g., "print a document", "place an order")
- Grammar: Complex clauses, passive voice, subjunctive mood (as appropriate to the scene)
- Word count per sentence: ≤25 words
- Sentence type: 8-12 unique functional types (detailed scene analysis, formal/informal contrast, conditional statement, explain purpose, ask follow-up questions, express suggestion, summarize information, clarify meaning)
// Output Requirements
1. JSON Structure (add core vocab/collocation for easy parsing):
{
"scene_tag": ["xxx", "xxx"], // e.g., "office", "café", "supermarket" (Multiple tags that are consistent with the main scene of the picture)
"description": "", // Clear and accurate description of the content of the picture, including but not limited to objects, relationships, colors, etc.
"level1": {
"desc_en": ["sentence1", "sentence2", ...], // 6 to 8 distinct sentences with different modalities (without repeating the same meaning or function. Don't use Chinese. Consistent with native English speakers' daily communication habits)
"desc_zh": ["translation1", "translation2", ...], // one-to-one with desc_en, chinese translation must be natural and not stiff, consistent with native English speakers' daily communication habits.
},
"level2": {
"desc_en": [
"Requirement: 8-12 daily spoken English sentences matching the image scenario (prioritize short sentences, ≤20 words)",
"Type: Declarative sentences / polite interrogative sentences that can be used directly (avoid formal language and complex clauses)",
"Scenario Adaptation: Strictly align with the real-life scenario shown in the image (e.g., restaurant ordering, asking for directions on the subway, chatting with friends, etc.)",
"Core Principle: Natural and not stiff, consistent with native English speakers' daily communication habits (e.g., prefer \"How's it going?\" over \"How are you recently?\")"
],
"desc_zh": [
"Requirement: Colloquial Chinese translations of the corresponding English sentences",
"Principle: Avoid literal translations and formal expressions; conform to daily Chinese speaking habits (e.g., translate \"Could you pass the salt?\" as \"能递下盐吗?\" instead of \"你能把盐递给我吗?\")",
"Adaptability: Translations should fit the logical expression of Chinese scenarios (e.g., more polite for workplace communication, more casual for friend chats)"
],
"core_vocab": [
"Requirement: 5-8 core spoken words for the scenario",
"Standard: High-frequency daily use (avoid rare words and academic terms); can directly replace key words in sentences for reuse",
"Example: For the \"supermarket shopping\" scenario, prioritize words like \"discount, check out, cart\" that can be directly applied to sentences"
],
"collocations": [
"Requirement: 5-8 high-frequency spoken collocations for the scenario",
"Standard: Short and practical fixed collocations; can be used by directly replacing core words (avoid complex phrases)",
"Example: For the \"food delivery ordering\" scenario, collocations include \"order food, pick up the phone (for delivery calls), track the order\""
],
"pragmatic_notes": [
"Requirement: 2-4 scenario-specific pragmatic notes (avoid general descriptions)",
"Content: Clear usage scenarios + tone adaptation + practical skills (e.g., \"Suitable for chatting with friends; casual tone; starting with the filler word 'actually' makes it more natural\")",
"Practical Value: Include \"replacement skills\" (e.g., \"Sentence pattern 'I'm in the mood for + [food]' can be used by directly replacing the food noun\")"
]
}
}
2. Uniqueness: No repetition in SEMANTICS/FUNCTIONS (not just literal repetition) — e.g., avoid two sentences both meaning "This is a laptop" (even with different wording).
3. Focus: Prioritize ARTIFICIAL/CENTRAL objects and PRIMARY scene (ignore trivial background elements) — e.g., for a café image, focus on "coffee", "barista", "menu" (not "wall", "floor").
4. Practicality: All sentences must be directly usable in real-life communication (avoid meaningless grammatical exercises like "I am eat a apple" corrected to "I am eating an apple").
5. Accuracy: Translations must be accurate (not literal) and match the context of the image scene.
6. Output Limit: Only return the JSON string, without any explanatory text. Ensure that it can be directly parsed by `JSON.parse`.
"""
)
if exclude_words:
exclude_str = ". ".join(exclude_words)
prompt += f"Avoid using these words: {exclude_str}."
return prompt
elif type == 'food':
return (
"你是一个专业美食识别AI请严格按以下步骤分析图片\n"
"1. 识别最显著菜品名称(需具体到品种/烹饪方式):\n"
"- 示例:清蒸鲈鱼(非清蒸鱼)、罗宋汤(非蔬菜汤)\n"
"- 无法确定具体菜品时返回“无法识别出菜品”\n"
"2. 提取核心食材3-5种主料\n"
"- 排除调味料(油/盐/酱油等)\n"
"- 混合菜(如沙拉/炒饭)列出可见食材\n"
"- 无法识别时写“未知”\n"
"3. 输出格式严格JSON, 如果有多个占据显著位置的菜品,可以将多个菜品罗列出来放到 json 数组中:\n"
"[{ dish_name: 具体菜品名1 | 无法识别出菜品, method: 烹饪方式, main_ingredients: [食材1, 食材2] },\n"
"{ dish_name: 具体菜品名2 | 无法识别出菜品, method: 烹饪方式, main_ingredients: [食材1, 食材2] }]"
)
elif type == 'scene':
return (
"""
# 角色
你是专注于英语教育的轻量级场景化句型分析助手仅输出JSON格式结果无多余解释/话术。
# 输入信息
场景标签scene_tag
英文句型sentence_en
中文翻译sentence_zh
# 输出要求
1. 功能标签生成2个标签主标签+子标签),主标签仅限「询问/请求/陈述/表达需求/建议/确认/表达感受/指出位置」,子标签需贴合场景和句型核心功能(如“索要物品”“点餐”“职场沟通”);
2. 场景说明50-80字简洁说明该句型的使用场景、语用价值如礼貌性/适配对象),语言通俗,适配英语进阶学习者;
3. 输出格式严格遵循以下JSON结构无换行/多余字符:
{
"functionTags": ["主标签", "子标签"],
"sceneExplanation": "场景说明文本"
}
# 约束
- 功能标签必须贴合「场景标签」+「句型内容」,不脱离场景;
- 场景说明不堆砌术语,聚焦“怎么用/什么时候用”,而非语法分析;
- 严格控制字符数功能标签仅2个场景说明50-80字。
# 示例参考
【输入】
场景标签café
英文句型Can I have a look at the menu?
中文翻译:我能看一下菜单吗?
【输出】
{"functionTags":["询问","索要物品"],"sceneExplanation":"该句型适用于咖啡厅/餐厅场景向服务人员礼貌索要菜单比直接说“Give me the menu”更得体适配所有餐饮消费场景的基础沟通。"}
"""
)
else:
return ""

View File

@@ -0,0 +1,62 @@
import json
def get_scene_variation_prompt(payload: dict) -> str:
    """Build the prompt asking the model for new scene-variation descriptions.

    Reads ``description``, ``scene_tag``, ``core_vocab`` and ``collocations``
    from ``payload`` (missing keys become ``None``) and interpolates them
    into the template using their default string formatting.
    """
    original_description = payload.get("description")
    original_tags = payload.get("scene_tag")
    original_vocab = payload.get("core_vocab")
    original_collocations = payload.get("collocations")
    return f"""
Vision-to-English-Chinese Listening Description Generator (Intermediate Level).
Core Objective: Based on the ORIGINAL IMAGE'S scene tags, core vocabulary, and collocations, generate 2 sets of NEW English-Chinese sentence pairs (each set for one new image) for Intermediate English learners. The new descriptions must: 1) Serve listening practice (clear, distinguishable, key information prominent); 2) Expand learning scope via diverse modifications (synonyms/antonyms, background replacement, perspective shift, etc.); 3) Include new practical vocabulary/collocations; 4) Corresponding to a specific modification of the original image (ensure "description-image" consistency).
// Reusable Assets from Original Image (MUST use these to ensure learning continuity)
- Original Description: {original_description} (e.g., "A blue cup on a table with a print on it") — new descriptions must be modified based on the original one.
- Original Scene Tags: {original_tags} (e.g., "office", "café", "supermarket") — new descriptions must stay in this scene (no scene switching).
- Original Core Vocab: {original_vocab} (e.g., "cup", "table", "print") — new descriptions can use synonyms/antonyms or extend related words (e.g., "cup""mug", "table""desk", "print""scan").
- Original Collocations: {original_collocations} (e.g., "print a document", "place an order") — new descriptions can adapt, extend, or reverse these collocations (e.g., "print a document""scan a report", "place an order""cancel an order").
// Intermediate Level Definition (Strictly Follow)
- Vocab: Scene-specific common words + extended synonyms/antonyms + new related vocabulary (avoid rare/academic terms).
- Grammar: Complex clauses, passive voice, conditional statements (as appropriate to the scene).
- Word Count: ≤25 words per sentence (concise but informative, suitable for listening comprehension).
- Style: Natural colloquial English (consistent with native speakers' daily/office communication) — avoid formal/written language.
// Allowed Modification Dimensions (At Least 1 Dimension per Description, No Repetition Across 2 Sets)
1. Vocabulary Transformation: Replace original core words with synonyms/antonyms (e.g., "blue""navy", "buy""purchase", "arrive""depart").
2. Background Replacement: Change the original scene's background (e.g., café → office pantry, subway → bus, kitchen → restaurant kitchen).
3. Perspective Shift: Adjust the observation perspective (e.g., front view → side view, close-up → wide shot, user's perspective → third-person perspective).
4. Posture/Action Modification: Change the posture of people/objects or add/modify actions (e.g., "sitting at the desk""standing beside the desk", "a closed laptop""an open laptop displaying a report").
5. Subject Transformation: Add/remove/replace core objects (e.g., "a cup on the table""a mug and a notebook on the table", "a pen""a marker", remove "a tissue box").
6. Collocation Adaptation: Extend or reverse original collocations (e.g., "take notes""take detailed notes", "make a call""miss a call").
// Key Requirements for Listening Practice
1. Distinguishability: The 2 sets of descriptions must have CLEAR DIFFERENCES in core information (e.g., Image 1: synonyms + posture change, Image 2: background replacement + add object, Image 3: antonyms + perspective shift) — avoid ambiguous or similar descriptions.
2. Clarity: Key modification information (new vocabulary, background, perspective, etc.) must be placed at the BEGINNING of the sentence (e.g., "In a office pantry, a navy mug sits beside an open laptop" → not "There's something beside the laptop in a different room").
3. New Learning Content: Each description must include 2 new elements (vocabulary/collocations/modifications) for learners to acquire (e.g., new word "pantry", new collocation "open laptop displaying a report").
4. Practicality: Sentences must be directly usable in real-life communication (e.g., "Actually, I prefer using a marker to take notes in meetings" instead of "A marker is used for taking notes in meetings").
5. Translation Quality: Chinese translations (desc_zh) must be colloquial, accurate (no literal translations), and match the English context (e.g., "navy mug""藏青色马克杯" instead of "海军杯", "office pantry""办公室茶水间" instead of "办公室食品储藏室").
// Output Structure (JSON, ONLY return JSON string, no extra text)
{{
"new_descriptions": [
{{
"image_id": 1,
"modification_type": "Specific dimension (e.g., 'synonyms + posture change')",
"modification_point": "Detailed modification based on original image (e.g., 'Replace 'blue cup' with 'navy mug'; change 'sitting' to 'standing beside the desk')",
"desc_en": "Intermediate-level English sentence (meets vocabulary/grammar/word count requirements)",
"desc_zh": "Colloquial Chinese translation",
"core_vocab": ["new_word1", "new_word2"], // 2-3 new words (synonyms/antonyms/extended words)
"collocation": "Practical adapted collocation (e.g., 'open laptop displaying a report')",
"learning_note": "Brief explanation of new content (e.g., 'navy: a dark blue color; suitable for describing objects in formal scenes')"
}},...
]
}}
// Output Rules
1. Only return JSON string (no explanatory text) — ensure direct parsing via JSON.parse.
2. Modification types across 2 sets must be different (cover diverse dimensions).
3. Modification points must be SPECIFIC and operable (avoid vague descriptions like "change something").
4. Sentences must be natural oral English (no rigid grammatical structures).
5. New core vocab and collocations must be closely related to the original image's content (ensure learning continuity).
"""

View File

@@ -0,0 +1,120 @@
import json
from backend.common.const import (
SENTENCE_TYPE_SCENE_SENTENCE,
SENTENCE_TYPE_SCENE_DIALOGUE,
SENTENCE_TYPE_SCENE_EXERCISE
)
def get_sentence_analysis_prompt(payload: dict, mode: str) -> str:
    """Build the sentence-analysis prompt for ``payload`` in the given ``mode``.

    ``payload`` is serialised with ``json.dumps(..., ensure_ascii=False)`` and
    embedded in the preamble. ``SENTENCE_TYPE_SCENE_SENTENCE`` uses its own
    preamble followed by the sentence structure; the dialogue and exercise
    modes append their structure (with a short lead-in) to the default
    preamble; any other mode returns the default preamble alone.
    """
    payload_json = json.dumps(payload, ensure_ascii=False)

    if mode == SENTENCE_TYPE_SCENE_SENTENCE:
        sentence_intro = (
            "你是英语教育场景的专业助手,需基于给定的图片场景信息和基础内容,扩展生成适配英语进阶学习者的[场景句型]结构化内容,所有内容需贴合场景、功能导向,无语义重复,简洁清晰,准确务实,且符合外国人日常口语沟通习惯。\n"
            "输入信息如下JSON\n"
            f"{payload_json}\n"
            "输出要求:\n"
            "0. description是图片的详细描述围绕描述展开后续的分析。\n"
            "1. 内容约束:基于基础句型扩展功能标签、场景说明,每句补充「发音提示(重音/连读)」等输出结构中要求的内容,需符合现实生活和真实世界的习惯。\n"
            "2. 语言约束所有英文内容符合日常沟通表达无语法错误中文翻译精准场景说明简洁易懂≤50字\n"
            "3. 输出限制仅返回JSON字符串无其他解释文字确保可被`JSON.parse`直接解析,确保字段完整、值为数组/字符串类型,输出的 JSON 结构是:\n"
        )
        sentence_struct = (
            """
"sentence": { // 对象:场景句型模块(适配前端展示)
"total": 5, // 数字:句型数量(5-8)
"list": [ // 数组场景句型列表数量与total一致
{ "seq": 1, // 数字序号1-8
"sentence_en": "", // 字符串:英文句型, 使用输入信息中的 desc_en 与之顺序对应的句子
"sentence_zh": "", // 字符串:中文翻译,使用输入信息中的 desc_zh 与之顺序对应的句子
"function_tags": ["询问", "索要物品"], // 数组:功能标签(主+子)
"scene_explanation": "咖啡厅场景向店员礼貌索要菜单比“Give me the menu”更得体", // 字符串场景使用说明≤50字
"pronunciation_tip": "重音在menu /ˈmenjuː/have a look at 连读为 /hævəlʊkæt/", // 字符串:发音提示(重音/连读)
"core_vocab": ["menu", "look"], // 数组:核心词汇
"core_vocab_desc": ["n. 菜单", "v. 查看"], // 数组核心词汇在此句型中的含义与core_vocab顺序对应
"collocations": ["have a look at + 物品(查看某物)"], // 数组:核心搭配
"grammar_point": "情态动词Can表请求非正式主谓倒装结构Can + 主语 + 动词原形", // 核心语法解析
"common_mistakes": ["1. 漏介词atCan I have a look the menu", "2. look误读为/lʊk/(正确/luːk/", "3. 忘记在look后加atCan I have a look at the menu", ...], // 数组:句型中语法或单词用法可能出错的地方,包括但不限于常见发音错误,场景语气不当,单词单复数错误,主谓倒装错误、省略介词、省略主语等语法错误;
"pragmatic_alternative": ["Could I have a look at the menu?(更礼貌,正式场景)", "May I see the menu?(更正式,高阶)", ...], // 语用替代表达
"scene_transfer_tip": "迁移至餐厅场景Can I have a look at the wine list?把menu替换为wine list", // 场景迁移提示
"difficulty_tag": "intermediate", // 难度标签beginner/intermediate/advanced
"extended_example": ["Can I have a look at your phone?(向朋友借看手机,非正式场景)", ""], // 数组: 精简拓展例句
"response_pairs": [], // 数组对话回应搭配3-4个核心回应含肯定/否定/中性,带场景适配说明,设计意图:形成对话闭环,支持角色扮演/实际互动)
"fluency_hacks": "", // 字符串口语流畅度技巧≤30字聚焦填充词/弱读/语气调节,设计意图:贴近母语者表达节奏,避免生硬卡顿)
"cultural_note": "", // 字符串文化适配提示≤40字说明中外表达习惯差异设计意图避免文化误解提升沟通得体性
"practice_steps": [], // 数组分阶练习步骤3步每步1句话可操作设计意图提供明确学习路径衔接输入与输出提升口语落地能力
"avoid_scenarios": "", // 字符串避免使用场景≤35字明确禁忌场景+替代方案,设计意图:减少用错场合的尴尬,明确使用边界)
"self_check_list": [], // 数组自我检测清单3-4个可量化检查点含语法/发音/流畅度维度,设计意图:提供即时自查工具,无需他人批改验证效果)
"tone_intensity": "", // 字符串语气强度标注≤35字用“弱/中/强”+适用对象描述,设计意图:直观匹配语气与互动对象,避免语气不当)
"similar_sentence_distinction": "", // 字符串相似句型辨析≤40字聚焦使用场景+核心差异,不搞复杂语法,设计意图:理清易混点,避免张冠李戴)
"speech_rate_tip": "", // 字符串语速建议≤25字明确日常场景语速+关键部分节奏,设计意图:让表达更自然,提升沟通效率)
"personalized_tips": "" // 字符串个性化学习提示≤30字分初学者/进阶者给出重点建议,设计意图:适配不同水平需求,提升学习针对性)
} ] }
"""
        )
        return sentence_intro + sentence_struct

    # Default preamble shared by the dialogue, exercise and fallback modes.
    default_intro = (
        "你是英语教育场景的专业助手,需基于给定的图片场景信息和基础内容,扩展生成适配英语进阶学习者的「句型卡片、模拟场景对话、句型套用练习」结构化内容,所有内容需贴合场景、功能导向,无语义重复,且符合日常沟通逻辑。\n"
        "输入信息如下JSON\n"
        f"{payload_json}\n"
        "输出要求:\n"
        "1. 内容约束:基于基础句型扩展功能标签、场景说明,每句补充「发音提示(重音/连读)」\n"
        "2. 格式约束严格按照下方JSON结构输出无额外解释确保字段完整、值为数组/字符串类型。\n"
        "3. 语言约束所有英文内容符合日常沟通表达无语法错误中文翻译精准场景说明简洁易懂≤50字\n"
    )

    if mode == SENTENCE_TYPE_SCENE_DIALOGUE:
        dialogue_struct = (
            """
"dialog": { // 对象:模拟场景对话模块(适配前端对话交互)
"roleOptions": ["customer", "barista"], // 数组可选角色固定值customer/barista
"defaultRole": "customer", // 字符串默认角色customer/barista二选一
"dialogRound": 2, // 数字对话轮数2-3轮
"list": [ // 数组对话轮次列表数量与dialogRound一致
{
"roundId": "dialog-001", // 字符串轮次唯一ID
"speaker": "barista", // 字符串本轮说话者customer/barista
"speakerEn": "Can I help you?", // 字符串:说话者英文内容
"speakerZh": "请问需要点什么?", // 字符串:说话者中文翻译
"responseOptions": [ // 数组用户可选回应固定3条
{
"optionId": "resp-001", // 字符串选项唯一ID
"optionEn": "I'd like to order a latte with less sugar.", // 字符串:选项英文内容
"optionZh": "我想点一杯少糖的拿铁。", // 字符串:选项中文翻译
"feedback": "✅ 完美该句型是咖啡厅点餐核心表达with精准补充饮品定制要求" // 字符串:选择后的交互反馈
}
]
}
]
}
"""
        )
        return default_intro + "生成场景对话结构:" + dialogue_struct

    if mode == SENTENCE_TYPE_SCENE_EXERCISE:
        exercise_struct = (
            """
"sentencePractice": { // 对象:句型套用练习模块(适配前端填空练习)
"total": 5, // 数字练习数量5-8道
"list": [ // 数组练习列表数量与total一致
{
"practiceId": "practice-001", // 字符串练习唯一ID
"baseSentenceEn": "I'd like to order ______", // 字符串:基础句型框架(挖空)
"baseSentenceZh": "我想点______", // 字符串:框架中文翻译
"keywordPool": [ // 数组可选关键词池3-4个
{
"wordEn": "latte", // 字符串:英文关键词
"wordZh": "拿铁", // 字符串:中文翻译
"type": "drink" // 字符串词汇类型drink/custom/food等
}
],
"wrongTips": [ // 数组常见错误提示2-3条
"错误order + bread面包→ 咖啡厅场景中order后优先接饮品面包需用“have”搭配"
],
"extendScene": { // 对象:拓展场景(迁移练习)
"sceneTag": "milk_tea_shop", // 字符串:拓展场景标签
"extendSentenceEn": "I'd like to order ______", // 字符串:拓展句型框架
"extendKeywordPool": ["milk tea", "taro balls", "sugar-free"] // 数组:拓展关键词池
}
}
]
"""
        )
        return default_intro + "生成句型练习结构:" + exercise_struct

    return default_intro

View File

@@ -272,169 +272,9 @@ class Qwen:
@staticmethod
def get_recognition_prompt(type: str, exclude_words: List[str] | None = None) -> str:
    """Return the image-recognition prompt for the requested recognition type.

    This method is a thin wrapper: the prompt text is produced by
    ``backend.core.prompts.recognition.get_recognition_prompt`` and both
    arguments are forwarded to it unchanged.

    Args:
        type: Recognition mode selector, passed straight through to the
            prompt module. The name shadows the ``type`` builtin but is kept
            because callers may pass it by keyword.
            NOTE(review): the inline implementation this wrapper replaced
            handled 'word', 'food' and 'scene' and returned "" otherwise;
            presumably the prompt module does the same -- confirm there.
        exclude_words: Optional list of words forwarded to the prompt
            builder. NOTE(review): the previous inline implementation only
            used it for the 'word' prompt ("Avoid using these words: ...");
            verify the delegate still does.

    Returns:
        The prompt string returned by the prompt module.
    """
    # Function-scope import, exactly as the original delegation did.
    # NOTE(review): presumably deferred to avoid an import cycle -- confirm
    # before hoisting it to module level.
    from backend.core.prompts.recognition import get_recognition_prompt as build_prompt

    # Everything that used to follow this return (unused vocabulary_level /
    # specificity locals, a commented-out level mapping and the old inline
    # 'word' / 'food' / 'scene' prompt literals) could never execute and has
    # been removed; the prompt module is now the single source of truth.
    return build_prompt(type, exclude_words)
@staticmethod
async def recognize_image(params: QwenRecognizeImageParams) -> Dict[str, Any]:

View File

@@ -139,6 +139,7 @@ jinja2==3.1.6
# fastapi
# fastapi-best-architecture
langchain==1.2.3
langchain-community==0.4.1
kombu==5.5.1
# via celery
loguru==0.7.3