Files
backend/backend/app/ai/service/qa_service.py
2026-01-10 10:57:33 +08:00

682 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import json
import math
from typing import Optional, List, Dict, Any, Tuple
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from backend.database.db import async_db_session, background_db_session
from backend.app.ai.crud.qa_crud import qa_exercise_dao, qa_question_dao, qa_attempt_dao, qa_session_dao
from backend.app.ai.crud.image_task_crud import image_task_dao
from backend.app.ai.crud.image_curd import image_dao
from backend.app.ai.model.image_task import ImageTaskStatus
from backend.app.ai.schema.image_task import CreateImageTaskParam
from backend.app.admin.service.points_service import points_service
from backend.app.ai.service.rate_limit_service import rate_limit_service
from backend.common.exception import errors
from backend.middleware.qwen import Qwen
from backend.middleware.tencent_hunyuan import Hunyuan
from backend.core.conf import settings
from backend.app.ai.service.recording_service import recording_service
from backend.common.const import EXERCISE_TYPE_CHOICE, EXERCISE_TYPE_CLOZE, EXERCISE_TYPE_FREE_TEXT, LLM_CHAT_COST
from backend.app.admin.schema.wx import DictLevel
from backend.app.ai.service.image_task_service import TaskProcessor, image_task_service
from backend.app.ai.model.image_task import ImageProcessingTask
from backend.app.ai.model.qa import QaQuestion
class QaExerciseProcessor(TaskProcessor):
async def process(self, db: AsyncSession, task: ImageProcessingTask) -> Tuple[Dict[str, Any], Dict[str, Any]]:
image = await image_dao.get(db, task.image_id)
exercise = await qa_exercise_dao.get(db, task.ref_id)
payload = {}
rr = (image.details or {}).get('recognition_result') or {}
description = ''
try:
d = rr.get('description')
if isinstance(d, str):
description = d
elif isinstance(d, list) and d:
description = d[0] if isinstance(d[0], str) else ''
except Exception:
description = ''
payload = {'description': description}
prompt = (
'### 任务目标\n'
'请基于给定的图片英语描述生成【3-4个细节类半开放问题】返回包含**问题、多版本回答、正确/错误选项、填词模式**的结构化JSON数据用于英语口语练习程序自动化调用。\n'
'### 图片描述\n'
+ json.dumps(payload, ensure_ascii=False) + '\n'
'### 生成要求\n'
'1. 问题规则细节类半开放特殊疑问句覆盖至少2个维度主体特征/动作行为/场景环境), 每个问题的维度不能重复,题干和选项都是英文;\n'
'2. JSON数据规则\n'
' - 根节点:`qa_list`数组3-4个问答对象\n'
' - 每个问答对象字段:\n'
' 1. `question`:问题内容;\n'
' 2. `dimension`:考察维度;\n'
' 3. `key_pronunciation_words`核心发音单词2-3个\n'
' 4. `answers`多版本回答spoken/written/friendly\n'
' 5. `correct_options`:正确选项数组(含`content`/`type`字段),每个选项都是一个陈述句;\n'
' 6. `incorrect_options`:错误选项数组(含`content`/`error_type`/`error_reason`字段),无语法类干扰;\n'
' 7. `cloze`:填词模式专项字段:\n'
' - `correct_word`:填空处原词,一个正确选项;\n'
' - `sentence`:含 correct_word 的完整句子;\n'
' - `distractor_words`近义词干扰项数组3-4个无语法类干扰\n'
'3. 输出限制仅返回JSON字符串无其他解释文字确保可被`JSON.parse`直接解析。\n'
'输入图片描述:' + json.dumps(payload, ensure_ascii=False) + '\n'
'### 输出JSON格式\n'
'{ "qa_list": [ { "question": "", "dimension": "", "key_pronunciation_words": [], "answers": { "spoken": "", "written": "", "friendly": "", "lively": "" }, "correct_options": [ { "content": "", "type": "core" } ], "incorrect_options": [ { "content": "", "error_type": "词汇混淆", "error_reason": "" } ], "cloze": { "sentence": "", "correct_word": "", "distractor_words": [] } } ] }'
)
res = await self._call_llm_chat(prompt=prompt, image_id=image.id, user_id=task.user_id, chat_type='qa_exercise')
if not res.get('success'):
raise Exception(res.get('error') or "LLM call failed")
token_usage = res.get('token_usage') or {}
items = []
try:
parsed = json.loads(res.get('result')) if isinstance(res.get('result'), str) else res.get('result')
if isinstance(parsed, dict):
items = parsed.get('qa_list') or []
elif isinstance(parsed, list):
items = parsed
except Exception:
items = []
created = 0
for it in items:
q = await qa_question_dao.create(db, {
'exercise_id': exercise.id,
'image_id': image.id,
'question': it.get('question') or '',
'payload': None,
'user_id': task.user_id,
'ext': {
'dimension': it.get('dimension'),
'key_pronunciation_words': it.get('key_pronunciation_words'),
'answers': it.get('answers'),
'cloze': it.get('cloze'),
'correct_options': it.get('correct_options'),
'incorrect_options': it.get('incorrect_options'),
},
})
created += 1
exercise.question_count = created
exercise.status = 'published' if created > 0 else 'draft'
await db.flush()
if created > 0:
existing_session = await qa_session_dao.get_latest_by_user_exercise(db, task.user_id, exercise.id)
if not existing_session:
prog = {'current_index': 0, 'answered': 0, 'correct': 0, 'attempts': [], 'total_questions': created}
await qa_session_dao.create(db, {
'exercise_id': exercise.id,
'starter_user_id': task.user_id,
'share_id': None,
'status': 'ongoing',
'started_at': datetime.now(),
'completed_at': None,
'progress': prog,
'score': None,
'ext': None,
})
await db.flush()
# Return result and token_usage.
# Note: image_task_service handles points deduction and final status update.
result = {'token_usage': token_usage, 'count': created}
return result, token_usage
async def _call_llm_chat(self, prompt: str, image_id: int, user_id: int, chat_type: str) -> Dict[str, Any]:
model_type = (settings.LLM_MODEL_TYPE or "").lower()
messages = [{"role": "system", "content": "You are a helpful assistant."}, {'role': 'user', 'content': prompt}]
if model_type == 'qwen':
try:
qres = await Qwen.chat(messages=[{'role': 'user', 'content': prompt}], image_id=image_id, user_id=user_id, api_type=chat_type)
if qres and qres.get('success'):
return {"success": True, "result": qres.get("result"), "token_usage": qres.get("token_usage") or {}}
except Exception as e:
return {"success": False, "error": str(e)}
return {"success": False, "error": "LLM call failed"}
else:
try:
res = await Hunyuan.chat(messages=messages, image_id=image_id, user_id=user_id, system_prompt=None, chat_type=chat_type)
if res and res.get('success'):
return res
except Exception as e:
return {"success": False, "error": str(e)}
return {"success": False, "error": "LLM call failed"}
class QaService:
async def create_exercise_task(self, image_id: int, user_id: int, title: Optional[str] = None, description: Optional[str] = None) -> Dict[str, Any]:
async with async_db_session.begin() as db:
# Check for existing active task
latest_task = await image_task_dao.get_latest_active_task(db, user_id, image_id, 'qa_exercise')
if latest_task:
return {'task_id': str(latest_task.id), 'status': latest_task.status}
if not await points_service.check_sufficient_points(user_id, LLM_CHAT_COST):
raise errors.ForbiddenError(msg='积分不足,请获取积分后继续使用')
slot_acquired = await rate_limit_service.acquire_task_slot(user_id)
if not slot_acquired:
max_tasks = await rate_limit_service.get_user_task_limit(user_id)
raise errors.ForbiddenError(msg=f'用户同时最多只能运行 {max_tasks} 个任务,请等待现有任务完成后再试')
async with async_db_session.begin() as db:
image = await image_dao.get(db, image_id)
if not image:
raise errors.NotFoundError(msg='Image not found')
exercise = await qa_exercise_dao.create(db, {
'image_id': image_id,
'created_by': user_id,
'title': title,
'description': description,
'status': 'draft',
})
await db.flush()
task = await image_task_dao.create_task(db, CreateImageTaskParam(
image_id=image_id,
user_id=user_id,
dict_level=(getattr(getattr(image, 'dict_level', None), 'name', None) or 'LEVEL1'),
ref_type='qa_exercise',
ref_id=exercise.id,
status=ImageTaskStatus.PENDING,
))
await db.flush()
task_id = task.id
await db.commit()
processor = QaExerciseProcessor()
asyncio.create_task(image_task_service.process_task(task_id, user_id, processor))
return {'task_id': str(task_id), 'status': 'accepted'}
async def get_task_status(self, task_id: int) -> Dict[str, Any]:
async with async_db_session() as db:
task = await image_task_dao.get(db, task_id)
if not task:
raise errors.NotFoundError(msg='Task not found')
return {
'task_id': str(task.id),
'image_id': str(task.image_id),
'ref_type': task.ref_type,
'ref_id': str(task.ref_id),
'status': task.status,
'error_message': task.error_message,
}
async def list_exercises_by_image(self, image_id: int, user_id: Optional[int] = None) -> Optional[Dict[str, Any]]:
async with async_db_session() as db:
image = await image_dao.get(db, image_id)
if not image:
return None
i = await qa_exercise_dao.get_latest_by_image_id(db, image_id)
if not i:
return None
qs = await qa_question_dao.get_by_exercise_id(db, i.id)
session = None
if user_id:
s = await qa_session_dao.get_latest_by_user_exercise(db, user_id, i.id)
if s:
session = {
'id': str(s.id),
'started_at': s.started_at.isoformat() if s.started_at else None,
'progress': s.progress,
}
ret = {
'exercise': {
'id': str(i.id),
'image_id': str(i.image_id),
'title': i.title,
'description': i.description,
'status': i.status,
'question_count': i.question_count,
},
'session': session,
'questions': [
{
'id': str(q.id),
'exercise_id': str(q.exercise_id),
'image_id': str(q.image_id),
'question': q.question,
'ext': q.ext,
} for q in qs
]
}
return ret
def _evaluate_choice(self, q: QaQuestion, selected_options: List[str]) -> Tuple[Dict[str, Any], str, List[str]]:
ext = q.ext or {}
raw_correct = ext.get('correct_options') or []
raw_incorrect = ext.get('incorrect_options') or []
def _norm(v):
try:
return str(v).strip().lower()
except Exception:
return str(v)
correct_set = set(_norm(o.get('content') if isinstance(o, dict) else o) for o in raw_correct)
incorrect_map = {}
for o in raw_incorrect:
c = _norm(o.get('content') if isinstance(o, dict) else o)
if isinstance(o, dict):
incorrect_map[c] = {
'content': o.get('content'),
'error_type': o.get('error_type'),
'error_reason': o.get('error_reason')
}
else:
incorrect_map[c] = {'content': o, 'error_type': None, 'error_reason': None}
selected_list = list(selected_options or [])
selected = set(_norm(s) for s in selected_list)
if not selected:
is_correct = 'incorrect'
result_text = '完全错误'
evaluation = {'type': 'choice', 'result': result_text, 'detail': 'no selection', 'selected': {'correct': [], 'incorrect': []}, 'missing_correct': [o.get('content') if isinstance(o, dict) else o for o in raw_correct]}
else:
selected_correct = []
for o in raw_correct:
c = _norm(o.get('content') if isinstance(o, dict) else o)
if c in selected:
selected_correct.append(o.get('content') if isinstance(o, dict) else o)
selected_incorrect = []
for s in selected_list:
ns = _norm(s)
if ns not in correct_set:
detail = incorrect_map.get(ns)
if detail:
selected_incorrect.append(detail)
else:
selected_incorrect.append({'content': s, 'error_type': 'unknown', 'error_reason': None})
missing_correct = []
for o in raw_correct:
c = _norm(o.get('content') if isinstance(o, dict) else o)
if c not in selected:
missing_correct.append(o.get('content') if isinstance(o, dict) else o)
if selected == correct_set and not selected_incorrect:
is_correct = 'correct'
result_text = '完全匹配'
evaluation = {'type': 'choice', 'result': result_text, 'detail': is_correct, 'selected': {'correct': selected_correct, 'incorrect': []}, 'missing_correct': []}
elif selected_correct:
is_correct = 'partial'
result_text = '部分匹配'
evaluation = {'type': 'choice', 'result': result_text, 'detail': is_correct, 'selected': {'correct': selected_correct, 'incorrect': selected_incorrect}, 'missing_correct': missing_correct}
else:
is_correct = 'incorrect'
result_text = '完全错误'
evaluation = {'type': 'choice', 'result': result_text, 'detail': is_correct, 'selected': {'correct': [], 'incorrect': selected_incorrect}, 'missing_correct': [o.get('content') if isinstance(o, dict) else o for o in raw_correct]}
return evaluation, is_correct, selected_list
def _evaluate_cloze(self, q: QaQuestion, cloze_options: Optional[List[str]]) -> Tuple[Dict[str, Any], str, str]:
ext = q.ext or {}
cloze = ext.get('cloze') or {}
correct_word = cloze.get('correct_word')
# Support multiple selections: treat as correct if any selected matches a correct answer
selection_list = [s for s in (cloze_options or []) if isinstance(s, str) and s.strip()]
input_str = selection_list[0] if selection_list else ''
def _norm(v):
try:
return str(v).strip().lower()
except Exception:
return str(v)
# correct answers may be a single string or a list
correct_candidates = []
if isinstance(correct_word, list):
correct_candidates = [cw for cw in correct_word if isinstance(cw, str) and cw.strip()]
elif isinstance(correct_word, str) and correct_word.strip():
correct_candidates = [correct_word]
correct_set = set(_norm(cw) for cw in correct_candidates)
user_correct = []
user_incorrect = []
for s in selection_list:
if _norm(s) in correct_set:
user_correct.append(s)
else:
user_incorrect.append({'content': s, 'error_type': None, 'error_reason': None})
if user_correct and not user_incorrect:
is_correct = 'correct'
result_text = '完全匹配'
evaluation = {'type': 'cloze', 'result': result_text, 'detail': is_correct, 'selected': {'correct': user_correct, 'incorrect': []}, 'missing_correct': []}
elif user_correct:
is_correct = 'partial'
result_text = '部分匹配'
evaluation = {'type': 'cloze', 'result': result_text, 'detail': is_correct, 'selected': {'correct': user_correct, 'incorrect': user_incorrect}, 'missing_correct': []}
else:
is_correct = 'incorrect'
result_text = '完全错误'
evaluation = {'type': 'cloze', 'result': result_text, 'detail': is_correct, 'selected': {'correct': [], 'incorrect': user_incorrect}, 'missing_correct': [cw for cw in correct_candidates]}
return evaluation, is_correct, input_str
async def submit_attempt(self, question_id: int, exercise_id: int, user_id: int, mode: str, selected_options: Optional[List[str]] = None, input_text: Optional[str] = None, cloze_options: Optional[List[str]] = None, file_id: Optional[int] = None, session_id: Optional[int] = None, is_trial: bool = False) -> Dict[str, Any]:
async with async_db_session.begin() as db:
q = await qa_question_dao.get(db, question_id)
if not q or q.exercise_id != exercise_id:
raise errors.NotFoundError(msg='Question not found')
# Optimization: If trial mode and synchronous evaluation (Choice/Cloze), skip DB persistence
if is_trial:
if mode == EXERCISE_TYPE_CHOICE:
evaluation, _, selected_list = self._evaluate_choice(q, selected_options)
return {
'session_id': None,
'type': 'choice',
'choice': {
'options': selected_list,
'evaluation': evaluation
}
}
elif mode == EXERCISE_TYPE_CLOZE:
c_opts = cloze_options
if not c_opts and input_text:
c_opts = [input_text]
evaluation, _, input_str = self._evaluate_cloze(q, c_opts)
return {
'session_id': None,
'type': 'cloze',
'cloze': {
'input': input_str,
'evaluation': evaluation
}
}
recording_id = None
attempt = await qa_attempt_dao.get_latest_by_user_question(db, user_id=user_id, question_id=question_id)
if attempt:
attempt.task_id = None
attempt.choice_options = selected_options if mode == EXERCISE_TYPE_CHOICE else attempt.choice_options
if mode == EXERCISE_TYPE_CLOZE:
if isinstance(cloze_options, list) and cloze_options:
attempt.cloze_options = cloze_options[0]
elif input_text:
attempt.cloze_options = input_text
attempt.input_text = input_text if mode == EXERCISE_TYPE_FREE_TEXT else attempt.input_text
attempt.status = 'pending'
ext0 = attempt.ext or {}
if session_id:
ext0['session_id'] = session_id
if is_trial:
ext0['is_trial'] = True
elif 'is_trial' in ext0:
del ext0['is_trial']
attempt.ext = ext0
await db.flush()
else:
attempt = await qa_attempt_dao.create(db, {
'question_id': question_id,
'exercise_id': exercise_id,
'user_id': user_id,
'task_id': None,
'recording_id': recording_id,
'choice_options': selected_options if mode == EXERCISE_TYPE_CHOICE else None,
'cloze_options': ((cloze_options[0] if isinstance(cloze_options, list) and cloze_options else (input_text if input_text else None)) if mode == EXERCISE_TYPE_CLOZE else None),
'input_text': input_text if mode == EXERCISE_TYPE_FREE_TEXT else None,
'status': 'pending',
'evaluation': None,
'ext': {'is_trial': True} if is_trial else None,
})
if not is_trial:
s = await qa_session_dao.get_latest_by_user_exercise(db, user_id, exercise_id)
if s and s.exercise_id == exercise_id:
prog = dict(s.progress or {})
attempts = list(prog.get('attempts') or [])
replaced = False
for idx, a in enumerate(attempts):
if a.get('question_id') == question_id and a.get('mode') == mode:
attempts[idx] = {
'attempt_id': attempt.id,
'question_id': str(question_id),
'mode': mode,
'created_at': datetime.now().isoformat(),
'is_correct': a.get('is_correct'),
}
replaced = True
break
if not replaced:
attempts.append({
'attempt_id': attempt.id,
'question_id': str(question_id),
'mode': mode,
'created_at': datetime.now().isoformat(),
'is_correct': None,
})
prog['answered'] = int(prog.get('answered') or 0) + 1
prog['attempts'] = attempts
s.progress = prog
attempt.ext = {**(attempt.ext or {}), 'session_id': s.id}
await db.flush()
if mode == EXERCISE_TYPE_FREE_TEXT:
attempt.ext = {**(attempt.ext or {}), 'type': 'free_text', 'free_text': {'text': attempt.input_text or '', 'evaluation': None}}
await db.flush()
async with async_db_session.begin() as db2:
task = await image_task_dao.create_task(db2, CreateImageTaskParam(
image_id=q.image_id,
user_id=user_id,
dict_level=DictLevel.LEVEL1.value,
ref_type='qa_question_attempt',
ref_id=attempt.id,
status=ImageTaskStatus.PENDING,
))
await db2.flush()
asyncio.create_task(self._process_attempt_evaluation(task.id, user_id))
session_id_val = (attempt.ext or {}).get('session_id')
return {
'session_id': str(session_id_val) if session_id_val is not None else None,
'type': 'free_text',
'free_text': {
'text': attempt.input_text or '',
'evaluation': None
}
}
# Synchronous evaluation for choice/cloze
if mode == EXERCISE_TYPE_CHOICE:
evaluation, is_correct, selected_list = self._evaluate_choice(q, attempt.choice_options)
# update ext with choice details
attempt.ext = {**(attempt.ext or {}), 'type': 'choice', 'choice': {'options': selected_list, 'evaluation': evaluation}}
await db.flush()
merged_eval = dict(attempt.evaluation or {})
merged_eval['choice'] = {'options': selected_list, 'evaluation': evaluation}
await qa_attempt_dao.update_status(db, attempt.id, 'completed', merged_eval)
if not is_trial:
s = await qa_session_dao.get_latest_by_user_exercise(db, user_id, exercise_id)
if s and s.exercise_id == attempt.exercise_id:
prog = dict(s.progress or {})
attempts = list(prog.get('attempts') or [])
prev = None
for a in attempts:
if a.get('attempt_id') == attempt.id:
prev = a.get('is_correct')
a['is_correct'] = is_correct
break
prev_correct = 1 if prev == 'correct' else 0
new_correct = 1 if is_correct == 'correct' else 0
correct_inc = new_correct - prev_correct
prog['attempts'] = attempts
prog['correct'] = int(prog.get('correct') or 0) + correct_inc
s.progress = prog
await db.flush()
await db.commit()
# return latest result structure
session_id_val = (attempt.ext or {}).get('session_id')
return {
'session_id': str(session_id_val) if session_id_val is not None else None,
'type': 'choice',
'choice': {
'options': selected_list,
'evaluation': evaluation
}
}
if mode == EXERCISE_TYPE_CLOZE:
c_opts: List[str] = []
if isinstance(cloze_options, list) and cloze_options:
c_opts = cloze_options
elif input_text:
c_opts = [input_text]
elif attempt.cloze_options:
c_opts = [attempt.cloze_options]
if cloze_options:
c_opts = cloze_options
evaluation, is_correct, input_str = self._evaluate_cloze(q, c_opts)
# update ext with cloze details
attempt.ext = {**(attempt.ext or {}), 'type': 'cloze', 'cloze': {'input': input_str, 'evaluation': evaluation}}
await db.flush()
merged_eval = dict(attempt.evaluation or {})
merged_eval['cloze'] = {'input': input_str, 'evaluation': evaluation}
await qa_attempt_dao.update_status(db, attempt.id, 'completed', merged_eval)
if not is_trial:
s = await qa_session_dao.get_latest_by_user_exercise(db, user_id, exercise_id)
if s and s.exercise_id == attempt.exercise_id:
prog = dict(s.progress or {})
attempts = list(prog.get('attempts') or [])
prev = None
for a in attempts:
if a.get('attempt_id') == attempt.id:
prev = a.get('is_correct')
a['is_correct'] = is_correct
break
prev_correct = 1 if prev == 'correct' else 0
new_correct = 1 if is_correct == 'correct' else 0
correct_inc = new_correct - prev_correct
prog['attempts'] = attempts
prog['correct'] = int(prog.get('correct') or 0) + correct_inc
s.progress = prog
await db.flush()
await db.commit()
# return latest result structure
session_id_val = (attempt.ext or {}).get('session_id')
return {
'session_id': str(session_id_val) if session_id_val is not None else None,
'type': 'cloze',
'cloze': {
'input': input_str,
'evaluation': evaluation
}
}
async def _process_attempt_evaluation(self, task_id: int, user_id: int):
async with background_db_session() as db:
task = await image_task_dao.get(db, task_id)
if not task:
return
await image_task_dao.update_task_status(db, task_id, ImageTaskStatus.PROCESSING)
attempt = await qa_attempt_dao.get(db, task.ref_id)
if not attempt:
await image_task_dao.update_task_status(db, task_id, ImageTaskStatus.FAILED, error_message='Attempt not found')
await db.commit()
return
is_trial = (attempt.ext or {}).get('is_trial', False)
# Only async evaluation for free_text/audio attempts
q = await qa_question_dao.get(db, attempt.question_id)
user_text = attempt.input_text or ''
answers = (q.ext or {}).get('answers') or {}
prompt = (
'根据给定标准答案判断用户回答是否正确输出JSON{is_correct: correct|partial|incorrect, feedback: string}。'
'标准答案:' + json.dumps(answers, ensure_ascii=False) +
'用户回答:' + user_text
)
res = await self._call_llm_chat(prompt=prompt, image_id=q.image_id, user_id=user_id, chat_type='qa_attempt')
if not res.get('success'):
await image_task_dao.update_task_status(db, task_id, ImageTaskStatus.FAILED, error_message=res.get('error'))
await db.commit()
return
try:
parsed = json.loads(res.get('result')) if isinstance(res.get('result'), str) else res.get('result')
except Exception:
parsed = {}
evaluation = {'type': 'free_text', 'result': parsed.get('is_correct'), 'feedback': parsed.get('feedback')}
# update ext with free_text details
attempt.ext = {**(attempt.ext or {}), 'type': 'free_text', 'free_text': {'text': attempt.input_text or '', 'evaluation': evaluation}}
await db.flush()
merged_eval = dict(attempt.evaluation or {})
merged_eval['free_text'] = {'text': attempt.input_text or '', 'evaluation': evaluation}
await qa_attempt_dao.update_status(db, attempt.id, 'completed', merged_eval)
await image_task_dao.update_task_status(db, task_id, ImageTaskStatus.COMPLETED, result={'mode': 'free_text', 'token_usage': res.get('token_usage') or {}})
if not is_trial:
s = await qa_session_dao.get_latest_by_user_exercise(db, user_id, attempt.exercise_id)
if s and s.exercise_id == attempt.exercise_id:
prog = dict(s.progress or {})
attempts = list(prog.get('attempts') or [])
prev = None
for a in attempts:
if a.get('attempt_id') == attempt.id:
prev = a.get('is_correct')
a['is_correct'] = parsed.get('is_correct')
break
prev_correct = 1 if prev == 'correct' else 0
new_correct = 1 if parsed.get('is_correct') == 'correct' else 0
correct_inc = new_correct - prev_correct
prog['attempts'] = attempts
prog['correct'] = int(prog.get('correct') or 0) + correct_inc
s.progress = prog
await db.flush()
await db.commit()
async def _call_llm_chat(self, prompt: str, image_id: int, user_id: int, chat_type: str) -> Dict[str, Any]:
model_type = (settings.LLM_MODEL_TYPE or "").lower()
messages = [{"role": "system", "content": "You are a helpful assistant."}, {'role': 'user', 'content': prompt}]
if model_type == 'qwen':
try:
qres = await Qwen.chat(messages=[{'role': 'user', 'content': prompt}], image_id=image_id, user_id=user_id, api_type=chat_type)
if qres and qres.get('success'):
return {"success": True, "result": qres.get("result"), "token_usage": qres.get("token_usage") or {}}
except Exception as e:
return {"success": False, "error": str(e)}
return {"success": False, "error": "LLM call failed"}
else:
try:
res = await Hunyuan.chat(messages=messages, image_id=image_id, user_id=user_id, system_prompt=None, chat_type=chat_type)
if res and res.get('success'):
return res
except Exception as e:
return {"success": False, "error": str(e)}
return {"success": False, "error": "LLM call failed"}
async def get_attempt_task_status(self, task_id: int) -> Dict[str, Any]:
return await self.get_task_status(task_id)
async def get_question_evaluation(self, question_id: int, user_id: int) -> Dict[str, Any]:
async with async_db_session() as db:
# Exclude trial attempts by default so they don't pollute normal mode history
latest = await qa_attempt_dao.get_latest_completed_by_user_question(db, user_id=user_id, question_id=question_id, exclude_trial=True)
if not latest:
latest = await qa_attempt_dao.get_latest_valid_by_user_question(db, user_id=user_id, question_id=question_id, exclude_trial=True)
if not latest:
return {}
evalution = latest.evaluation or {}
session_id = evalution.get('session_id')
ret = {
'session_id': str(session_id) if session_id is not None else None,
'type': evalution.get('type'),
}
if 'choice' in evalution:
ch = evalution.get('choice') or {}
ret['choice'] = {
'options': ch.get('options') or [],
'evaluation': ch.get('evaluation') or None,
}
if 'cloze' in evalution:
cz = evalution.get('cloze') or {}
ret['cloze'] = {
'input': cz.get('input') or '',
'evaluation': cz.get('evaluation') or None,
}
if 'free_text' in evalution:
ft = evalution.get('free_text') or {}
ret['free_text'] = {
'text': ft.get('text') or '',
'evaluation': ft.get('evaluation') or None,
}
return ret
qa_service = QaService()