Files
backend/backend/middleware/qwen.py
2025-12-13 11:54:59 +08:00

598 lines
28 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import requests
import time
import json
import dashscope
from dashscope.api_entities.dashscope_response import DashScopeAPIResponse
from backend.app.admin.model.audit_log import AuditLog
from backend.app.admin.schema.audit_log import CreateAuditLogParam
from backend.app.admin.schema.qwen import QwenEmbedImageParams, QwenRecognizeImageParams
from backend.app.admin.service.audit_log_service import audit_log_service
from backend.common.exception import errors
from backend.core.conf import settings
from backend.common.log import log as logger
from typing import Dict, Any, List, Union
from sqlalchemy.exc import IntegrityError
from asyncio import sleep
class Qwen:
    """Wrapper around the Alibaba DashScope (Tongyi Qianwen) multimodal APIs.

    Exposes image recognition and image embedding calls, and records an audit
    log entry for every API invocation. All entry points are static methods.
    """

    # Class-level thread pool: the DashScope SDK calls are blocking, so callers
    # run them via loop.run_in_executor to keep the asyncio event loop responsive.
    _executor = ThreadPoolExecutor(max_workers=10)
    # REST API endpoint configuration.
    # NOTE(review): neither URL is referenced in this module -- presumably used
    # by callers elsewhere, or dead; confirm before removing.
    RECOGNITION_URL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation"
    EMBEDDING_URL = "https://dashscope.aliyuncs.com/api/v1/services/embeddings/multimodal-embedding"
@staticmethod
def get_recognition_prompt(type: str, exclude_words: List[str] | None = None) -> str:
"""获取图像识别提示词"""
# 根据dict_level确定词汇级别
vocabulary_level = "elementary level"
specificity = "basic and common"
# if dict_level:
# if dict_level == "LEVEL1":
# vocabulary_level = "elementary level"
# specificity = "basic and common"
# elif dict_level == "LEVEL2":
# vocabulary_level = "junior high school level"
# specificity = "more specific and detailed"
# elif dict_level == "LEVEL3":
# vocabulary_level = "college English test level"
# specificity = "precise and technical"
# elif dict_level == "LEVEL4":
# vocabulary_level = "TOEFL/IELTS level"
# specificity = "highly specialized and academic"
if type == 'word':
prompt = (
# "Vision-to-English education module."
# "Analyze image. Output JSON: "
# "Output JSON: {LEVEL1: [{description: str, desc_ipa:str, ref_word: str, word_ipa: str}, ...], LEVEL2: {...}, LEVEL3: {...}}. "
# "Each level: 4 singular lowercase nouns(single-word only, no hyphens or compounds) with one 20-word description each."
# "And each description must have a corresponding International Phonetic Alphabet (IPA) transcription in the 'desc_ipa' field."
# "Vocabulary progression: basic and common → some details and specific → technical and academic. "
# "Ensure all ref_words are unique across levels - no repetition."
# "Focus: primary/central/artificial objects."
# v2:
# "Vision-to-English-Chinese education module. Analyze and describe the image in three levels: "
# "LEVEL1 (simple vocabulary and basic grammar, ~10 words),"
# "LEVEL2 (detailed and complex vocabulary, 15-20 words),"
# "LEVEL3 (professional, uncommon words and complex grammar, ≤25 words)."
# "For each level, provide 6-8 English sentences and Chinese translations."
# "Output JSON: {LEVEL1: {desc_en:[], desc_zh:[]}, LEVEL2: {}, LEVEL3: {}}."
# "Ensure all description are unique - no repetition."
# "Focus: primary/central/artificial objects."
# v3
"""
Vision-to-English-Chinese education module.
Core objective: Analyze the image based on its PRIMARY SCENE (e.g., office, restaurant, subway, kitchen) and CENTRAL OBJECTS, generate English-Chinese sentence pairs for three learning levels (matching primary/intermediate/advanced English learners), with sentences focused on PRACTICAL, REUSABLE communication (not just grammatical complexity).
// LEVEL Definition (Binding learning goals + functions + complexity)
level1 (Beginner):
- Learning goal: Recognize core vocabulary + use basic functional sentences (describe objects/scenes, simple requests)
- Vocab: High-frequency daily words (no uncommon words)
- Grammar: Simple present/past tense, basic SV/SVO structure
- Word count per sentence: ~10 words
- Sentence type: 6 unique functional types (describe object, simple request, state fact, point out location, express preference, ask simple question)
level2 (Intermediate):
- Learning goal: Master scene-specific collocations + practical communication sentences (daily/office interaction)
- Vocab: Scene-specific common words + fixed collocations (e.g., "print a document", "place an order")
- Grammar: Present continuous, modal verbs (can/could/would), simple clauses
- Word count per sentence: 15-20 words
- Sentence type: 7 unique functional types (detailed description, polite request, ask for information, suggest action, state need, confirm fact, express feeling)
level3 (Advanced):
- Learning goal: Use professional/scene-specific uncommon words + context-aware pragmatic expressions (with tone/formality differences)
- Vocab: Scene-specific professional words (e.g., "barista" for café, "agenda" for meeting) + advanced collocations
- Grammar: Complex clauses, passive voice, subjunctive mood (as appropriate to the scene)
- Word count per sentence: ≤25 words
- Sentence type: 8 unique functional types (detailed scene analysis, formal/informal contrast, conditional statement, explain purpose, ask follow-up questions, express suggestion, summarize information, clarify meaning)
// Output Requirements
1. JSON Structure (add core vocab/collocation for easy parsing):
{
"scene_tag": "xxx", // e.g., "office", "café", "supermarket" (primary scene of the image)
"level1": {
"desc_en": ["sentence1", "sentence2", ...], // 6 unique sentences (no repeated semantics/functions)
"desc_zh": ["translation1", "translation2", ...], // one-to-one with desc_en
"core_vocab": ["word1", "word2", ...], // 3-5 core high-frequency words from LEVEL1 sentences
"collocations": ["word1 + collocation1", ...] // 2-3 fixed collocations (e.g., "sit + on a chair")
},
"level2": {
"desc_en": ["sentence1", ...], // 7 unique sentences
"desc_zh": ["translation1", ...],
"core_vocab": ["word1", ...], // 4-6 scene-specific words
"collocations": ["collocation1", ...] // 3-4 scene-specific collocations
},
"level3": {
"desc_en": ["sentence1", ...], // 8 unique sentences
"desc_zh": ["translation1", ...],
"core_vocab": ["word1", ...], // 5-7 professional/uncommon words
"collocations": ["collocation1", ...], // 4-5 advanced collocations
"pragmatic_notes": ["note1", ...] // 1-2 notes on tone/formality (e.g., "Could you..." is politer than "Can you...")
}
}
2. Uniqueness: No repetition in SEMANTICS/FUNCTIONS (not just literal repetition) — e.g., avoid two sentences both meaning "This is a laptop" (even with different wording).
3. Focus: Prioritize ARTIFICIAL/CENTRAL objects and PRIMARY scene (ignore trivial background elements) — e.g., for a café image, focus on "coffee", "barista", "menu" (not "wall", "floor").
4. Practicality: All sentences must be directly usable in real-life communication (avoid meaningless grammatical exercises like "I am eat a apple" corrected to "I am eating an apple").
5. Accuracy: Translations must be accurate (not literal) and match the context of the image scene.
"""
)
if exclude_words:
exclude_str = ". ".join(exclude_words)
prompt += f"Avoid using these words: {exclude_str}."
return prompt
elif type == 'food':
return (
"你是一个专业美食识别AI请严格按以下步骤分析图片\n"
"1. 识别最显著菜品名称(需具体到品种/烹饪方式):\n"
"- 示例:清蒸鲈鱼(非清蒸鱼)、罗宋汤(非蔬菜汤)\n"
"- 无法确定具体菜品时返回“无法识别出菜品”\n"
"2. 提取核心食材3-5种主料\n"
"- 排除调味料(油/盐/酱油等)\n"
"- 混合菜(如沙拉/炒饭)列出可见食材\n"
"- 无法识别时写“未知”\n"
"3. 输出格式严格JSON, 如果有多个占据显著位置的菜品,可以将多个菜品罗列出来放到 json 数组中:\n"
"[{ dish_name: 具体菜品名1 | 无法识别出菜品, method: 烹饪方式, main_ingredients: [食材1, 食材2] },\n"
"{ dish_name: 具体菜品名2 | 无法识别出菜品, method: 烹饪方式, main_ingredients: [食材1, 食材2] }]"
)
elif type == 'scene':
return (
"""
# 角色
你是专注于英语教育的轻量级场景化句型分析助手仅输出JSON格式结果无多余解释/话术。
# 输入信息
场景标签scene_tag
英文句型sentence_en
中文翻译sentence_zh
# 输出要求
1. 功能标签生成2个标签主标签+子标签),主标签仅限「询问/请求/陈述/表达需求/建议/确认/表达感受/指出位置」,子标签需贴合场景和句型核心功能(如“索要物品”“点餐”“职场沟通”);
2. 场景说明50-80字简洁说明该句型的使用场景、语用价值如礼貌性/适配对象),语言通俗,适配英语进阶学习者;
3. 输出格式严格遵循以下JSON结构无换行/多余字符:
{
"functionTags": ["主标签", "子标签"],
"sceneExplanation": "场景说明文本"
}
# 约束
- 功能标签必须贴合「场景标签」+「句型内容」,不脱离场景;
- 场景说明不堆砌术语,聚焦“怎么用/什么时候用”,而非语法分析;
- 严格控制字符数功能标签仅2个场景说明50-80字。
# 示例参考
【输入】
场景标签café
英文句型Can I have a look at the menu?
中文翻译:我能看一下菜单吗?
【输出】
{"functionTags":["询问","索要物品"],"sceneExplanation":"该句型适用于咖啡厅/餐厅场景向服务人员礼貌索要菜单比直接说“Give me the menu”更得体适配所有餐饮消费场景的基础沟通。"}
"""
)
else:
return ""
@staticmethod
async def recognize_image(params: QwenRecognizeImageParams) -> Dict[str, Any]:
    """Recognize image content via the Tongyi Qianwen multimodal chat API.

    Builds a data-URL from the base64 payload, pairs it with the prompt for
    ``params.type`` and delegates to :meth:`_call_api` with api_type
    'recognition'.
    """
    prompt_text = Qwen.get_recognition_prompt(params.type, params.exclude_words)
    data_url = f"data:image/{params.format};base64,{params.data}"
    user_message = {
        "role": "user",
        "content": [
            {"image": data_url},
            {"text": prompt_text},
        ],
    }
    return await Qwen._call_api(
        api_type="recognition",
        dict_level=params.dict_level,
        image_id=params.image_id,
        user_id=params.user_id,
        input=[user_message],
    )
@staticmethod
async def embed_image(params: QwenEmbedImageParams) -> Dict[str, Any]:
    """Fetch the embedding vector for an image via the multimodal embedding API.

    Wraps the base64 payload as a data-URL and delegates to :meth:`_call_api`
    with api_type 'embedding'.
    """
    data_url = f"data:image/{params.format};base64,{params.data}"
    embedding_input = [{'image': data_url}]
    return await Qwen._call_api(
        api_type="embedding",
        dict_level=params.dict_level,
        image_id=params.image_id,
        user_id=params.user_id,
        input=embedding_input,
    )
@staticmethod
async def _call_api(api_type: str, image_id: int, user_id: int, input: Union[Dict, List], dict_level: str | None = None) -> \
        Dict[str, Any]:
    """Generic DashScope call wrapper.

    Runs the blocking SDK call on the class thread pool, records an audit log
    for every call (success inline, failure via the ``finally`` block) and
    normalizes the SDK response through :meth:`_parse_response`.

    :param api_type: 'recognition' or 'embedding'; anything else raises ForbiddenError
        (which is caught below and returned as an error dict).
    :param input: a single message dict or a list of messages/embedding inputs.
    :return: parsed result dict, or ``{"success": False, "error": ...}`` on failure.
    """
    model_name = ""
    api_key = settings.QWEN_API_KEY
    start_time = time.time()
    start_at = datetime.now()
    response_data: Dict[str, Any] = {}
    status_code = 500
    error_message = ""
    try:
        # Both SDK entry points expect a list; normalize once instead of the
        # previous duplicated isinstance branches.
        payload = input if isinstance(input, list) else [input]
        loop = asyncio.get_running_loop()
        if api_type == "recognition":
            model_name = settings.QWEN_VISION_MODEL
            # Run the blocking SDK call in the thread pool so the event loop stays free.
            response = await loop.run_in_executor(
                Qwen._executor,
                lambda: dashscope.MultiModalConversation.call(
                    api_key=api_key,
                    model=model_name,
                    messages=payload,
                ),
            )
        elif api_type == "embedding":
            model_name = settings.QWEN_VISION_EMBEDDING_MODEL
            response = await loop.run_in_executor(
                Qwen._executor,
                lambda: dashscope.MultiModalEmbedding.call(
                    api_key=api_key,
                    model=model_name,
                    input=payload,
                ),
            )
        else:
            raise errors.ForbiddenError(msg=f'Qwen 不支持类型[{api_type}]')
        # DashScope responses normally expose status_code; fall back to 'code'.
        if hasattr(response, 'status_code'):
            status_code = response.status_code
        else:
            status_code = getattr(response, 'code', 500)
        duration = time.time() - start_time
        # Convert the SDK response object into a plain dict for downstream use.
        if hasattr(response, 'output'):
            response_data = {
                "output": response.output,
                "usage": getattr(response, 'usage', {}),
                "code": getattr(response, 'code', None),
                "message": getattr(response, 'message', None),
            }
        elif hasattr(response, '__dict__'):
            response_data = response.__dict__
        else:
            response_data = {"output": {}, "message": str(response)}
        if status_code == 200:
            if response_data.get("output", {}).get("task_status") == "PENDING":
                # BUG FIX: _handle_async_response is a plain synchronous function;
                # the original code awaited its dict return value, which raised
                # TypeError and turned every async task into an error response.
                # NOTE(review): it still blocks the event loop while polling --
                # TODO: move the polling into the executor as well.
                return Qwen._handle_async_response(
                    api_type,
                    model_name,
                    input if isinstance(input, dict) else {"messages": input},
                    response_data,
                    duration,
                    dict_level or "",
                )
            audit_log = CreateAuditLogParam(
                api_type=api_type,
                model_name=model_name,
                response_data=response_data,
                request_data=None,
                token_usage=response_data.get("usage", {}),
                duration=duration,
                status_code=status_code,
                error_message=error_message,
                called_at=start_at,
                image_id=image_id,
                user_id=user_id,
                cost=0,  # recalculated from token usage inside _audit_log
                api_version=settings.FASTAPI_API_V1_PATH,
                dict_level=dict_level,
            )
            # Persist the audit record in the background.
            Qwen._audit_log(api_type, audit_log)
            return Qwen._parse_response(api_type, response_data)
        error_message = f"API error: {getattr(response, 'message', 'Unknown error')}"
        logger.error(f"{api_type} API error: {status_code} - {error_message}")
        return {
            "success": False,
            "error": error_message,
            "status_code": status_code
        }
    except Exception as e:
        error_message = str(e)
        logger.exception(f"{api_type} API exception: {error_message}")
        return {
            "success": False,
            "error": error_message
        }
    finally:
        # Any failure path sets error_message; make sure it is audited too.
        if error_message:
            Qwen._log_audit(
                api_type=api_type,
                dict_level=dict_level or "",
                model_name=model_name,
                request_data=input if isinstance(input, dict) else {"messages": input},
                response_data={"error": error_message},
                duration=time.time() - start_time,
                status_code=status_code,
                error_message=error_message,
                image_id=image_id,
                user_id=user_id,
                called_at=start_at
            )
@staticmethod
def _handle_async_response(api_type: str, model_name: str, request_data: dict,
initial_response: dict, initial_duration: float, dict_level: str) -> dict:
"""处理异步API响应"""
task_id = initial_response.get("output", {}).get("task_id")
if not task_id:
return {
"success": False,
"error": "Async task ID missing"
}
# 记录初始调用审计日志
Qwen._log_audit(
api_type=api_type,
dict_level=dict_level,
model_name=model_name,
request_data=request_data,
response_data=initial_response,
duration=initial_duration,
status_code=202
)
# 轮询任务状态
start_time = time.time()
headers = {
"Authorization": f"Bearer {settings.QWEN_API_KEY}",
"Content-Type": "application/json"
}
while True:
time.sleep(settings.ASYNC_POLL_INTERVAL)
try:
response = requests.get(
f"https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}",
headers=headers
)
if response.status_code == 200:
task_data = response.json()
task_status = task_data.get("output", {}).get("task_status")
total_duration = time.time() - start_time + initial_duration
if task_status == "SUCCEEDED":
# 记录最终审计日志
Qwen._log_audit(
api_type=api_type,
dict_level=dict_level,
model_name=request_data.get("model", ""),
request_data=request_data,
response_data=task_data,
duration=total_duration,
status_code=200
)
return Qwen._parse_response(api_type, task_data)
elif task_status in ["FAILED", "CANCELED"]:
error_message = task_data.get("message", "Async task failed")
logger.error(f"Async task failed: {task_id} - {error_message}")
Qwen._log_audit(
api_type=api_type,
dict_level=dict_level,
model_name=request_data.get("model", ""),
request_data=request_data,
response_data=task_data,
duration=total_duration,
status_code=500,
error_message=error_message,
)
return {
"success": False,
"error": error_message
}
else:
error_message = f"Task status check failed: {response.text}"
logger.error(error_message)
return {
"success": False,
"error": error_message
}
except Exception as e:
error_message = str(e)
logger.exception(f"Async task polling error: {error_message}")
return {
"success": False,
"error": error_message
}
@staticmethod
def _parse_response(api_type: str, response_data: dict) -> dict:
"""解析API响应"""
if api_type == "recognition":
# 解析识别结果
result = response_data.get("output", {}).get("choices", [{}])[0].get("message", {}).get("content", "")
if isinstance(result, list):
result = " ".join([item.get("text", "") for item in result if item.get("text")])
return {
"success": True,
"result": result.strip().lower(),
"token_usage": response_data.get("usage", {})
}
elif api_type == "embedding":
# 解析嵌入向量
embeddings = response_data.get("output", {}).get("embeddings", [])
if embeddings and embeddings[0].get("embedding"):
embedding = embeddings[0]["embedding"]
return {
"success": True,
"embedding": embedding,
"token_usage": response_data.get("usage", {})
}
return {
"success": False,
"error": "No embedding found in response"
}
else:
return {}
@staticmethod
def _calculate_cost(api_type: str, token_usage: dict) -> float:
"""Calculate API call cost based on API type and token usage"""
cost = 0
if api_type == "recognition":
# Cost calculation: input_tokens * 0.0016 + output_tokens * 0.004
input_tokens = token_usage.get("input_tokens", 0)
output_tokens = token_usage.get("output_tokens", 0)
# cost = input_tokens * 0.0016 + output_tokens * 0.004
cost = input_tokens * 0.001 + output_tokens * 0.01
if cost > 0:
cost = cost / 1000
elif api_type == "embedding":
# 假设每张图片 $0.001
cost = 0.01
# Round to 6 decimal places to prevent data truncation issues
return round(cost, 6)
@staticmethod
def _audit_log(api_type: str, params: CreateAuditLogParam):
    """Persist an audit-log record in the background, without blocking the caller.

    Recomputes ``params.cost`` from its token usage, then schedules a fire-and-
    forget task that retries transient foreign-key constraint violations with
    exponential backoff. All failures are logged, never raised.
    """
    # Cost is always derived from token usage via the shared calculator.
    params.cost = Qwen._calculate_cost(api_type, params.token_usage or {})
    try:
        async def _persist():
            retries = 3
            for attempt in range(retries):
                try:
                    await audit_log_service.create(obj=params)
                    return
                except IntegrityError as exc:
                    # FK violations can happen when the referenced row's
                    # transaction has not committed yet -- back off and retry.
                    if "foreign key constraint" in str(exc).lower() and attempt < retries - 1:
                        await sleep(0.1 * (2 ** attempt))
                        continue
                    logger.error(f"Failed to save audit log after {retries} attempts: {str(exc)}")
                    return
                except Exception as exc:
                    logger.error(f"Unexpected error saving audit log: {str(exc)}")
                    return
        # Fire-and-forget: do not block the API call on audit persistence.
        asyncio.create_task(_persist())
    except Exception as e:
        logger.error(f"Failed to save audit log: {str(e)}")
@staticmethod
def _log_audit(api_type: str, model_name: str, request_data: dict,
               response_data: dict, duration: float, status_code: int,
               error_message: str | None = None, dict_level: str | None = None,
               image_id: int = 0, user_id: int = 0, called_at: datetime | None = None):
    """Build an audit-log record for one API call and persist it in the background.

    Improvement note: the original duplicated ~30 lines of the retry/persist
    machinery already in :meth:`_audit_log`; it now delegates instead.
    ``_audit_log`` recomputes the cost from the same token usage (identical
    value) and runs the same retry-with-backoff background task.

    :param response_data: raw response dict; its ``usage`` key feeds cost calculation.
    :param called_at: call timestamp; defaults to now when omitted.
    """
    token_usage = response_data.get("usage", {})
    audit_log = CreateAuditLogParam(
        api_type=api_type,
        model_name=model_name,
        request_data=request_data,
        response_data=response_data,
        token_usage=token_usage,
        cost=Qwen._calculate_cost(api_type, token_usage),
        duration=duration,
        status_code=status_code,
        error_message=error_message,
        user_id=user_id,
        image_id=image_id,
        called_at=called_at or datetime.now(),
        api_version=settings.FASTAPI_API_V1_PATH,
        dict_level=dict_level,
    )
    # Delegate to the shared background persister (non-blocking, retries FK races).
    Qwen._audit_log(api_type, audit_log)