diff --git a/Dockerfile b/Dockerfile index 1793473..4c1f2f4 100755 --- a/Dockerfile +++ b/Dockerfile @@ -26,4 +26,4 @@ COPY deploy/app_server.conf /etc/supervisor/conf.d/ EXPOSE 80 -CMD ["uvicorn", "backend.main:app", "--host", "0.0.0.0", "--port", "80"] +CMD ["/bin/sh","-lc","cd backend && alembic upgrade head && cd .. && uvicorn backend.main:app --host 0.0.0.0 --port 80"] diff --git a/backend/app/admin/crud/points_debt_crud.py b/backend/app/admin/crud/points_debt_crud.py new file mode 100644 index 0000000..c3366ab --- /dev/null +++ b/backend/app/admin/crud/points_debt_crud.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +from typing import Optional, List +from sqlalchemy import select, update +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy_crud_plus import CRUDPlus + +from backend.app.admin.model.points import PointsDebt +from backend.utils.timezone import timezone + + +class PointsDebtCRUD(CRUDPlus[PointsDebt]): + async def add_pending(self, db: AsyncSession, user_id: int, amount: int, related_id: Optional[int], details: Optional[dict]) -> PointsDebt: + debt = PointsDebt(user_id=user_id, amount=amount, amount_settled=0, status='pending', related_id=related_id, details=details) + db.add(debt) + await db.flush() + return debt + + async def list_pending_by_user(self, db: AsyncSession, user_id: int) -> List[PointsDebt]: + stmt = select(self.model).where(self.model.user_id == user_id, self.model.status == 'pending').order_by(self.model.created_time.asc()) + result = await db.execute(stmt) + return list(result.scalars().all()) + + async def settle_partial(self, db: AsyncSession, debt_id: int, settle_amount: int) -> int: + stmt = update(self.model).where( + self.model.id == debt_id, + self.model.status == 'pending', + (self.model.amount - self.model.amount_settled) >= settle_amount + ).values( + amount_settled=self.model.amount_settled + settle_amount + ) + result = await db.execute(stmt) + return result.rowcount + + async def mark_settled(self, db: AsyncSession, debt_id: int) -> int: + stmt = update(self.model).where( + self.model.id == debt_id, + self.model.status == 'pending', + self.model.amount <= self.model.amount_settled + ).values( + status='settled', + settled_time=timezone.now() + ) + result = await db.execute(stmt) + return result.rowcount + + +points_debt_dao: PointsDebtCRUD = PointsDebtCRUD(PointsDebt) diff --git a/backend/app/admin/model/points.py b/backend/app/admin/model/points.py index 9557f5f..1f6c1e2 100644 --- a/backend/app/admin/model/points.py +++ b/backend/app/admin/model/points.py @@ -6,6 +6,7 @@ from sqlalchemy.dialects.mysql import JSON as MySQLJSON # Changed from postgres from sqlalchemy.orm import Mapped, mapped_column from backend.common.model import Base, id_key, snowflake_id_key +from backend.utils.timezone import timezone class Points(Base): @@ -78,3 +79,22 @@ class PointsConsumptionAlloc(Base): Index('idx_points_alloc_spend', 'spend_log_id'), {'comment': '积分消费分摊记录'} ) + + +class PointsDebt(Base): + __tablename__ = 'points_debt' + + id: Mapped[snowflake_id_key] = mapped_column(BigInteger, init=False, primary_key=True) + user_id: Mapped[int] = mapped_column(BigInteger, ForeignKey('wx_user.id'), nullable=False, comment='用户ID') + amount: Mapped[int] = mapped_column(BigInteger, nullable=False, comment='欠费积分总额') + related_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True, comment='关联ID') + amount_settled: Mapped[int] = mapped_column(BigInteger, default=0, nullable=False, comment='已清偿积分') + status: Mapped[str] = mapped_column(String(16), default='pending', comment='状态:pending/settled') + details: Mapped[dict | None] = mapped_column(MySQLJSON, nullable=True, default=None, comment='附加信息') + settled_time: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), default=None, nullable=True, comment='清偿时间') + + __table_args__ = ( + Index('idx_points_debt_user_status', 'user_id', 'status'), + Index('idx_points_debt_user_time', 'user_id', 'created_time'), + {'comment': '积分欠费记录'} + ) diff --git a/backend/app/admin/service/points_service.py b/backend/app/admin/service/points_service.py index 8a2b1be..41e2b9c 100644 --- a/backend/app/admin/service/points_service.py +++ b/backend/app/admin/service/points_service.py @@ -4,6 +4,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from backend.app.admin.crud.points_crud import points_dao, points_log_dao from backend.app.admin.crud.points_lot_crud import points_lot_dao from backend.app.admin.crud.points_alloc_crud import points_alloc_dao +from backend.app.admin.crud.points_debt_crud import points_debt_dao from backend.app.admin.model.points import PointsConsumptionAlloc from backend.app.admin.model.points import Points, PointsLot from backend.database.db import async_db_session @@ -14,6 +15,8 @@ from backend.common.const import ( POINTS_ACTION_REFUND_UNFREEZE, POINTS_ACTION_REFUND_DEDUCT, POINTS_ACTION_COUPON, + POINTS_ACTION_DEBT_PENDING, + POINTS_ACTION_DEBT_SETTLED, ) @@ -82,7 +85,7 @@ class PointsService: "related_id": related_id, "details": log_details }) - + await PointsService._settle_pending_debts_with_db(db, user_id) return True @staticmethod @@ -138,7 +141,7 @@ class PointsService: "related_id": coupon_id, "details": log_details }) - + await PointsService._settle_pending_debts_with_db(db, user_id) return True @staticmethod @@ -186,12 +189,14 @@ class PointsService: if not points_account: return False available = max(0, (points_account.balance or 0) - (points_account.frozen_balance or 0)) - if available < amount: + if available <= 0: return False + # 仅扣除当前可用余额与请求金额中的较小值 + deduct_amount = min(available, amount) current_balance = points_account.balance - # 批次扣减 - remaining = amount + # 批次扣减(按 FIFO) + remaining = deduct_amount alloc_details = [] if action == POINTS_ACTION_REFUND_DEDUCT and related_id is not None: target_lot = await points_lot_dao.get_by_order(db, related_id) @@ -213,10 +218,11 @@ class PointsService: await points_lot_dao.deduct_from_lot(db, lot.id, take) alloc_details.append({"lot_id": lot.id, "order_id": lot.order_id, "points": take}) remaining -= take - result = await points_dao.deduct_balance_atomic(db, user_id, amount) + # 扣减账户余额(仅扣已分摊的金额) + result = await points_dao.deduct_balance_atomic(db, user_id, deduct_amount) if not result: return False - new_balance = current_balance - amount + new_balance = current_balance - deduct_amount # 记录积分变动日志 if action is None: @@ -224,7 +230,7 @@ class PointsService: spend_log = await points_log_dao.add_log(db, { "user_id": user_id, "action": action, - "amount": amount, + "amount": deduct_amount, "balance_after": new_balance, "related_id": related_id, "details": details or {} @@ -235,7 +241,18 @@ class PointsService: for a in alloc_details: alloc = PointsConsumptionAlloc(user_id=user_id, lot_id=a["lot_id"], spend_log_id=spend_log.id, points=a["points"]) await points_alloc_dao.add(db, alloc) - + # 如果请求金额大于可扣金额,则为欠费部分创建待扣记录 + if amount > deduct_amount: + debt_amount = amount - deduct_amount + await points_debt_dao.add_pending(db, user_id, debt_amount, related_id, details) + await points_log_dao.add_log(db, { + "user_id": user_id, + "action": POINTS_ACTION_DEBT_PENDING, + "amount": debt_amount, + "balance_after": new_balance, + "related_id": related_id, + "details": (details or {}) | {"partially_deducted": deduct_amount} + }) return True @staticmethod @@ -324,8 +341,61 @@ class PointsService: "related_id": order_id, "details": {"order_id": order_id, "amount_cents": amount_cents} }) + await PointsService._settle_pending_debts_with_db(db, user_id) return True + @staticmethod + async def create_debt_pending_with_db(db: AsyncSession, user_id: int, amount: int, related_id: Optional[int], details: Optional[dict]) -> bool: + if amount <= 0: + return False + acct = await points_dao.get_by_user_id(db, user_id) + if not acct: + acct = await points_dao.create_user_points(db, user_id) + await points_debt_dao.add_pending(db, user_id, amount, related_id, details) + await points_log_dao.add_log(db, { + "user_id": user_id, + "action": POINTS_ACTION_DEBT_PENDING, + "amount": amount, + "balance_after": acct.balance, + "related_id": related_id, + "details": details or {} + }) + return True + + @staticmethod + async def _settle_pending_debts_with_db(db: AsyncSession, user_id: int) -> None: + acct = await points_dao.get_by_user_id(db, user_id) + if not acct: + return + available = max(0, (acct.balance or 0) - (acct.frozen_balance or 0)) + if available <= 0: + return + debts = await points_debt_dao.list_pending_by_user(db, user_id) + for debt in debts: + remaining = max(0, debt.amount - debt.amount_settled) + if remaining <= 0: + await points_debt_dao.mark_settled(db, debt.id) + continue + if available <= 0: + break + settle = min(available, remaining) + ok = await PointsService.deduct_points_with_db(user_id, settle, db, related_id=debt.related_id, details={"debt_id": debt.id}, action=POINTS_ACTION_SPEND) + if not ok: + break + await points_debt_dao.settle_partial(db, debt.id, settle) + acct = await points_dao.get_by_user_id(db, user_id) + await points_log_dao.add_log(db, { + "user_id": user_id, + "action": POINTS_ACTION_DEBT_SETTLED, + "amount": settle, + "balance_after": acct.balance, + "related_id": debt.id, + "details": {"related_id": debt.related_id} + }) + available = max(0, (acct.balance or 0) - (acct.frozen_balance or 0)) + if debt.amount <= debt.amount_settled + settle: + await points_debt_dao.mark_settled(db, debt.id) + @staticmethod async def freeze_points_for_order(db: AsyncSession, user_id: int, order_id: int, points_to_freeze: int) -> bool: if points_to_freeze <= 0: diff --git a/backend/app/admin/service/product_service.py b/backend/app/admin/service/product_service.py index 4a42fe3..09d3707 100644 --- a/backend/app/admin/service/product_service.py +++ b/backend/app/admin/service/product_service.py @@ -18,10 +18,10 @@ class ProductService: @staticmethod async def init_products(items: List[dict] | None = None) -> int: default = [ - {"title": "+1000%", "description": "仅限一次", "points": 100, "amount_cents": 100, "one_time": True}, - {"title": "+100%", "description": "充值100积分", "points": 20, "amount_cents": 100, "one_time": False}, - {"title": "+200%", "description": "充值500积分", "points": 150, "amount_cents": 500, "one_time": False}, - {"title": "+500%", "description": "充值500积分", "points": 500, "amount_cents": 1000, "one_time": False}, + {"title": "+400%", "description": "仅限一次", "points": 100, "amount_cents": 100, "one_time": True}, + {"title": "+50%", "description": "加赠50积分", "points": 150, "amount_cents": 500, "one_time": False}, + {"title": "+100%", "description": "加赠200积分", "points": 400, "amount_cents": 1000, "one_time": False}, + {"title": "+200%", "description": "加赠800积分", "points": 1200, "amount_cents": 2000, "one_time": False}, # {"title": "100积分", "description": "测试100积分", "points": 100, "amount_cents": 1, "one_time": False}, ] payload = items or default diff --git a/backend/app/ai/crud/image_task_crud.py b/backend/app/ai/crud/image_task_crud.py index 8a51ebc..be0967d 100644 --- a/backend/app/ai/crud/image_task_crud.py +++ b/backend/app/ai/crud/image_task_crud.py @@ -220,5 +220,16 @@ class ImageTaskCRUD(CRUDPlus[ImageProcessingTask]): await db.rollback() raise e + async def get_latest_active_task(self, db: AsyncSession, user_id: int, image_id: int, ref_type: str) -> ImageProcessingTask | None: + stmt = select(self.model).where( + and_( + self.model.user_id == user_id, + self.model.image_id == image_id, + self.model.ref_type == ref_type, + self.model.status.in_([ImageTaskStatus.PENDING, ImageTaskStatus.PROCESSING]), + ) + ).order_by(self.model.created_time.desc()).limit(1) + result = await db.execute(stmt) + return result.scalar_one_or_none() image_task_dao: ImageTaskCRUD = ImageTaskCRUD(ImageProcessingTask) diff --git a/backend/app/ai/service/image_service.py b/backend/app/ai/service/image_service.py index f51a990..2e9fa48 100755 --- a/backend/app/ai/service/image_service.py +++ b/backend/app/ai/service/image_service.py @@ -199,7 +199,7 @@ class ImageService: # 检查用户积分是否足够(现在积分没有过期概念) if not await points_service.check_sufficient_points(current_user.id, IMAGE_RECOGNITION_COST): raise errors.ForbiddenError( - msg=f'积分不足,请充值以继续使用' + msg=f'积分不足,请获取积分后继续使用' ) # 尝试获取任务槽位 @@ -338,7 +338,7 @@ class ImageService: image = await image_dao.get(db, task.image_id) if image: total_tokens = task.result.get("token_usage", {}).get("total_tokens", 0) - points = math.ceil(max(total_tokens, 1)/1000)*0.15 * IMAGE_RECOGNITION_COST + points = math.ceil(max(total_tokens, 1)/1000) * IMAGE_RECOGNITION_COST points_deducted = await points_service.deduct_points_with_db( user_id=task.user_id, amount=math.ceil(points), @@ -348,8 +348,7 @@ class ImageService: action=POINTS_ACTION_IMAGE_RECOGNITION ) if not points_deducted: - logger.error(f"Failed to deduct points for user {task.user_id} for task {task_id}") - raise Exception("Failed to deduct points") + logger.warning(f"Insufficient points for user {task.user_id}: balance is zero, cannot deduct for task {task_id}") # Step 5: Update task status to completed await ImageService._update_task_status_with_db(task_id, ImageTaskStatus.COMPLETED, db) diff --git a/backend/app/ai/service/recording_service.py b/backend/app/ai/service/recording_service.py index 2e81a29..dff64d8 100644 --- a/backend/app/ai/service/recording_service.py +++ b/backend/app/ai/service/recording_service.py @@ -372,7 +372,7 @@ class RecordingService: # 检查用户积分是否足够(现在积分没有过期概念) if not await points_service.check_sufficient_points(user_id, SPEECH_ASSESSMENT_COST): - raise RuntimeError('积分不足,请充值以继续使用') + raise RuntimeError('积分不足,请获取积分后继续使用') try: # 调用腾讯云SOE API进行语音评估 @@ -447,7 +447,7 @@ class RecordingService: except Exception as e: raise RuntimeError(f"Failed to create recording record for file_id {file_id}: {str(e)}") if not await points_service.check_sufficient_points(user_id, SPEECH_ASSESSMENT_COST): - raise RuntimeError('积分不足,请充值以继续使用') + raise RuntimeError('积分不足,请获取积分后继续使用') try: result = await self.tencent_cloud.assessment_speech(file_id, ref_text, str(recording.id), image_id, user_id) details = {"assessment": result} diff --git a/backend/app/ai/service/scene_sentence_service.py b/backend/app/ai/service/scene_sentence_service.py index 1cc25c4..7fa0c52 100644 --- a/backend/app/ai/service/scene_sentence_service.py +++ b/backend/app/ai/service/scene_sentence_service.py @@ -11,7 +11,7 @@ from backend.app.admin.schema.wx import DictLevel class SceneSentenceService: @staticmethod - async def create_session_with_items(image_id: int, user_id: int, image_chat_id: Optional[int], scene_tag: Optional[List[str]] = None, items: List[Dict[str, Any]] = [], called_at: datetime = datetime.now()) -> Dict[str, Any]: + async def create_sentence_with_items(image_id: int, user_id: int, image_chat_id: Optional[int], scene_tag: Optional[List[str]] = None, items: List[Dict[str, Any]] = [], called_at: datetime = datetime.now()) -> Dict[str, Any]: async with async_db_session.begin() as db: last_seq = 0 if image_chat_id: diff --git a/backend/app/ai/service/sentence_service.py b/backend/app/ai/service/sentence_service.py index b2d5565..a6e4769 100644 --- a/backend/app/ai/service/sentence_service.py +++ b/backend/app/ai/service/sentence_service.py @@ -3,6 +3,7 @@ from typing import Optional, List, Dict, Any from datetime import datetime import json +import math from backend.app.ai.crud.sentence_card_crud import sentence_card_dao from backend.app.ai.crud.scene_sentence_crud import scene_sentence_dao, scene_sentence_item_dao @@ -21,7 +22,7 @@ from backend.app.ai.crud.image_task_crud import image_task_dao from backend.app.ai.schema.image_task import CreateImageTaskParam from backend.app.admin.service.points_service import points_service from backend.app.ai.service.rate_limit_service import rate_limit_service -from backend.common.const import SENTENCE_CARD_COST, SENTENCE_TYPE_SCENE_SENTENCE, SENTENCE_TYPE_SCENE_DIALOGUE, SENTENCE_TYPE_SCENE_EXERCISE +from backend.common.const import SENTENCE_CARD_COST, SENTENCE_TYPE_SCENE_SENTENCE, SENTENCE_TYPE_SCENE_DIALOGUE, SENTENCE_TYPE_SCENE_EXERCISE, LLM_CHAT_COST class SentenceService: @@ -46,7 +47,7 @@ class SentenceService: "1. 内容约束:基于基础句型扩展功能标签、场景说明,每句补充「发音提示(重音/连读)」等输出结构中要求的内容,需符合现实生活和真实世界的习惯。\n" "2. 格式约束:严格按照下方JSON结构输出,无额外解释,确保字段完整、值为数组/字符串类型。\n" "3. 语言约束:所有英文内容符合日常沟通表达,无语法错误;中文翻译精准,场景说明简洁易懂(≤50字)。\n" - "4. 严格按照JSON结构输出,无额外解释,确保字段完整、值为数组/字符串类型,输出的 json 结构是:\n" + "4. 严格按照JSON结构输出,无额外解释,确保字段完整、值为数组/字符串类型,输出的 JSON 结构是:\n" ) struct = ( """ @@ -147,23 +148,18 @@ class SentenceService: payload["user_level"] = "intermediate" prompt = SentenceService._compose_prompt(payload, SENTENCE_TYPE_SCENE_SENTENCE) start_at = datetime.now() - res = await Hunyuan.chat( - messages=[{"role": "user", "content": prompt}], - image_id=image_id, - user_id=user_id, - system_prompt=None, - chat_type=SENTENCE_TYPE_SCENE_SENTENCE - ) + res = await SentenceService._call_scene_llm(prompt, image_id, user_id, SENTENCE_TYPE_SCENE_SENTENCE) if not res.get("success"): return None image_chat_id = res.get("image_chat_id") + token_usage = res.get("token_usage") or {} parsed = {} try: parsed = json.loads(res.get("result")) if isinstance(res.get("result"), str) else res.get("result", {}) except Exception: parsed = {} - # print(parsed) + print(parsed) items = [] sc = parsed.get("sentence") or {} for idx, d in enumerate(sc.get("list", []), start=1): @@ -195,7 +191,7 @@ class SentenceService: "speech_rate_tip": d.get("speech_rate_tip") or d.get("speechRateTip"), "personalized_tips": d.get("personalized_tips") or d.get("personalizedTips"), }) - return await scene_sentence_service.create_session_with_items( + _ret = await scene_sentence_service.create_sentence_with_items( image_id=image_id, user_id=user_id, image_chat_id=image_chat_id, @@ -203,6 +199,9 @@ class SentenceService: items=items, called_at=start_at ) + if isinstance(_ret, dict): + _ret["token_usage"] = token_usage + return _ret @staticmethod async def generate_scene_dialogue(image_id: int, user_id: int, scene_tag: str, desc_en: List[str], desc_zh: List[str], core_vocab: List[str], collocations: List[str]) -> List[SentenceCard]: @@ -258,9 +257,39 @@ class SentenceService: "details": json.dumps(item, ensure_ascii=False), "called_at": start_at, }) - created.append(card) + created.append(card) return created + @staticmethod + async def _call_scene_llm(prompt: str, image_id: int, user_id: int, chat_type: str) -> Dict[str, Any]: + model_type = (settings.LLM_MODEL_TYPE or "").lower() + if model_type == "qwen": + try: + qres = await Qwen.chat( + messages=[{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": prompt}], + image_id=image_id, + user_id=user_id + ) + if qres and qres.get("success"): + return {"success": True, "result": qres.get("result"), "image_chat_id": None, "token_usage": qres.get("token_usage") or {}} + except Exception: + pass + return {"success": False, "error": "LLM call failed"} + else: + try: + res = await Hunyuan.chat( + messages=[{"role": "user", "content": prompt}], + image_id=image_id, + user_id=user_id, + system_prompt=None, + chat_type=chat_type + ) + if res and res.get("success"): + return res + except Exception: + pass + return {"success": False, "error": "LLM call failed"} + @staticmethod async def generate_sentence_exercise_card(image_id: int, user_id: int, scene_tag: str, desc_en: List[str], desc_zh: List[str], core_vocab: List[str], collocations: List[str]) -> List[SentenceCard]: start_at = datetime.now() @@ -297,19 +326,27 @@ class SentenceService: @staticmethod async def create_scene_task(image_id: int, user_id: int, scene_type: str) -> dict: from backend.common.exception import errors - if not await points_service.check_sufficient_points(user_id, SENTENCE_CARD_COST): - raise errors.ForbiddenError(msg='积分不足,请充值以继续使用') - slot_acquired = await rate_limit_service.acquire_task_slot(user_id) - if not slot_acquired: - max_tasks = await rate_limit_service.get_user_task_limit(user_id) - raise errors.ForbiddenError(msg=f'用户同时最多只能运行 {max_tasks} 个任务,请等待现有任务完成后再试') if scene_type not in (SENTENCE_TYPE_SCENE_DIALOGUE, SENTENCE_TYPE_SCENE_EXERCISE, SENTENCE_TYPE_SCENE_SENTENCE): raise errors.BadRequestError(msg=f'不支持的任务类型 {scene_type}') async with async_db_session.begin() as db: image = await image_dao.get(db, image_id) if not image: raise errors.NotFoundError(msg='图片不存在') - dict_level = DictLevel.LEVEL2.value + latest_task = await image_task_dao.get_latest_active_task(db, user_id, image_id, scene_type) + if latest_task: + await db.commit() + return {"task_id": str(latest_task.id), "status": latest_task.status} + if not await points_service.check_sufficient_points(user_id, SENTENCE_CARD_COST): + raise errors.ForbiddenError(msg='积分不足,请获取积分后继续使用') + slot_acquired = await rate_limit_service.acquire_task_slot(user_id) + if not slot_acquired: + max_tasks = await rate_limit_service.get_user_task_limit(user_id) + raise errors.ForbiddenError(msg=f'用户同时最多只能运行 {max_tasks} 个任务,请等待现有任务完成后再试') + dict_level = DictLevel.LEVEL2.value + async with async_db_session.begin() as db: + image = await image_dao.get(db, image_id) + if not image: + raise errors.NotFoundError(msg='图片不存在') task_params = CreateImageTaskParam( image_id=image_id, user_id=user_id, @@ -346,9 +383,9 @@ class SentenceService: core_vocab = level2.get("core_vocab", []) collocations = level2.get("collocations", []) if task.ref_type == SENTENCE_TYPE_SCENE_DIALOGUE: - cards = await SentenceService.generate_scene_dialogue(task.image_id, task.user_id, scene_tag, desc_en, desc_zh, core_vocab, collocations) + await SentenceService.generate_scene_dialogue(task.image_id, task.user_id, scene_tag, desc_en, desc_zh, core_vocab, collocations) elif task.ref_type == SENTENCE_TYPE_SCENE_EXERCISE: - cards = await SentenceService.generate_sentence_exercise_card(task.image_id, task.user_id, scene_tag, desc_en, desc_zh, core_vocab, collocations) + await SentenceService.generate_sentence_exercise_card(task.image_id, task.user_id, scene_tag, desc_en, desc_zh, core_vocab, collocations) elif task.ref_type == SENTENCE_TYPE_SCENE_SENTENCE: payload = { "description": description, @@ -359,24 +396,31 @@ class SentenceService: "collocations": collocations, "user_level": "intermediate", } - cards = await SentenceService.generate_scene_sentence(task.image_id, task.user_id, payload) + result = await SentenceService.generate_scene_sentence(task.image_id, task.user_id, payload) else: raise Exception(f"Unsupported card type: {task.ref_type}") async with background_db_session() as db: await db.begin() image = await image_dao.get(db, task.image_id) + total_tokens = 0 + if isinstance(result, dict): + total_tokens = int((result.get("token_usage") or {}).get("total_tokens") or 0) + deduct_amount = SENTENCE_CARD_COST + if total_tokens > 0: + units = math.ceil(max(total_tokens, 1) / 1000) + deduct_amount = units * LLM_CHAT_COST points_deducted = await points_service.deduct_points_with_db( user_id=task.user_id, - amount=SENTENCE_CARD_COST, + amount=deduct_amount, db=db, related_id=image.id if image else None, - details={"task_id": task_id, "sentence_type": task.ref_type}, + details={"task_id": task_id, "sentence_type": task.ref_type, "token_usage": total_tokens}, action=task.ref_type ) if not points_deducted: raise Exception("Failed to deduct points") from backend.app.ai.tasks import update_task_status_with_retry - await update_task_status_with_retry(db, task_id, ImageTaskStatus.COMPLETED, result={"count": len(cards)}) + await update_task_status_with_retry(db, task_id, ImageTaskStatus.COMPLETED, result=result) await db.commit() task_processing_success = True except Exception as e: diff --git a/backend/common/const.py b/backend/common/const.py index c2a7367..7528b9a 100644 --- a/backend/common/const.py +++ b/backend/common/const.py @@ -1,12 +1,12 @@ """Constants used throughout the application.""" # Image recognition service cost in points -IMAGE_RECOGNITION_COST = 10 # 0.0015/1000 * 10 - -# Speech assessment service cost in points +IMAGE_RECOGNITION_COST = 2 # 0.0015/1000 * 1 SPEECH_ASSESSMENT_COST = 1 SENTENCE_CARD_COST = 2 +LLM_CHAT_COST = 1 +QWEN_TOKEN_COST = 0.002 # Points action types POINTS_ACTION_SYSTEM_GIFT = "system_gift" POINTS_ACTION_IMAGE_RECOGNITION = "image_recognition" @@ -17,6 +17,8 @@ POINTS_ACTION_SPEND = "spend" POINTS_ACTION_REFUND_FREEZE = "refund_freeze" POINTS_ACTION_REFUND_UNFREEZE = "refund_unfreeze" POINTS_ACTION_REFUND_DEDUCT = "refund_deduct" +POINTS_ACTION_DEBT_PENDING = "debt_pending" +POINTS_ACTION_DEBT_SETTLED = "debt_settled" API_TYPE_RECOGNITION = 'recognition' diff --git a/backend/core/conf.py b/backend/core/conf.py index 06e4562..1e7b376 100755 --- a/backend/core/conf.py +++ b/backend/core/conf.py @@ -53,6 +53,8 @@ class Settings(BaseSettings): QWEN_API_KEY: str QWEN_VISION_MODEL: str QWEN_VISION_EMBEDDING_MODEL: str + QWEN_TEXT_MODEL: str + LLM_MODEL_TYPE: str API_TIMEOUT: int = 600 ASYNC_POLL_INTERVAL: int = 1 ASYNC_MODE: str = "enable" diff --git a/backend/middleware/qwen.py b/backend/middleware/qwen.py index b798248..5c28360 100755 --- a/backend/middleware/qwen.py +++ b/backend/middleware/qwen.py @@ -15,7 +15,9 @@ from backend.app.admin.service.audit_log_service import audit_log_service from backend.common.exception import errors from backend.core.conf import settings from backend.common.log import log as logger -from typing import Dict, Any, List, Union +from typing import Dict, Any, List, Union, Optional +from math import ceil +from backend.common.const import QWEN_TOKEN_COST from sqlalchemy.exc import IntegrityError from asyncio import sleep @@ -28,6 +30,92 @@ class Qwen: RECOGNITION_URL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation" EMBEDDING_URL = "https://dashscope.aliyuncs.com/api/v1/services/embeddings/multimodal-embedding" + @staticmethod + async def chat(messages: List[Dict[str, str]], image_id: int = 0, user_id: int = 0) -> Dict[str, Any]: + api_key = settings.QWEN_API_KEY + model_name = settings.QWEN_TEXT_MODEL + start_time = time.time() + start_at = datetime.now() + error_message = "" + status_code = 500 + response_data: Dict[str, Any] = {} + try: + loop = asyncio.get_event_loop() + response = await loop.run_in_executor( + Qwen._executor, + lambda: dashscope.Generation.call( + api_key=api_key, + model=model_name, + messages=messages, + result_format="message", + ) + ) + status_code = getattr(response, 'status_code', getattr(response, 'code', 500)) + response_data = { + "output": getattr(response, 'output', None), + "usage": getattr(response, 'usage', {}), + "code": getattr(response, 'code', None), + "message": getattr(response, 'message', None) + } + duration = time.time() - start_time + audit_log = CreateAuditLogParam( + api_type="chat", + model_name=model_name, + response_data=response_data, + request_data={"messages": messages}, + token_usage=response_data.get("usage", {}), + duration=duration, + status_code=status_code, + error_message=None, + called_at=start_at, + image_id=image_id, + user_id=user_id, + cost=0, + api_version=settings.FASTAPI_API_V1_PATH, + dict_level=None, + ) + Qwen._audit_log("chat", audit_log) + if status_code == 200: + content = "" + try: + content = response.output.choices[0].message.content + except Exception: + pass + return { + "success": True, + "result": content, + "token_usage": response_data.get("usage", {}) + } + else: + error_message = response_data.get("message") or "API error" + logger.error(f"chat API error: {status_code} - {error_message}") + return { + "success": False, + "error": error_message, + "status_code": status_code + } + except Exception as e: + error_message = str(e) + logger.exception(f"chat API exception: {error_message}") + return { + "success": False, + "error": error_message + } + finally: + if error_message: + Qwen._log_audit( + api_type="chat", + dict_level=None, + model_name=model_name, + request_data={"messages": messages}, + response_data={"error": error_message}, + duration=time.time() - start_time, + status_code=status_code, + error_message=error_message, + image_id=image_id, + user_id=user_id, + called_at=start_at + ) @staticmethod def get_recognition_prompt(type: str, exclude_words: List[str] | None = None) -> str: """获取图像识别提示词""" @@ -511,6 +599,10 @@ level2 (Intermediate): elif api_type == "embedding": # 假设每张图片 $0.001 cost = 0.01 + elif api_type == "chat": + total_tokens = int(token_usage.get("total_tokens") or 0) + units = ceil(total_tokens / 1000) if total_tokens > 0 else 0 + cost = units * QWEN_TOKEN_COST # Round to 6 decimal places to prevent data truncation issues return round(cost, 6) diff --git a/backend/tests/test_image_service_rate_limit.py b/backend/tests/test_image_service_rate_limit.py index 0b79534..ad5840b 100644 --- a/backend/tests/test_image_service_rate_limit.py +++ b/backend/tests/test_image_service_rate_limit.py @@ -65,7 +65,7 @@ class TestImageServiceRateLimit: ) # Verify the error message - assert "积分不足,请充值以继续使用" in str(exc_info.value.msg) + assert "积分不足,请获取积分后继续使用" in str(exc_info.value.msg) # Verify that check_sufficient_points was called with correct parameters mock_check_sufficient_points.assert_awaited_once_with(