This commit is contained in:
Felix
2025-12-21 16:17:26 +08:00
parent bc90d368f7
commit 841b2cd8b5
14 changed files with 339 additions and 51 deletions

View File

@@ -26,4 +26,4 @@ COPY deploy/app_server.conf /etc/supervisor/conf.d/
EXPOSE 80
CMD ["uvicorn", "backend.main:app", "--host", "0.0.0.0", "--port", "80"]
CMD ["/bin/sh","-lc","cd backend && alembic upgrade head && cd .. && uvicorn backend.main:app --host 0.0.0.0 --port 80"]

View File

@@ -0,0 +1,48 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Optional, List
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy_crud_plus import CRUDPlus
from backend.app.admin.model.points import PointsDebt
from backend.utils.timezone import timezone
class PointsDebtCRUD(CRUDPlus[PointsDebt]):
async def add_pending(self, db: AsyncSession, user_id: int, amount: int, related_id: Optional[int], details: Optional[dict]) -> PointsDebt:
debt = PointsDebt(user_id=user_id, amount=amount, amount_settled=0, status='pending', related_id=related_id, details=details)
db.add(debt)
await db.flush()
return debt
async def list_pending_by_user(self, db: AsyncSession, user_id: int) -> List[PointsDebt]:
stmt = select(self.model).where(self.model.user_id == user_id, self.model.status == 'pending').order_by(self.model.created_time.asc())
result = await db.execute(stmt)
return list(result.scalars().all())
async def settle_partial(self, db: AsyncSession, debt_id: int, settle_amount: int) -> int:
stmt = update(self.model).where(
self.model.id == debt_id,
self.model.status == 'pending',
(self.model.amount - self.model.amount_settled) >= settle_amount
).values(
amount_settled=self.model.amount_settled + settle_amount
)
result = await db.execute(stmt)
return result.rowcount
async def mark_settled(self, db: AsyncSession, debt_id: int) -> int:
stmt = update(self.model).where(
self.model.id == debt_id,
self.model.status == 'pending',
self.model.amount <= self.model.amount_settled
).values(
status='settled',
settled_time=timezone.now()
)
result = await db.execute(stmt)
return result.rowcount
points_debt_dao: PointsDebtCRUD = PointsDebtCRUD(PointsDebt)

View File

@@ -6,6 +6,7 @@ from sqlalchemy.dialects.mysql import JSON as MySQLJSON # Changed from postgres
from sqlalchemy.orm import Mapped, mapped_column
from backend.common.model import Base, id_key, snowflake_id_key
from backend.utils.timezone import timezone
class Points(Base):
@@ -78,3 +79,22 @@ class PointsConsumptionAlloc(Base):
Index('idx_points_alloc_spend', 'spend_log_id'),
{'comment': '积分消费分摊记录'}
)
class PointsDebt(Base):
__tablename__ = 'points_debt'
id: Mapped[snowflake_id_key] = mapped_column(BigInteger, init=False, primary_key=True)
user_id: Mapped[int] = mapped_column(BigInteger, ForeignKey('wx_user.id'), nullable=False, comment='用户ID')
amount: Mapped[int] = mapped_column(BigInteger, nullable=False, comment='欠费积分总额')
related_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True, comment='关联ID')
amount_settled: Mapped[int] = mapped_column(BigInteger, default=0, nullable=False, comment='已清偿积分')
status: Mapped[str] = mapped_column(String(16), default='pending', comment='状态pending/settled')
details: Mapped[dict | None] = mapped_column(MySQLJSON, nullable=True, default=None, comment='附加信息')
settled_time: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), default=None, nullable=True, comment='清偿时间')
__table_args__ = (
Index('idx_points_debt_user_status', 'user_id', 'status'),
Index('idx_points_debt_user_time', 'user_id', 'created_time'),
{'comment': '积分欠费记录'}
)

View File

@@ -4,6 +4,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from backend.app.admin.crud.points_crud import points_dao, points_log_dao
from backend.app.admin.crud.points_lot_crud import points_lot_dao
from backend.app.admin.crud.points_alloc_crud import points_alloc_dao
from backend.app.admin.crud.points_debt_crud import points_debt_dao
from backend.app.admin.model.points import PointsConsumptionAlloc
from backend.app.admin.model.points import Points, PointsLot
from backend.database.db import async_db_session
@@ -14,6 +15,8 @@ from backend.common.const import (
POINTS_ACTION_REFUND_UNFREEZE,
POINTS_ACTION_REFUND_DEDUCT,
POINTS_ACTION_COUPON,
POINTS_ACTION_DEBT_PENDING,
POINTS_ACTION_DEBT_SETTLED,
)
@@ -82,7 +85,7 @@ class PointsService:
"related_id": related_id,
"details": log_details
})
await PointsService._settle_pending_debts_with_db(db, user_id)
return True
@staticmethod
@@ -138,7 +141,7 @@ class PointsService:
"related_id": coupon_id,
"details": log_details
})
await PointsService._settle_pending_debts_with_db(db, user_id)
return True
@staticmethod
@@ -186,12 +189,14 @@ class PointsService:
if not points_account:
return False
available = max(0, (points_account.balance or 0) - (points_account.frozen_balance or 0))
if available < amount:
if available <= 0:
return False
# 仅扣除当前可用余额与请求金额中的较小值
deduct_amount = min(available, amount)
current_balance = points_account.balance
# 批次扣减
remaining = amount
# 批次扣减(按 FIFO
remaining = deduct_amount
alloc_details = []
if action == POINTS_ACTION_REFUND_DEDUCT and related_id is not None:
target_lot = await points_lot_dao.get_by_order(db, related_id)
@@ -213,10 +218,11 @@ class PointsService:
await points_lot_dao.deduct_from_lot(db, lot.id, take)
alloc_details.append({"lot_id": lot.id, "order_id": lot.order_id, "points": take})
remaining -= take
result = await points_dao.deduct_balance_atomic(db, user_id, amount)
# 扣减账户余额(仅扣已分摊的金额)
result = await points_dao.deduct_balance_atomic(db, user_id, deduct_amount)
if not result:
return False
new_balance = current_balance - amount
new_balance = current_balance - deduct_amount
# 记录积分变动日志
if action is None:
@@ -224,7 +230,7 @@ class PointsService:
spend_log = await points_log_dao.add_log(db, {
"user_id": user_id,
"action": action,
"amount": amount,
"amount": deduct_amount,
"balance_after": new_balance,
"related_id": related_id,
"details": details or {}
@@ -235,7 +241,18 @@ class PointsService:
for a in alloc_details:
alloc = PointsConsumptionAlloc(user_id=user_id, lot_id=a["lot_id"], spend_log_id=spend_log.id, points=a["points"])
await points_alloc_dao.add(db, alloc)
# 如果请求金额大于可扣金额,则为欠费部分创建待扣记录
if amount > deduct_amount:
debt_amount = amount - deduct_amount
await points_debt_dao.add_pending(db, user_id, debt_amount, related_id, details)
await points_log_dao.add_log(db, {
"user_id": user_id,
"action": POINTS_ACTION_DEBT_PENDING,
"amount": debt_amount,
"balance_after": new_balance,
"related_id": related_id,
"details": (details or {}) | {"partially_deducted": deduct_amount}
})
return True
@staticmethod
@@ -324,8 +341,61 @@ class PointsService:
"related_id": order_id,
"details": {"order_id": order_id, "amount_cents": amount_cents}
})
await PointsService._settle_pending_debts_with_db(db, user_id)
return True
@staticmethod
async def create_debt_pending_with_db(db: AsyncSession, user_id: int, amount: int, related_id: Optional[int], details: Optional[dict]) -> bool:
if amount <= 0:
return False
acct = await points_dao.get_by_user_id(db, user_id)
if not acct:
acct = await points_dao.create_user_points(db, user_id)
await points_debt_dao.add_pending(db, user_id, amount, related_id, details)
await points_log_dao.add_log(db, {
"user_id": user_id,
"action": POINTS_ACTION_DEBT_PENDING,
"amount": amount,
"balance_after": acct.balance,
"related_id": related_id,
"details": details or {}
})
return True
@staticmethod
async def _settle_pending_debts_with_db(db: AsyncSession, user_id: int) -> None:
acct = await points_dao.get_by_user_id(db, user_id)
if not acct:
return
available = max(0, (acct.balance or 0) - (acct.frozen_balance or 0))
if available <= 0:
return
debts = await points_debt_dao.list_pending_by_user(db, user_id)
for debt in debts:
remaining = max(0, debt.amount - debt.amount_settled)
if remaining <= 0:
await points_debt_dao.mark_settled(db, debt.id)
continue
if available <= 0:
break
settle = min(available, remaining)
ok = await PointsService.deduct_points_with_db(user_id, settle, db, related_id=debt.related_id, details={"debt_id": debt.id}, action=POINTS_ACTION_SPEND)
if not ok:
break
await points_debt_dao.settle_partial(db, debt.id, settle)
acct = await points_dao.get_by_user_id(db, user_id)
await points_log_dao.add_log(db, {
"user_id": user_id,
"action": POINTS_ACTION_DEBT_SETTLED,
"amount": settle,
"balance_after": acct.balance,
"related_id": debt.id,
"details": {"related_id": debt.related_id}
})
available = max(0, (acct.balance or 0) - (acct.frozen_balance or 0))
if debt.amount <= debt.amount_settled + settle:
await points_debt_dao.mark_settled(db, debt.id)
@staticmethod
async def freeze_points_for_order(db: AsyncSession, user_id: int, order_id: int, points_to_freeze: int) -> bool:
if points_to_freeze <= 0:

View File

@@ -18,10 +18,10 @@ class ProductService:
@staticmethod
async def init_products(items: List[dict] | None = None) -> int:
default = [
{"title": "+1000%", "description": "仅限一次", "points": 100, "amount_cents": 100, "one_time": True},
{"title": "+100%", "description": "充值100积分", "points": 20, "amount_cents": 100, "one_time": False},
{"title": "+200%", "description": "充值500积分", "points": 150, "amount_cents": 500, "one_time": False},
{"title": "+500%", "description": "充值500积分", "points": 500, "amount_cents": 1000, "one_time": False},
{"title": "+400%", "description": "仅限一次", "points": 100, "amount_cents": 100, "one_time": True},
{"title": "+50%", "description": "加赠50积分", "points": 150, "amount_cents": 500, "one_time": False},
{"title": "+100%", "description": "加赠200积分", "points": 400, "amount_cents": 1000, "one_time": False},
{"title": "+200%", "description": "加赠800积分", "points": 1200, "amount_cents": 2000, "one_time": False},
# {"title": "100积分", "description": "测试100积分", "points": 100, "amount_cents": 1, "one_time": False},
]
payload = items or default

View File

@@ -220,5 +220,16 @@ class ImageTaskCRUD(CRUDPlus[ImageProcessingTask]):
await db.rollback()
raise e
async def get_latest_active_task(self, db: AsyncSession, user_id: int, image_id: int, ref_type: str) -> ImageProcessingTask | None:
stmt = select(self.model).where(
and_(
self.model.user_id == user_id,
self.model.image_id == image_id,
self.model.ref_type == ref_type,
self.model.status.in_([ImageTaskStatus.PENDING, ImageTaskStatus.PROCESSING]),
)
).order_by(self.model.created_time.desc()).limit(1)
result = await db.execute(stmt)
return result.scalar_one_or_none()
image_task_dao: ImageTaskCRUD = ImageTaskCRUD(ImageProcessingTask)

View File

@@ -199,7 +199,7 @@ class ImageService:
# 检查用户积分是否足够(现在积分没有过期概念)
if not await points_service.check_sufficient_points(current_user.id, IMAGE_RECOGNITION_COST):
raise errors.ForbiddenError(
msg=f'积分不足,请充值以继续使用'
msg=f'积分不足,请获取积分后继续使用'
)
# 尝试获取任务槽位
@@ -338,7 +338,7 @@ class ImageService:
image = await image_dao.get(db, task.image_id)
if image:
total_tokens = task.result.get("token_usage", {}).get("total_tokens", 0)
points = math.ceil(max(total_tokens, 1)/1000)*0.15 * IMAGE_RECOGNITION_COST
points = math.ceil(max(total_tokens, 1)/1000) * IMAGE_RECOGNITION_COST
points_deducted = await points_service.deduct_points_with_db(
user_id=task.user_id,
amount=math.ceil(points),
@@ -348,8 +348,7 @@ class ImageService:
action=POINTS_ACTION_IMAGE_RECOGNITION
)
if not points_deducted:
logger.error(f"Failed to deduct points for user {task.user_id} for task {task_id}")
raise Exception("Failed to deduct points")
logger.warning(f"Insufficient points for user {task.user_id}: balance is zero, cannot deduct for task {task_id}")
# Step 5: Update task status to completed
await ImageService._update_task_status_with_db(task_id, ImageTaskStatus.COMPLETED, db)

View File

@@ -372,7 +372,7 @@ class RecordingService:
# 检查用户积分是否足够(现在积分没有过期概念)
if not await points_service.check_sufficient_points(user_id, SPEECH_ASSESSMENT_COST):
raise RuntimeError('积分不足,请充值以继续使用')
raise RuntimeError('积分不足,请获取积分后继续使用')
try:
# 调用腾讯云SOE API进行语音评估
@@ -447,7 +447,7 @@ class RecordingService:
except Exception as e:
raise RuntimeError(f"Failed to create recording record for file_id {file_id}: {str(e)}")
if not await points_service.check_sufficient_points(user_id, SPEECH_ASSESSMENT_COST):
raise RuntimeError('积分不足,请充值以继续使用')
raise RuntimeError('积分不足,请获取积分后继续使用')
try:
result = await self.tencent_cloud.assessment_speech(file_id, ref_text, str(recording.id), image_id, user_id)
details = {"assessment": result}

View File

@@ -11,7 +11,7 @@ from backend.app.admin.schema.wx import DictLevel
class SceneSentenceService:
@staticmethod
async def create_session_with_items(image_id: int, user_id: int, image_chat_id: Optional[int], scene_tag: Optional[List[str]] = None, items: List[Dict[str, Any]] = [], called_at: datetime = datetime.now()) -> Dict[str, Any]:
async def create_sentence_with_items(image_id: int, user_id: int, image_chat_id: Optional[int], scene_tag: Optional[List[str]] = None, items: List[Dict[str, Any]] = [], called_at: datetime = datetime.now()) -> Dict[str, Any]:
async with async_db_session.begin() as db:
last_seq = 0
if image_chat_id:

View File

@@ -3,6 +3,7 @@
from typing import Optional, List, Dict, Any
from datetime import datetime
import json
import math
from backend.app.ai.crud.sentence_card_crud import sentence_card_dao
from backend.app.ai.crud.scene_sentence_crud import scene_sentence_dao, scene_sentence_item_dao
@@ -21,7 +22,7 @@ from backend.app.ai.crud.image_task_crud import image_task_dao
from backend.app.ai.schema.image_task import CreateImageTaskParam
from backend.app.admin.service.points_service import points_service
from backend.app.ai.service.rate_limit_service import rate_limit_service
from backend.common.const import SENTENCE_CARD_COST, SENTENCE_TYPE_SCENE_SENTENCE, SENTENCE_TYPE_SCENE_DIALOGUE, SENTENCE_TYPE_SCENE_EXERCISE
from backend.common.const import SENTENCE_CARD_COST, SENTENCE_TYPE_SCENE_SENTENCE, SENTENCE_TYPE_SCENE_DIALOGUE, SENTENCE_TYPE_SCENE_EXERCISE, LLM_CHAT_COST
class SentenceService:
@@ -46,7 +47,7 @@ class SentenceService:
"1. 内容约束:基于基础句型扩展功能标签、场景说明,每句补充「发音提示(重音/连读)」等输出结构中要求的内容,需符合现实生活和真实世界的习惯。\n"
"2. 格式约束严格按照下方JSON结构输出无额外解释确保字段完整、值为数组/字符串类型。\n"
"3. 语言约束所有英文内容符合日常沟通表达无语法错误中文翻译精准场景说明简洁易懂≤50字\n"
"4. 严格按照JSON结构输出无额外解释确保字段完整、值为数组/字符串类型,输出的 json 结构是:\n"
"4. 严格按照JSON结构输出无额外解释确保字段完整、值为数组/字符串类型,输出的 JSON 结构是:\n"
)
struct = (
"""
@@ -147,23 +148,18 @@ class SentenceService:
payload["user_level"] = "intermediate"
prompt = SentenceService._compose_prompt(payload, SENTENCE_TYPE_SCENE_SENTENCE)
start_at = datetime.now()
res = await Hunyuan.chat(
messages=[{"role": "user", "content": prompt}],
image_id=image_id,
user_id=user_id,
system_prompt=None,
chat_type=SENTENCE_TYPE_SCENE_SENTENCE
)
res = await SentenceService._call_scene_llm(prompt, image_id, user_id, SENTENCE_TYPE_SCENE_SENTENCE)
if not res.get("success"):
return None
image_chat_id = res.get("image_chat_id")
token_usage = res.get("token_usage") or {}
parsed = {}
try:
parsed = json.loads(res.get("result")) if isinstance(res.get("result"), str) else res.get("result", {})
except Exception:
parsed = {}
# print(parsed)
print(parsed)
items = []
sc = parsed.get("sentence") or {}
for idx, d in enumerate(sc.get("list", []), start=1):
@@ -195,7 +191,7 @@ class SentenceService:
"speech_rate_tip": d.get("speech_rate_tip") or d.get("speechRateTip"),
"personalized_tips": d.get("personalized_tips") or d.get("personalizedTips"),
})
return await scene_sentence_service.create_session_with_items(
_ret = await scene_sentence_service.create_sentence_with_items(
image_id=image_id,
user_id=user_id,
image_chat_id=image_chat_id,
@@ -203,6 +199,9 @@ class SentenceService:
items=items,
called_at=start_at
)
if isinstance(_ret, dict):
_ret["token_usage"] = token_usage
return _ret
@staticmethod
async def generate_scene_dialogue(image_id: int, user_id: int, scene_tag: str, desc_en: List[str], desc_zh: List[str], core_vocab: List[str], collocations: List[str]) -> List[SentenceCard]:
@@ -261,6 +260,36 @@ class SentenceService:
created.append(card)
return created
@staticmethod
async def _call_scene_llm(prompt: str, image_id: int, user_id: int, chat_type: str) -> Dict[str, Any]:
model_type = (settings.LLM_MODEL_TYPE or "").lower()
if model_type == "qwen":
try:
qres = await Qwen.chat(
messages=[{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": prompt}],
image_id=image_id,
user_id=user_id
)
if qres and qres.get("success"):
return {"success": True, "result": qres.get("result"), "image_chat_id": None, "token_usage": qres.get("token_usage") or {}}
except Exception:
pass
return {"success": False, "error": "LLM call failed"}
else:
try:
res = await Hunyuan.chat(
messages=[{"role": "user", "content": prompt}],
image_id=image_id,
user_id=user_id,
system_prompt=None,
chat_type=chat_type
)
if res and res.get("success"):
return res
except Exception:
pass
return {"success": False, "error": "LLM call failed"}
@staticmethod
async def generate_sentence_exercise_card(image_id: int, user_id: int, scene_tag: str, desc_en: List[str], desc_zh: List[str], core_vocab: List[str], collocations: List[str]) -> List[SentenceCard]:
start_at = datetime.now()
@@ -297,19 +326,27 @@ class SentenceService:
@staticmethod
async def create_scene_task(image_id: int, user_id: int, scene_type: str) -> dict:
from backend.common.exception import errors
if not await points_service.check_sufficient_points(user_id, SENTENCE_CARD_COST):
raise errors.ForbiddenError(msg='积分不足,请充值以继续使用')
slot_acquired = await rate_limit_service.acquire_task_slot(user_id)
if not slot_acquired:
max_tasks = await rate_limit_service.get_user_task_limit(user_id)
raise errors.ForbiddenError(msg=f'用户同时最多只能运行 {max_tasks} 个任务,请等待现有任务完成后再试')
if scene_type not in (SENTENCE_TYPE_SCENE_DIALOGUE, SENTENCE_TYPE_SCENE_EXERCISE, SENTENCE_TYPE_SCENE_SENTENCE):
raise errors.BadRequestError(msg=f'不支持的任务类型 {scene_type}')
async with async_db_session.begin() as db:
image = await image_dao.get(db, image_id)
if not image:
raise errors.NotFoundError(msg='图片不存在')
latest_task = await image_task_dao.get_latest_active_task(db, user_id, image_id, scene_type)
if latest_task:
await db.commit()
return {"task_id": str(latest_task.id), "status": latest_task.status}
if not await points_service.check_sufficient_points(user_id, SENTENCE_CARD_COST):
raise errors.ForbiddenError(msg='积分不足,请获取积分后继续使用')
slot_acquired = await rate_limit_service.acquire_task_slot(user_id)
if not slot_acquired:
max_tasks = await rate_limit_service.get_user_task_limit(user_id)
raise errors.ForbiddenError(msg=f'用户同时最多只能运行 {max_tasks} 个任务,请等待现有任务完成后再试')
dict_level = DictLevel.LEVEL2.value
async with async_db_session.begin() as db:
image = await image_dao.get(db, image_id)
if not image:
raise errors.NotFoundError(msg='图片不存在')
task_params = CreateImageTaskParam(
image_id=image_id,
user_id=user_id,
@@ -346,9 +383,9 @@ class SentenceService:
core_vocab = level2.get("core_vocab", [])
collocations = level2.get("collocations", [])
if task.ref_type == SENTENCE_TYPE_SCENE_DIALOGUE:
cards = await SentenceService.generate_scene_dialogue(task.image_id, task.user_id, scene_tag, desc_en, desc_zh, core_vocab, collocations)
await SentenceService.generate_scene_dialogue(task.image_id, task.user_id, scene_tag, desc_en, desc_zh, core_vocab, collocations)
elif task.ref_type == SENTENCE_TYPE_SCENE_EXERCISE:
cards = await SentenceService.generate_sentence_exercise_card(task.image_id, task.user_id, scene_tag, desc_en, desc_zh, core_vocab, collocations)
await SentenceService.generate_sentence_exercise_card(task.image_id, task.user_id, scene_tag, desc_en, desc_zh, core_vocab, collocations)
elif task.ref_type == SENTENCE_TYPE_SCENE_SENTENCE:
payload = {
"description": description,
@@ -359,24 +396,31 @@ class SentenceService:
"collocations": collocations,
"user_level": "intermediate",
}
cards = await SentenceService.generate_scene_sentence(task.image_id, task.user_id, payload)
result = await SentenceService.generate_scene_sentence(task.image_id, task.user_id, payload)
else:
raise Exception(f"Unsupported card type: {task.ref_type}")
async with background_db_session() as db:
await db.begin()
image = await image_dao.get(db, task.image_id)
total_tokens = 0
if isinstance(result, dict):
total_tokens = int((result.get("token_usage") or {}).get("total_tokens") or 0)
deduct_amount = SENTENCE_CARD_COST
if total_tokens > 0:
units = math.ceil(max(total_tokens, 1) / 1000)
deduct_amount = units * LLM_CHAT_COST
points_deducted = await points_service.deduct_points_with_db(
user_id=task.user_id,
amount=SENTENCE_CARD_COST,
amount=deduct_amount,
db=db,
related_id=image.id if image else None,
details={"task_id": task_id, "sentence_type": task.ref_type},
details={"task_id": task_id, "sentence_type": task.ref_type, "token_usage": total_tokens},
action=task.ref_type
)
if not points_deducted:
raise Exception("Failed to deduct points")
from backend.app.ai.tasks import update_task_status_with_retry
await update_task_status_with_retry(db, task_id, ImageTaskStatus.COMPLETED, result={"count": len(cards)})
await update_task_status_with_retry(db, task_id, ImageTaskStatus.COMPLETED, result=result)
await db.commit()
task_processing_success = True
except Exception as e:

View File

@@ -1,12 +1,12 @@
"""Constants used throughout the application."""
# Image recognition service cost in points
IMAGE_RECOGNITION_COST = 10 # 0.0015/1000 * 10
# Speech assessment service cost in points
IMAGE_RECOGNITION_COST = 2 # 0.0015/1000 * 1
SPEECH_ASSESSMENT_COST = 1
SENTENCE_CARD_COST = 2
LLM_CHAT_COST = 1
QWEN_TOKEN_COST = 0.002
# Points action types
POINTS_ACTION_SYSTEM_GIFT = "system_gift"
POINTS_ACTION_IMAGE_RECOGNITION = "image_recognition"
@@ -17,6 +17,8 @@ POINTS_ACTION_SPEND = "spend"
POINTS_ACTION_REFUND_FREEZE = "refund_freeze"
POINTS_ACTION_REFUND_UNFREEZE = "refund_unfreeze"
POINTS_ACTION_REFUND_DEDUCT = "refund_deduct"
POINTS_ACTION_DEBT_PENDING = "debt_pending"
POINTS_ACTION_DEBT_SETTLED = "debt_settled"
API_TYPE_RECOGNITION = 'recognition'

View File

@@ -53,6 +53,8 @@ class Settings(BaseSettings):
QWEN_API_KEY: str
QWEN_VISION_MODEL: str
QWEN_VISION_EMBEDDING_MODEL: str
QWEN_TEXT_MODEL: str
LLM_MODEL_TYPE: str
API_TIMEOUT: int = 600
ASYNC_POLL_INTERVAL: int = 1
ASYNC_MODE: str = "enable"

View File

@@ -15,7 +15,9 @@ from backend.app.admin.service.audit_log_service import audit_log_service
from backend.common.exception import errors
from backend.core.conf import settings
from backend.common.log import log as logger
from typing import Dict, Any, List, Union
from typing import Dict, Any, List, Union, Optional
from math import ceil
from backend.common.const import QWEN_TOKEN_COST
from sqlalchemy.exc import IntegrityError
from asyncio import sleep
@@ -28,6 +30,92 @@ class Qwen:
RECOGNITION_URL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation"
EMBEDDING_URL = "https://dashscope.aliyuncs.com/api/v1/services/embeddings/multimodal-embedding"
@staticmethod
async def chat(messages: List[Dict[str, str]], image_id: int = 0, user_id: int = 0) -> Dict[str, Any]:
api_key = settings.QWEN_API_KEY
model_name = settings.QWEN_TEXT_MODEL
start_time = time.time()
start_at = datetime.now()
error_message = ""
status_code = 500
response_data: Dict[str, Any] = {}
try:
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
Qwen._executor,
lambda: dashscope.Generation.call(
api_key=api_key,
model=model_name,
messages=messages,
result_format="message",
)
)
status_code = getattr(response, 'status_code', getattr(response, 'code', 500))
response_data = {
"output": getattr(response, 'output', None),
"usage": getattr(response, 'usage', {}),
"code": getattr(response, 'code', None),
"message": getattr(response, 'message', None)
}
duration = time.time() - start_time
audit_log = CreateAuditLogParam(
api_type="chat",
model_name=model_name,
response_data=response_data,
request_data={"messages": messages},
token_usage=response_data.get("usage", {}),
duration=duration,
status_code=status_code,
error_message=None,
called_at=start_at,
image_id=image_id,
user_id=user_id,
cost=0,
api_version=settings.FASTAPI_API_V1_PATH,
dict_level=None,
)
Qwen._audit_log("chat", audit_log)
if status_code == 200:
content = ""
try:
content = response.output.choices[0].message.content
except Exception:
pass
return {
"success": True,
"result": content,
"token_usage": response_data.get("usage", {})
}
else:
error_message = response_data.get("message") or "API error"
logger.error(f"chat API error: {status_code} - {error_message}")
return {
"success": False,
"error": error_message,
"status_code": status_code
}
except Exception as e:
error_message = str(e)
logger.exception(f"chat API exception: {error_message}")
return {
"success": False,
"error": error_message
}
finally:
if error_message:
Qwen._log_audit(
api_type="chat",
dict_level=None,
model_name=model_name,
request_data={"messages": messages},
response_data={"error": error_message},
duration=time.time() - start_time,
status_code=status_code,
error_message=error_message,
image_id=image_id,
user_id=user_id,
called_at=start_at
)
@staticmethod
def get_recognition_prompt(type: str, exclude_words: List[str] | None = None) -> str:
"""获取图像识别提示词"""
@@ -511,6 +599,10 @@ level2 (Intermediate):
elif api_type == "embedding":
# 假设每张图片 $0.001
cost = 0.01
elif api_type == "chat":
total_tokens = int(token_usage.get("total_tokens") or 0)
units = ceil(total_tokens / 1000) if total_tokens > 0 else 0
cost = units * QWEN_TOKEN_COST
# Round to 6 decimal places to prevent data truncation issues
return round(cost, 6)

View File

@@ -65,7 +65,7 @@ class TestImageServiceRateLimit:
)
# Verify the error message
assert "积分不足,请充值以继续使用" in str(exc_info.value.msg)
assert "积分不足,请获取积分后继续使用" in str(exc_info.value.msg)
# Verify that check_sufficient_points was called with correct parameters
mock_check_sufficient_points.assert_awaited_once_with(