This commit is contained in:
Felix
2026-01-02 17:48:40 +08:00
parent 904ab008f6
commit 229174ac5e
2 changed files with 212 additions and 191 deletions

View File

@@ -33,6 +33,7 @@ class QaQuestionSchema(SchemaBase):
ext: Optional[Dict[str, Any]] = None
class QaSessionSchema(SchemaBase):
id: Optional[str] = None
started_at: Optional[str] = None
progress: Optional[Dict[str, Any]] = None

View File

@@ -53,7 +53,7 @@ class QaExerciseProcessor(TaskProcessor):
' 1. `question`:问题内容;\n'
' 2. `dimension`:考察维度;\n'
' 3. `key_pronunciation_words`核心发音单词2-3个\n'
' 4. `answers`多版本回答spoken/written/friendly/lively\n'
' 4. `answers`多版本回答spoken/written/friendly\n'
' 5. `correct_options`:正确选项数组(含`content`/`type`字段);\n'
' 6. `incorrect_options`:错误选项数组(含`content`/`error_type`/`error_reason`字段);\n'
' 7. `cloze`:填词模式专项字段:\n'
@@ -148,6 +148,12 @@ class QaExerciseProcessor(TaskProcessor):
class QaService:
async def create_exercise_task(self, image_id: int, user_id: int, title: Optional[str] = None, description: Optional[str] = None) -> Dict[str, Any]:
async with async_db_session.begin() as db:
# Check for existing active task
latest_task = await image_task_dao.get_latest_active_task(db, user_id, image_id, 'qa_exercise')
if latest_task:
return {'task_id': str(latest_task.id), 'status': latest_task.status}
if not await points_service.check_sufficient_points(user_id, LLM_CHAT_COST):
raise errors.ForbiddenError(msg='积分不足,请获取积分后继续使用')
slot_acquired = await rate_limit_service.acquire_task_slot(user_id)
@@ -210,6 +216,7 @@ class QaService:
s = await qa_session_dao.get_latest_by_user_exercise(db, user_id, i.id)
if s:
session = {
'id': str(s.id),
'started_at': s.started_at.isoformat() if s.started_at else None,
'progress': s.progress,
}
@@ -265,138 +272,208 @@ class QaService:
'input_text': input_text if mode == EXERCISE_TYPE_FREE_TEXT else None,
'status': 'pending',
'evaluation': None,
'ext': {'session_id': session_id} if session_id else None,
'ext': None,
})
if session_id:
s = await qa_session_dao.get(db, session_id)
if s and s.exercise_id == exercise_id:
prog = s.progress or {}
attempts = list(prog.get('attempts') or [])
replaced = False
for idx, a in enumerate(attempts):
if a.get('question_id') == question_id:
attempts[idx] = {
'attempt_id': attempt.id,
'question_id': question_id,
'mode': mode,
'created_at': datetime.now().isoformat(),
'is_correct': a.get('is_correct'),
}
replaced = True
break
if not replaced:
attempts.append({
s = await qa_session_dao.get_latest_by_user_exercise(db, user_id, exercise_id)
if s and s.exercise_id == exercise_id:
prog = dict(s.progress or {})
attempts = list(prog.get('attempts') or [])
replaced = False
for idx, a in enumerate(attempts):
if a.get('question_id') == question_id and a.get('mode') == mode:
attempts[idx] = {
'attempt_id': attempt.id,
'question_id': question_id,
'question_id': str(question_id),
'mode': mode,
'created_at': datetime.now().isoformat(),
'is_correct': None,
})
prog['answered'] = int(prog.get('answered') or 0) + 1
'is_correct': a.get('is_correct'),
}
replaced = True
break
if not replaced:
attempts.append({
'attempt_id': attempt.id,
'question_id': str(question_id),
'mode': mode,
'created_at': datetime.now().isoformat(),
'is_correct': None,
})
prog['answered'] = int(prog.get('answered') or 0) + 1
prog['attempts'] = attempts
s.progress = prog
attempt.ext = {**(attempt.ext or {}), 'session_id': s.id}
await db.flush()
if mode == EXERCISE_TYPE_FREE_TEXT:
attempt.ext = {**(attempt.ext or {}), 'type': 'free_text', 'free_text': {'text': attempt.input_text or '', 'evaluation': None}}
await db.flush()
async with async_db_session.begin() as db2:
task = await image_task_dao.create_task(db2, CreateImageTaskParam(
image_id=q.image_id,
user_id=user_id,
dict_level=DictLevel.LEVEL1.value,
ref_type='qa_question_attempt',
ref_id=attempt.id,
status=ImageTaskStatus.PENDING,
))
await db2.flush()
asyncio.create_task(self._process_attempt_evaluation(task.id, user_id))
session_id_val = (attempt.ext or {}).get('session_id')
return {
'session_id': str(session_id_val) if session_id_val is not None else None,
'type': 'free_text',
'free_text': {
'text': attempt.input_text or '',
'evaluation': None
}
}
# Synchronous evaluation for choice/cloze
if mode == EXERCISE_TYPE_CHOICE:
ext = q.ext or {}
raw_correct = ext.get('correct_options') or []
raw_incorrect = ext.get('incorrect_options') or []
def _norm(v):
try:
return str(v).strip().lower()
except Exception:
return str(v)
correct_set = set(_norm(o.get('content') if isinstance(o, dict) else o) for o in raw_correct)
incorrect_map = {}
for o in raw_incorrect:
c = _norm(o.get('content') if isinstance(o, dict) else o)
if isinstance(o, dict):
incorrect_map[c] = {
'content': o.get('content'),
'error_type': o.get('error_type'),
'error_reason': o.get('error_reason')
}
else:
incorrect_map[c] = {'content': o, 'error_type': None, 'error_reason': None}
selected_list = list(attempt.choice_options or [])
selected = set(_norm(s) for s in selected_list)
if not selected:
is_correct = 'incorrect'
result_text = '完全错误'
evaluation = {'type': 'choice', 'result': result_text, 'detail': 'no selection', 'selected': {'correct': [], 'incorrect': []}, 'missing_correct': [o.get('content') if isinstance(o, dict) else o for o in raw_correct]}
# update ext with choice details
attempt.ext = {**(attempt.ext or {}), 'type': 'choice', 'choice': {'options': selected_list, 'evaluation': evaluation}}
await db.flush()
merged_eval = dict(attempt.evaluation or {})
merged_eval['choice'] = {'options': selected_list, 'evaluation': evaluation}
await qa_attempt_dao.update_status(db, attempt.id, 'completed', merged_eval)
else:
selected_correct = []
for o in raw_correct:
c = _norm(o.get('content') if isinstance(o, dict) else o)
if c in selected:
selected_correct.append(o.get('content') if isinstance(o, dict) else o)
selected_incorrect = []
for s in selected_list:
ns = _norm(s)
if ns not in correct_set:
detail = incorrect_map.get(ns)
if detail:
selected_incorrect.append(detail)
else:
selected_incorrect.append({'content': s, 'error_type': 'unknown', 'error_reason': None})
missing_correct = []
for o in raw_correct:
c = _norm(o.get('content') if isinstance(o, dict) else o)
if c not in selected:
missing_correct.append(o.get('content') if isinstance(o, dict) else o)
if selected == correct_set and not selected_incorrect:
is_correct = 'correct'
result_text = '完全匹配'
evaluation = {'type': 'choice', 'result': result_text, 'detail': is_correct, 'selected': {'correct': selected_correct, 'incorrect': []}, 'missing_correct': []}
elif selected_correct:
is_correct = 'partial'
result_text = '部分匹配'
evaluation = {'type': 'choice', 'result': result_text, 'detail': is_correct, 'selected': {'correct': selected_correct, 'incorrect': selected_incorrect}, 'missing_correct': missing_correct}
else:
is_correct = 'incorrect'
result_text = '完全错误'
evaluation = {'type': 'choice', 'result': result_text, 'detail': is_correct, 'selected': {'correct': [], 'incorrect': selected_incorrect}, 'missing_correct': [o.get('content') if isinstance(o, dict) else o for o in raw_correct]}
# update ext with choice details
attempt.ext = {**(attempt.ext or {}), 'type': 'choice', 'choice': {'options': selected_list, 'evaluation': evaluation}}
await db.flush()
merged_eval = dict(attempt.evaluation or {})
merged_eval['choice'] = {'options': selected_list, 'evaluation': evaluation}
await qa_attempt_dao.update_status(db, attempt.id, 'completed', merged_eval)
s = await qa_session_dao.get_latest_by_user_exercise(db, user_id, exercise_id)
if s and s.exercise_id == attempt.exercise_id:
prog = dict(s.progress or {})
attempts = list(prog.get('attempts') or [])
prev = None
for a in attempts:
if a.get('attempt_id') == attempt.id:
prev = a.get('is_correct')
a['is_correct'] = is_correct
break
prev_correct = 1 if prev == 'correct' else 0
new_correct = 1 if is_correct == 'correct' else 0
correct_inc = new_correct - prev_correct
prog['attempts'] = attempts
prog['correct'] = int(prog.get('correct') or 0) + correct_inc
s.progress = prog
await db.flush()
if mode == EXERCISE_TYPE_FREE_TEXT:
attempt.ext = {**(attempt.ext or {}), 'type': 'free_text', 'free_text': {'text': attempt.input_text or '', 'evaluation': None}}
await db.flush()
async with async_db_session.begin() as db2:
task = await image_task_dao.create_task(db2, CreateImageTaskParam(
image_id=q.image_id,
user_id=user_id,
dict_level=DictLevel.LEVEL1.value,
ref_type='qa_question_attempt',
ref_id=attempt.id,
status=ImageTaskStatus.PENDING,
))
await db2.flush()
asyncio.create_task(self._process_attempt_evaluation(task.id, user_id))
session_id_val = (attempt.ext or {}).get('session_id')
return {
'session_id': str(session_id_val) if session_id_val is not None else None,
'type': 'free_text',
'free_text': {
'text': attempt.input_text or '',
'evaluation': None
}
}
# Synchronous evaluation for choice/cloze
if mode == EXERCISE_TYPE_CHOICE:
ext = q.ext or {}
raw_correct = ext.get('correct_options') or []
raw_incorrect = ext.get('incorrect_options') or []
def _norm(v):
try:
return str(v).strip().lower()
except Exception:
return str(v)
correct_set = set(_norm(o.get('content') if isinstance(o, dict) else o) for o in raw_correct)
incorrect_map = {}
for o in raw_incorrect:
c = _norm(o.get('content') if isinstance(o, dict) else o)
if isinstance(o, dict):
incorrect_map[c] = {
'content': o.get('content'),
'error_type': o.get('error_type'),
'error_reason': o.get('error_reason')
}
else:
incorrect_map[c] = {'content': o, 'error_type': None, 'error_reason': None}
selected_list = list(attempt.choice_options or [])
selected = set(_norm(s) for s in selected_list)
if not selected:
is_correct = 'incorrect'
result_text = '完全错误'
evaluation = {'type': 'choice', 'result': result_text, 'detail': 'no selection', 'selected': {'correct': [], 'incorrect': []}, 'missing_correct': [o.get('content') if isinstance(o, dict) else o for o in raw_correct]}
# update ext with choice details
attempt.ext = {**(attempt.ext or {}), 'type': 'choice', 'choice': {'options': selected_list, 'evaluation': evaluation}}
await db.flush()
merged_eval = dict(attempt.evaluation or {})
merged_eval['choice'] = {'options': selected_list, 'evaluation': evaluation}
await qa_attempt_dao.update_status(db, attempt.id, 'completed', merged_eval)
else:
selected_correct = []
for o in raw_correct:
c = _norm(o.get('content') if isinstance(o, dict) else o)
if c in selected:
selected_correct.append(o.get('content') if isinstance(o, dict) else o)
selected_incorrect = []
for s in selected_list:
ns = _norm(s)
if ns not in correct_set:
detail = incorrect_map.get(ns)
if detail:
selected_incorrect.append(detail)
else:
selected_incorrect.append({'content': s, 'error_type': 'unknown', 'error_reason': None})
missing_correct = []
for o in raw_correct:
c = _norm(o.get('content') if isinstance(o, dict) else o)
if c not in selected:
missing_correct.append(o.get('content') if isinstance(o, dict) else o)
if selected == correct_set and not selected_incorrect:
await db.commit()
# return latest result structure
return await self.get_question_evaluation(question_id, user_id)
if mode == EXERCISE_TYPE_CLOZE:
ext = q.ext or {}
cloze = ext.get('cloze') or {}
correct_word = cloze.get('correct_word')
distractors = cloze.get('distractor_words') or []
# Support multiple selections: treat as correct if any selected matches a correct answer
selection_list = cloze_options or ([attempt.cloze_options] if attempt.cloze_options else ([attempt.input_text] if attempt.input_text else []))
selection_list = [s for s in selection_list if isinstance(s, str) and s.strip()]
user_text_first = (selection_list[0] if selection_list else '').strip()
def _norm(v):
try:
return str(v).strip().lower()
except Exception:
return str(v)
# correct answers may be a single string or a list
correct_candidates = []
if isinstance(correct_word, list):
correct_candidates = [cw for cw in correct_word if isinstance(cw, str) and cw.strip()]
elif isinstance(correct_word, str) and correct_word.strip():
correct_candidates = [correct_word]
correct_set = set(_norm(cw) for cw in correct_candidates)
user_correct = []
user_incorrect = []
for s in selection_list:
if _norm(s) in correct_set:
user_correct.append(s)
else:
user_incorrect.append({'content': s, 'error_type': None, 'error_reason': None})
if user_correct and not user_incorrect:
is_correct = 'correct'
result_text = '完全匹配'
evaluation = {'type': 'choice', 'result': result_text, 'detail': is_correct, 'selected': {'correct': selected_correct, 'incorrect': []}, 'missing_correct': []}
elif selected_correct:
evaluation = {'type': 'cloze', 'result': result_text, 'detail': is_correct, 'selected': {'correct': user_correct, 'incorrect': []}, 'missing_correct': []}
elif user_correct:
is_correct = 'partial'
result_text = '部分匹配'
evaluation = {'type': 'choice', 'result': result_text, 'detail': is_correct, 'selected': {'correct': selected_correct, 'incorrect': selected_incorrect}, 'missing_correct': missing_correct}
evaluation = {'type': 'cloze', 'result': result_text, 'detail': is_correct, 'selected': {'correct': user_correct, 'incorrect': user_incorrect}, 'missing_correct': []}
else:
is_correct = 'incorrect'
result_text = '完全错误'
evaluation = {'type': 'choice', 'result': result_text, 'detail': is_correct, 'selected': {'correct': [], 'incorrect': selected_incorrect}, 'missing_correct': [o.get('content') if isinstance(o, dict) else o for o in raw_correct]}
# update ext with choice details
attempt.ext = {**(attempt.ext or {}), 'type': 'choice', 'choice': {'options': selected_list, 'evaluation': evaluation}}
mc = correct_candidates if correct_candidates else []
evaluation = {'type': 'cloze', 'result': result_text, 'detail': is_correct, 'selected': {'correct': [], 'incorrect': user_incorrect}, 'missing_correct': mc}
# update ext with cloze details
attempt.ext = {**(attempt.ext or {}), 'type': 'cloze', 'cloze': {'input': attempt.cloze_options or user_text_first, 'evaluation': evaluation}}
await db.flush()
merged_eval = dict(attempt.evaluation or {})
merged_eval['choice'] = {'options': selected_list, 'evaluation': evaluation}
merged_eval['cloze'] = {'input': attempt.cloze_options or user_text_first, 'evaluation': evaluation}
await qa_attempt_dao.update_status(db, attempt.id, 'completed', merged_eval)
sid = (attempt.ext or {}).get('session_id') if attempt.ext else None
if sid:
s = await qa_session_dao.get(db, sid)
s = await qa_session_dao.get_latest_by_user_exercise(db, user_id, exercise_id)
if s and s.exercise_id == attempt.exercise_id:
prog = s.progress or {}
prog = dict(s.progress or {})
attempts = list(prog.get('attempts') or [])
prev = None
for a in attempts:
if a.get('attempt_id') == attempt.id:
prev = a.get('is_correct')
@@ -409,65 +486,9 @@ class QaService:
prog['correct'] = int(prog.get('correct') or 0) + correct_inc
s.progress = prog
await db.flush()
await db.commit()
# return latest result structure
return await self.get_question_evaluation(question_id, user_id)
if mode == EXERCISE_TYPE_CLOZE:
ext = q.ext or {}
cloze = ext.get('cloze') or {}
correct_word = cloze.get('correct_word')
distractors = cloze.get('distractor_words') or []
# Support multiple selections: treat as correct if any selected matches a correct answer
selection_list = cloze_options or ([attempt.cloze_options] if attempt.cloze_options else ([attempt.input_text] if attempt.input_text else []))
selection_list = [s for s in selection_list if isinstance(s, str) and s.strip()]
user_text_first = (selection_list[0] if selection_list else '').strip()
def _norm(v):
try:
return str(v).strip().lower()
except Exception:
return str(v)
# correct answers may be a single string or a list
correct_candidates = []
if isinstance(correct_word, list):
correct_candidates = [cw for cw in correct_word if isinstance(cw, str) and cw.strip()]
elif isinstance(correct_word, str) and correct_word.strip():
correct_candidates = [correct_word]
correct_set = set(_norm(cw) for cw in correct_candidates)
selected_set = set(_norm(s) for s in selection_list)
is_correct = 'correct' if (selected_set and (selected_set & correct_set)) else 'incorrect'
result_text = '完全匹配' if is_correct == 'correct' else '完全错误'
if is_correct == 'incorrect':
mc = correct_candidates if correct_candidates else []
evaluation = {'type': 'cloze', 'result': result_text, 'detail': is_correct, 'missing_correct': mc}
else:
evaluation = {'type': 'cloze', 'result': result_text, 'detail': is_correct}
# update ext with cloze details
attempt.ext = {**(attempt.ext or {}), 'type': 'cloze', 'cloze': {'input': attempt.cloze_options or user_text_first, 'evaluation': evaluation}}
await db.flush()
merged_eval = dict(attempt.evaluation or {})
merged_eval['cloze'] = {'input': attempt.cloze_options or user_text_first, 'evaluation': evaluation}
await qa_attempt_dao.update_status(db, attempt.id, 'completed', merged_eval)
sid = (attempt.ext or {}).get('session_id') if attempt.ext else None
if sid:
s = await qa_session_dao.get(db, sid)
if s and s.exercise_id == attempt.exercise_id:
prog = s.progress or {}
attempts = list(prog.get('attempts') or [])
for a in attempts:
if a.get('attempt_id') == attempt.id:
prev = a.get('is_correct')
a['is_correct'] = is_correct
break
prev_correct = 1 if prev == 'correct' else 0
new_correct = 1 if is_correct == 'correct' else 0
correct_inc = new_correct - prev_correct
prog['attempts'] = attempts
prog['correct'] = int(prog.get('correct') or 0) + correct_inc
s.progress = prog
await db.flush()
await db.commit()
# return latest result structure
return await self.get_question_evaluation(question_id, user_id)
await db.commit()
# return latest result structure
return await self.get_question_evaluation(question_id, user_id)
async def _process_attempt_evaluation(self, task_id: int, user_id: int):
async with background_db_session() as db:
@@ -506,24 +527,23 @@ class QaService:
merged_eval['free_text'] = {'text': attempt.input_text or '', 'evaluation': evaluation}
await qa_attempt_dao.update_status(db, attempt.id, 'completed', merged_eval)
await image_task_dao.update_task_status(db, task_id, ImageTaskStatus.COMPLETED, result={'mode': 'free_text', 'token_usage': res.get('token_usage') or {}})
sid = (attempt.ext or {}).get('session_id') if attempt.ext else None
if sid:
s = await qa_session_dao.get(db, sid)
if s and s.exercise_id == attempt.exercise_id:
prog = s.progress or {}
attempts = list(prog.get('attempts') or [])
for a in attempts:
if a.get('attempt_id') == attempt.id:
prev = a.get('is_correct')
a['is_correct'] = parsed.get('is_correct')
break
prev_correct = 1 if prev == 'correct' else 0
new_correct = 1 if parsed.get('is_correct') == 'correct' else 0
correct_inc = new_correct - prev_correct
prog['attempts'] = attempts
prog['correct'] = int(prog.get('correct') or 0) + correct_inc
s.progress = prog
await db.flush()
s = await qa_session_dao.get_latest_by_user_exercise(db, user_id, attempt.exercise_id)
if s and s.exercise_id == attempt.exercise_id:
prog = dict(s.progress or {})
attempts = list(prog.get('attempts') or [])
prev = None
for a in attempts:
if a.get('attempt_id') == attempt.id:
prev = a.get('is_correct')
a['is_correct'] = parsed.get('is_correct')
break
prev_correct = 1 if prev == 'correct' else 0
new_correct = 1 if parsed.get('is_correct') == 'correct' else 0
correct_inc = new_correct - prev_correct
prog['attempts'] = attempts
prog['correct'] = int(prog.get('correct') or 0) + correct_inc
s.progress = prog
await db.flush()
await db.commit()
async def _call_llm_chat(self, prompt: str, image_id: int, user_id: int, chat_type: str) -> Dict[str, Any]: