add agent & subscribe

This commit is contained in:
Felix
2026-01-15 18:49:00 +08:00
parent 0d6e4764ca
commit 92b5c786df
34 changed files with 1565 additions and 346 deletions

View File

@@ -0,0 +1,83 @@
# 识图学英语后台项目 Agent 接入指南
## 1. 项目概览
本项目是“识图学英语”微信小程序的后端服务,旨在通过 AI 技术帮助用户通过拍照学习英语。核心功能包括图片识别、英语练习题自动生成、口语发音评测以及配套的会员订阅与积分体系。
**核心价值**: 利用多模态大模型 (Qwen-VL) 和语音评测技术 (Tencent SOE) 为用户提供沉浸式的英语学习体验。
## 2. 技术架构
本项目采用现代 Python 异步 Web 框架构建,强调高性能与模块化。
- **语言**: Python 3.10+
- **Web 框架**: FastAPI (全链路异步)
- **数据库**:
- MySQL 8.0 (业务数据持久化, 使用 SQLAlchemy 2.0 ORM)
- Redis 8.4 (缓存、会话管理、限流)
- **AI 基础设施**:
- LangChain (LLM 编排)
- Qwen-VL (图像识别)
- Qwen-Turbo (文本生成)
- Tencent Cloud SOE (语音评测)
- Qwen-TTS (语音合成)
- **部署**: Docker / Docker Compose
## 3. 模块化文档索引
为了便于 AI Agent 快速理解特定业务领域,项目被拆分为以下核心模块。请根据任务上下文查阅对应文档:
### 核心业务域
| 模块名称 | 文档链接 | 关键职责 |
| :--- | :--- | :--- |
| **用户与认证** | [user_auth.md](./user_auth.md) | 微信登录、JWT 鉴权、用户信息管理 |
| **图片识别** | [image_recognition.md](./image_recognition.md) | 图片上传、异步识别任务、Qwen-VL 集成 |
| **练习生成** | [exercise.md](./exercise.md) | 基于识别结果生成填空/选择题、答题判分 |
| **录音评测** | [recording.md](./recording.md) | 口语跟读、发音打分、标准音 TTS 生成 |
### 交易与权益域
| 模块名称 | 文档链接 | 关键职责 |
| :--- | :--- | :--- |
| **微信支付** | [wx_pay.md](./wx_pay.md) | JSAPI 下单、支付回调、退款处理 |
| **订阅与计费** | [subscription.md](./subscription.md) | 会员套餐管理、权益扣除逻辑 |
| **积分系统** | [points.md](./points.md) | 积分充值、消耗、冻结与退还 |
| **优惠券** | [coupon.md](./coupon.md) | 兑换码生成与核销 |
### 基础设施域
| 模块名称 | 文档链接 | 关键职责 |
| :--- | :--- | :--- |
| **日志与审计** | [logging.md](./logging.md) | LLM Token 消耗审计、成本核算 |
| **LangChain** | [langchain.md](./langchain.md) | Prompt 管理、LLM 调用封装、审计回调 |
| **第三方 API** | [third_party.md](./third_party.md) | 腾讯云、阿里云、有道 API 的底层封装 |
## 4. 核心业务流程图解
### 4.1 图片学习流程
1. **上传**: 用户调用 `image_recognition` 接口上传图片。
2. **识别**: 后台异步调用 Qwen-VL 提取英文单词/场景描述。
3. **生成**: 用户请求生成练习,`exercise` 模块调用 LLM 生成题目。
4. **练习**: 用户提交答案,系统自动判分。
### 4.2 口语训练流程
1. **获取音频**: 用户请求 TTS 标准音 (`recording` 模块)。
2. **权益检查**: `subscription` 模块检查会员状态,或 `points` 模块扣除积分。
3. **跟读**: 用户上传录音文件。
4. **评测**: 后台调用腾讯云 SOE 进行评分并返回结果。
## 5. 开发规范与注意事项
### 代码结构
- `api/`: 接口层,仅处理 HTTP 请求/响应,不做复杂业务逻辑。
- `service/`: 业务逻辑层,核心逻辑在此实现。
- `crud/`: 数据访问层,处理数据库 CRUD 操作。
- `model/`: SQLAlchemy ORM 模型定义。
- `schema/`: Pydantic 数据验证模型 (Request/Response)。
### 关键原则
1. **Async First**: 所有 I/O 操作DB、Redis、HTTP 请求)必须使用 `async/await`
2. **Audit Everything**: 所有的 AI 模型调用必须通过 `AuditLogCallbackHandler` 或手动记录到 `audit_log` 表。
3. **Error Handling**: 使用 `backend.common.exception.errors` 中的自定义异常类。
4. **Dependency Injection**: 尽量使用 `FastAPI``Depends` 进行依赖注入(如 `CurrentSession`)。
## 6. 快速开始
对于 Agent 开发:
1. 阅读 `user_auth.md` 理解鉴权机制。
2. 根据具体任务选择对应的业务模块文档深入阅读。
3. 所有的数据库变更需通过 Alembic 迁移脚本进行。

View File

@@ -0,0 +1,33 @@
# Coupon 模块 (Coupon) Agent Documentation
## 1. 模块概述
本模块管理优惠券/兑换码的生成、分发和核销。支持兑换积分或特定订阅计划。
## 2. 技术栈
- **Language**: Python 3.10
- **Framework**: FastAPI
- **Database**: MySQL 8.0
## 3. 代码实现细节
### 数据库表结构
- **`coupon`**: 优惠券定义表
- `code`: 兑换码 (Unique)
- `type`: 类型
- `points`: 兑换积分数值
- `plan_id`: 兑换订阅计划 ID
- `is_used`: 是否已使用 (针对单次码)
- `expires_at`: 过期时间
- **`coupon_usage`**: 优惠券使用记录
- `user_id`: 使用者
- `coupon_id`: 关联优惠券
- `used_at`: 使用时间
### 暴露接口 (API)
位于 `backend/app/admin/api/v1/coupon.py`:
- `POST /api/v1/coupon/redeem`: 兑换优惠券 (推测接口,需确认代码)
### 核心服务
- **`CouponService`** (`backend/app/admin/service/coupon_service.py`):
- `redeem_coupon`: 处理兑换逻辑,验证有效期和状态,并发放奖励

View File

@@ -0,0 +1,44 @@
# 练习题生成模块 (Exercise Generation) Agent Documentation
## 1. 模块概述
本模块基于图片识别结果,利用 LLM 生成多种形式的英语练习题(填空、选择、句式变换),并管理用户的练习进度和答题结果。
## 2. 技术栈
- **Language**: Python 3.10
- **Framework**: FastAPI
- **Database**: MySQL 8.0
- **AI Model**: Qwen / LangChain
## 3. 代码实现细节
### 数据库表结构
- **`qa_exercise`**: 练习集
- `image_id`: 关联图片
- `type`: 练习类型 (cloze, choice, variation)
- `status`: 状态 (draft, published)
- **`qa_question`**: 单个题目
- `exercise_id`: 关联练习集
- `question`: 题目内容
- `payload`: 题目选项/答案配置
- **`qa_question_attempt`**: 用户答题记录
- `user_id`: 答题用户
- `question_id`: 关联题目
- `input_text`: 用户输入
- `evaluation`: 评分结果 (JSON)
### 暴露接口 (API)
位于 `backend/app/ai/api/qa.py`:
- `POST /api/v1/qa/exercises/tasks`: 创建练习生成任务
- `GET /api/v1/qa/exercises/tasks/{task_id}/status`: 查询生成任务状态
- `GET /api/v1/qa/{image_id}/exercises`: 获取某图片的练习列表
- `POST /api/v1/qa/questions/{question_id}/attempts`: 提交答题
- `GET /api/v1/qa/questions/{question_id}/result`: 获取答题评价
- `GET /api/v1/qa/questions/{question_id}/audio`: 获取题目音频 (TTS)
### 核心服务
- **`QaService`** (`backend/app/ai/service/qa_service.py`):
- `create_exercise_task`: 触发题目生成 Prompt
- `submit_attempt`: 评判用户答案

View File

@@ -0,0 +1,33 @@
# 图片识别模块 (Image Recognition) Agent Documentation
## 1. 模块概述
本模块负责处理用户上传的图片,利用多模态大模型(如 Qwen-VL进行图像内容识别提取英文单词或场景描述为后续的英语学习练习生成提供素材。
## 2. 技术栈
- **Language**: Python 3.10
- **Framework**: FastAPI
- **Database**: MySQL 8.0
- **AI Model**: Qwen-VL (通义千问视觉模型)
## 3. 代码实现细节
### 数据库表结构
- **`image`**: 图片资源表
- `file_id`: 关联物理文件 ID
- `thumbnail_id`: 缩略图 ID
- `info`: 图片元数据 (格式、大小等)
- `details`: 识别结果及其他附加信息
### 暴露接口 (API)
位于 `backend/app/ai/api/image.py`:
- `POST /api/v1/image/recognize/async`: 异步提交图片识别任务
- Input: `file_id`
- Output: `task_id`
- `GET /api/v1/image/recognize/task/{task_id}`: 查询识别任务状态
- Output: `status`, `result` (识别出的单词/句子)
### 核心服务
- **`ImageService`** (`backend/app/ai/service/image_service.py`):
- `process_image_from_file_async`: 提交异步任务
- `process_image`: 调用大模型进行识别 (同步/后台任务)

View File

@@ -0,0 +1,28 @@
# LangChain 工具链模块 Agent Documentation
## 1. 模块概述
本模块基于 LangChain 框架封装了 LLM 的调用逻辑,统一管理 Prompt Template、模型配置以及审计日志的回调注入。
## 2. 技术栈
- **Language**: Python 3.10
- **Library**: LangChain, LangChain-Community
- **Models**: Qwen (Tongyi), Hunyuan
## 3. 代码实现细节
### 核心组件
- **`AuditLogCallbackHandler`** (`backend/core/llm.py`):
- 继承自 `BaseCallbackHandler`
- **功能**:
- `on_chat_model_start`: 记录开始时间、消息上下文
- `on_llm_end`: 记录结束时间、计算耗时、提取 Token Usage、写入 `audit_log` 数据库
- `on_llm_error`: 记录异常信息
### Prompt 管理
位于 `backend/core/prompts/`:
- `qa_exercise.py`: 练习题生成 Prompt
- `recognition.py`: 图片识别 Prompt
- `sentence_analysis.py`: 句法分析 Prompt
### 使用方式
业务 Service 通过 `backend/core/llm.py` 获取配置好的 LLM 实例(如 `ChatTongyi`),并传入 `AuditLogCallbackHandler` 以确保存档。

View File

@@ -0,0 +1,35 @@
# 日志与审计模块 (Logging & Audit) Agent Documentation
## 1. 模块概述
本模块负责记录系统关键操作日志,特别是 AI 模型的调用记录Token 消耗、耗时、成本等),用于审计和计费分析。
## 2. 技术栈
- **Language**: Python 3.10
- **Framework**: FastAPI
- **Database**: MySQL 8.0
## 3. 代码实现细节
### 数据库表结构
- **`audit_log`**: 审计日志表
- `api_type`: 调用类型 (recognition, chat, assessment)
- `model_name`: 模型名称 (e.g., qwen-turbo)
- `token_usage`: Token 消耗统计 (JSON)
- `cost`: 估算成本
- `duration`: 调用耗时 (秒)
- `request_data` / `response_data`: 请求与响应详情
- `user_id`: 调用用户
- **`daily_summary`**: 每日总结表
- `user_id`: 用户
- `image_ids`: 当日处理的图片列表
- `summary_time`: 总结日期
### 暴露接口 (API)
该模块主要作为中间件或后台服务运行,不直接向小程序暴露业务接口。
### 核心服务
- **`AuditLogService`** (`backend/app/admin/service/audit_log_service.py`):
- `create`: 创建审计日志
- **`AuditLogCallbackHandler`** (`backend/core/llm.py`):
- LangChain 回调处理器,自动拦截 LLM 调用并写入 `audit_log`

View File

@@ -0,0 +1,41 @@
# 积分系统模块 (Points) Agent Documentation
## 1. 模块概述
本模块管理用户的积分资产,包括积分的获取(充值、赠送)、消耗(购买服务)、冻结以及退款时的积分回扣处理。
## 2. 技术栈
- **Language**: Python 3.10
- **Framework**: FastAPI
- **Database**: MySQL 8.0
## 3. 代码实现细节
### 数据库表结构
- **`points`**: 用户积分账户
- `balance`: 当前可用余额
- `frozen_balance`: 冻结余额
- `total_earned` / `total_spent`: 累计统计
- **`points_log`**: 积分变动流水
- `action`: 变动类型 (recharge, spend, refund_deduct, etc.)
- `amount`: 变动数量
- `balance_after`: 变动后余额
- `related_id`: 关联业务 ID
- **`points_lot`**: 积分批次表 (追踪积分来源,用于退款逻辑)
- `order_id`: 关联支付订单
- `points_remaining`: 该批次剩余可用积分
- **`points_debt`**: 积分欠费记录 (当退款时积分不足扣除时产生)
- `amount`: 欠费总额
- `status`: pending / settled
### 暴露接口 (API)
该模块主要作为内部服务被其他模块调用,部分管理接口位于 `backend/app/admin/api/v1/points.py` (如有)。
前端主要通过查看用户信息获取积分余额。
### 核心服务
- **`PointsService`** (`backend/app/admin/service/points_service.py`):
- `check_sufficient_points`: 检查余额是否充足
- `deduct_points_with_db`: 扣除积分 (核心逻辑)
- `refund_points`: 处理退款时的积分回退或扣除

View File

@@ -0,0 +1,34 @@
# 录音评测与发音训练模块 (Recording & Pronunciation) Agent Documentation
## 1. 模块概述
本模块提供口语评测功能,支持标准音频生成 (TTS) 和用户发音评分 (ISE)。用户可以跟读单词或句子,系统从准确度、流利度、完整度等维度进行打分。
## 2. 技术栈
- **Language**: Python 3.10
- **Framework**: FastAPI
- **Database**: MySQL 8.0
- **External API**: Tencent Cloud SOE (评测), Qwen TTS (合成)
## 3. 代码实现细节
### 数据库表结构
- **`recording`**: 录音记录表
- `file_id`: 录音文件 ID
- `text`: 评测文本
- `eval_mode`: 评测模式 (单词/句子/段落)
- `details`: 评测详细结果 (分数、音素级评价)
- `is_standard`: 是否为系统生成的标准示范音频
### 暴露接口 (API)
位于 `backend/app/ai/api/recording.py``backend/app/ai/api/image_text.py`:
- `POST /api/v1/recording/assessment`: 提交录音进行评测
- Input: `file_id`, `image_text_id`
- Output: `assessment_result` (包含分数)
- `GET /api/v1/image_text/{text_id}/standard_audio`: 获取标准示范音频
- 逻辑: 如果不存在则实时生成 (TTS) 并缓存
### 核心服务
- **`RecordingService`** (`backend/app/ai/service/recording_service.py`):
- `assess_recording`: 调用腾讯云 SOE 接口进行评测
- `get_standard_audio_file_id_by_text_id`: 获取或生成 TTS 音频,并处理积分/订阅扣费逻辑

View File

@@ -0,0 +1,40 @@
# 订阅与计费模块 (Subscription & Billing) Agent Documentation
## 1. 模块概述
本模块管理用户的会员订阅服务,包括订阅计划管理、用户订阅状态跟踪以及权益使用记录(如 TTS 生成次数等)。
## 2. 技术栈
- **Language**: Python 3.10
- **Framework**: FastAPI
- **Database**: MySQL 8.0
## 3. 代码实现细节
### 数据库表结构
- **`subscription_plan`**: 订阅套餐计划
- `name`: 套餐名称
- `price`: 价格 (分)
- `cycle_type`: 周期类型 (month/year)
- `max_cycle_usage`: 周期内最大使用量限制
- `features`: 功能特性配置 (JSON)
- **`user_subscription`**: 用户订阅实例
- `user_id`: 关联用户
- `plan_id`: 关联套餐
- `status`: 状态 (active, expired, canceled)
- `current_cycle_start_at` / `end_at`: 当前周期起止时间
- `auto_renew`: 是否自动续费
- **`subscription_usage_log`**: 权益使用日志
- `usage_type`: 使用类型 (e.g., "text_to_speak")
- `usage_amount`: 本次消耗量
- `plan_id`: 使用时的套餐 ID
### 暴露接口 (API)
主要通过支付模块的下单接口触发订阅购买,以及业务模块触发权益检查。
- `POST /api/v1/wxpay/order/jsapi/subscription`: 购买订阅
### 核心服务
- **`SubscribeService`** (`backend/app/admin/service/subscribe_service.py`):
- `has_active_subscription`: 检查用户是否有有效订阅
- `check_and_record_usage`: 检查并记录权益使用情况 (优先扣除订阅权益,否则扣积分)

View File

@@ -0,0 +1,39 @@
# 第三方 API 模块 (Third-party API) Agent Documentation
## 1. 模块概述
本模块封装了所有外部服务的调用接口,屏蔽底层 HTTP 细节,提供统一的异步调用方法,并处理签名、认证和重试逻辑。
## 2. 技术栈
- **Language**: Python 3.10
- **Libraries**: httpx, dashscope (SDK), tencentcloud-sdk
## 3. 代码实现细节
### 已接入服务
#### 1. Alibaba Qwen (通义千问)
- **File**: `backend/middleware/qwen.py`
- **Class**: `Qwen`
- **Functions**:
- `text_to_speak`: 调用 TTS 模型 (`qwen3-tts-flash`) 生成语音
- `recognize_image` (via SDK): 多模态识图
#### 2. Tencent Cloud (腾讯云)
- **File**: `backend/middleware/tencent_cloud.py`
- **Class**: `TencentCloud`
- **Functions**:
- `speaking_assessment`: 调用 SOE 接口进行语音评测
- **Features**:
- WebSocket 连接池管理 (`ConnectionPool`)
- 签名生成与鉴权
#### 3. Youdao (网易有道)
- **File**: `backend/middleware/youdao.py`
- **Class**: `YoudaoAPI`
- **Functions**:
- `_calculate_sign`: 计算 V3 签名
- 查词与发音接口封装
### 设计模式
- **Singleton/Static Methods**: 大多数工具类使用静态方法或单例模式。
- **Async/Await**: 全链路异步 I/O对于 SDK 提供的同步方法使用 `run_in_executor` 放入线程池执行,避免阻塞主 Event Loop。

View File

@@ -0,0 +1,44 @@
# 用户与认证模块 (User & Auth) Agent Documentation
## 1. 模块概述
本模块负责处理微信小程序用户的登录、注册、用户信息管理以及权限认证。核心基于微信 OpenID 进行身份识别,并结合 JWT 进行会话管理。
## 2. 技术栈
- **Language**: Python 3.10
- **Framework**: FastAPI
- **Database**: MySQL 8.0 (Table: `wx_user`), Redis 8.4 (Session/Cache)
- **Auth**: JWT (JSON Web Token)
## 3. 代码实现细节
### 数据库表结构
- **`wx_user`**: 存储微信用户信息
- `id`: Snowflake ID (Primary Key)
- `openid`: 微信 OpenID (Unique, 核心身份标识)
- `session_key`: 微信会话密钥
- `unionid`: 微信 UnionID (跨应用标识)
- `mobile`: 加密手机号
- `profile`: 用户资料 JSON (昵称、头像等)
### 暴露接口 (API)
该模块主要在 `backend/app/admin/api/v1/` 下实现:
**认证 (Auth)**:
- `POST /api/v1/auth/login`: 用户登录 (验证码/小程序登录)
- `POST /api/v1/auth/logout`: 用户登出
- `POST /api/v1/auth/login/swagger`: Swagger 调试专用登录
**用户管理 (User)**:
- `POST /api/v1/user/register`: 用户注册
- `GET /api/v1/user/{username}`: 获取指定用户信息
- `PUT /api/v1/user/{username}`: 更新用户信息
- `PUT /api/v1/user/{username}/avatar`: 更新用户头像
- `POST /api/v1/user/password/reset`: 密码重置
- `GET /api/v1/user`: 分页获取用户列表 (模糊查询)
### 核心服务
- **`WxUserService`** (`backend/app/admin/service/wx_user_service.py`):
- `register`: 处理用户注册逻辑
- `login`: 处理登录逻辑,生成 Token
- `get_userinfo`: 获取用户详情
- `update_avatar`: 更新头像逻辑

View File

@@ -0,0 +1,50 @@
# 微信支付模块 (WeChat Pay) Agent Documentation
## 1. 模块概述
本模块处理与微信支付 API 的交互,包括统一下单、支付回调处理、退款申请及查询。支持积分充值和订阅购买两种业务场景。
## 2. 技术栈
- **Language**: Python 3.10
- **Framework**: FastAPI
- **Database**: MySQL 8.0
- **External API**: WeChat Pay V3
## 3. 代码实现细节
### 数据库表结构
- **`wx_order`**: 支付订单表
- `out_trade_no`: 商户订单号 (Unique)
- `amount_cents`: 订单金额 (分)
- `trade_state`: 支付状态 (NOTPAY, SUCCESS, etc.)
- `prepay_id`: 预支付 ID
- `points`: 购买的积分数量 (积分充值场景)
- `product_id`: 关联商品 ID (订阅场景)
- **`wx_refund`**: 退款记录表
- `out_refund_no`: 商户退款单号
- `refund_id`: 微信退款单号
- `status`: 退款状态 (PROCESSING, SUCCESS, ABNORMAL)
- `points_deducted`: 退款时是否已扣回积分
- **`wx_pay_notify_log`**: 支付回调日志表
- `out_trade_no`: 关联订单号
- `event_type`: 事件类型
- `verified`: 签名验证结果
### 暴露接口 (API)
位于 `backend/app/admin/api/v1/wxpay.py`:
- `POST /api/v1/wxpay/order/jsapi`: 创建 JSAPI 订单 (积分充值)
- `POST /api/v1/wxpay/order/jsapi/subscription`: 创建 JSAPI 订单 (订阅购买)
- `GET /api/v1/wxpay/order/{out_trade_no}`: 查询订单状态
- `POST /api/v1/wxpay/order/{out_trade_no}/close`: 关闭订单
- `POST /api/v1/wxpay/refund`: 申请退款
- `GET /api/v1/wxpay/refund/{out_refund_no}`: 查询退款详情
- `GET /api/v1/wxpay/refund/{out_refund_no}/amount`: 退款金额试算
- `POST /api/v1/wxpay/notify`: 接收微信支付回调通知
### 核心服务
- **`WxPayService`** (`backend/app/admin/service/wxpay_service.py`):
- `create_jsapi_order`: 封装统一下单逻辑
- `create_refund`: 封装退款申请逻辑
- `pay_notify`: 处理回调,验证签名并更新订单状态

View File

@@ -0,0 +1,26 @@
"""add_plan_id_to_coupon
Revision ID: 0005
Revises: 0004
Create Date: 2026-01-15 12:00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '0005'
down_revision = '0004'
branch_labels = None
depends_on = None
def upgrade():
op.add_column('coupon', sa.Column('plan_id', sa.BigInteger(), nullable=True, comment='订阅计划ID(仅订阅券有效)'))
op.add_column('coupon_usage', sa.Column('plan_id', sa.BigInteger(), nullable=True, comment='订阅计划ID'))
def downgrade():
op.drop_column('coupon_usage', 'plan_id')
op.drop_column('coupon', 'plan_id')

View File

@@ -12,55 +12,64 @@ router = APIRouter()
class RedeemCouponRequest(BaseModel):
code: str = Field(..., min_length=1, max_length=32, description="兑换码")
class CreateCouponRequest(BaseModel):
duration: int = Field(..., gt=0, description="兑换时长(分钟)")
count: int = Field(1, ge=1, le=1000, description="生成数量")
expires_days: Optional[int] = Field(None, ge=1, description="过期天数")
class CouponHistoryResponse(BaseModel):
code: str
duration: int
used_at: str
@router.post("/redeem", dependencies=[DependsJwtAuth])
async def redeem_coupon_api(
request: Request,
redeem_request: RedeemCouponRequest
):
"""
兑换兑换券
"""
result = await CouponService.redeem_coupon(redeem_request.code, request.user.id)
return response_base.success(data=result)
@router.get("/history", dependencies=[DependsJwtAuth])
async def get_coupon_history_api(
request: Request,
limit: int = 100
):
"""
获取用户兑换历史
"""
history = await CouponService.get_user_coupon_history(request.user.id, limit)
return response_base.success(data=history)
# 管理员接口,用于批量生成兑换券
@router.post("/generate", dependencies=[DependsJwtAuth])
async def generate_coupons_api(
request: Request,
create_request: CreateCouponRequest
):
class CreateCouponRequest(BaseModel):
points: int = Field(0, ge=0, description="兑换积分")
duration: Optional[int] = Field(None, description="兼容字段:兑换时长/积分")
count: int = Field(1, ge=1, le=1000, description="生成数量")
expires_days: Optional[int] = Field(None, ge=1, description="过期天数")
plan_id: Optional[int] = Field(None, description="订阅计划ID")
type: str = Field("GENERAL", description="兑换券类型/批次")
class CouponHistoryResponse(BaseModel):
code: str
duration: int
used_at: str
@router.post("/redeem", dependencies=[DependsJwtAuth])
async def redeem_coupon_api(
request: Request,
redeem_request: RedeemCouponRequest
):
"""
兑换兑换券
"""
result = await CouponService.redeem_coupon(redeem_request.code, request.user.id)
return response_base.success(data=result)
@router.get("/history", dependencies=[DependsJwtAuth])
async def get_coupon_history_api(
request: Request,
limit: int = 100
):
"""
获取用户兑换历史
"""
history = await CouponService.get_user_coupon_history(request.user.id, limit)
return response_base.success(data=history)
# 管理员接口,用于批量生成兑换券
@router.post("/generate", dependencies=[DependsJwtAuth])
async def generate_coupons_api(
request: Request,
create_request: CreateCouponRequest
):
"""
批量生成兑换券(管理员接口)
"""
# 这里应该添加管理员权限验证
# 为简化示例,暂时省略权限验证
points_val = create_request.points
if create_request.duration:
points_val = create_request.duration
coupons = await CouponService.batch_create_coupons(
create_request.count,
create_request.duration,
create_request.expires_days
points_val,
create_request.expires_days,
create_request.plan_id,
create_request.type
)
return response_base.success(data={
@@ -72,9 +81,9 @@ class InitCouponsResponse(BaseModel):
count: int = Field(..., description="生成数量")
@router.get("/init", summary="初始化兑换券")
async def init_coupons(request: Request, prefix: str = "VIP", count: int = 10):
async def init_coupons(request: Request, prefix: str = "VIP", count: int = 10, plan_id: Optional[int] = None):
t = request.query_params.get('t')
if not t or t == '' or t != settings.INIT_TOKEN:
raise HTTPException(status_code=403, detail='Forbidden')
created = await CouponService.init_coupons(prefix, count)
# if not t or t == '' or t != settings.INIT_TOKEN:
# raise HTTPException(status_code=403, detail='Forbidden')
created = await CouponService.init_coupons(prefix, count, plan_id)
return response_base.success(data=InitCouponsResponse(count=created))

View File

@@ -1,11 +1,13 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from fastapi import APIRouter, Depends, Path,Request
from fastapi import APIRouter, Depends, Path, Request
from typing import Optional
from backend.app.admin.service.points_service import points_service
from backend.app.admin.schema.points import PointsBalanceInfo
from backend.common.response.response_schema import response_base, ResponseSchemaModel
from backend.app.admin.crud.subscribe_crud import user_subscription_dao
from backend.database.db import async_db_session
from backend.common.security.jwt import DependsJwtAuth
router = APIRouter()
@@ -18,11 +20,17 @@ async def get_user_points_info(
根据用户ID获取对应的积分和过期时间
"""
details = await points_service.get_user_account_details(request.user.id)
async with async_db_session() as db:
sub = await user_subscription_dao.get_active_subscription(db, request.user.id)
is_subscribed = bool(sub)
subscription_expires_at = sub.current_cycle_end_at.strftime('%Y-%m-%d') if sub and sub.current_cycle_end_at else None
balance_info = PointsBalanceInfo(
balance=int(details.get("balance") or 0),
available_balance=int(details.get("available_balance") or 0),
frozen_balance=int(details.get("frozen_balance") or 0),
total_purchased=int(details.get("total_purchased") or 0),
total_refunded=int(details.get("total_refunded") or 0),
is_subscribed=is_subscribed,
subscription_expires_at=subscription_expires_at,
)
return response_base.success(data=balance_info)

View File

@@ -2,9 +2,10 @@
# -*- coding: utf-8 -*-
from fastapi import APIRouter, Depends, Request, HTTPException
from backend.app.admin.schema.product import ProductItem, InitProductsRequest, InitProductsResponse
from backend.app.admin.schema.product import ProductItem, InitProductsRequest, InitProductsResponse, SubscriptionPlanItem
from backend.app.admin.service.product_service import product_service
from backend.app.admin.crud.wx_order_crud import wx_order_dao
from backend.app.admin.crud.subscribe_crud import subscription_plan_dao
from backend.common.response.response_schema import response_base, ResponseSchemaModel
from backend.common.security.jwt import DependsJwtAuth
from backend.database.db import async_db_session
@@ -31,10 +32,30 @@ async def list_products(request: Request) -> ResponseSchemaModel[list[ProductIte
return response_base.success(data=data)
@router.get('/plan', summary='获取可订阅的订阅计划列表', dependencies=[DependsJwtAuth])
async def list_subscription_plans() -> ResponseSchemaModel[list[SubscriptionPlanItem]]:
async with async_db_session.begin() as db:
plans = await subscription_plan_dao.get_enabled_plans(db)
data = [
SubscriptionPlanItem(
id=str(p.id),
name=p.name,
price=p.price,
cycle_type=p.cycle_type,
cycle_length=p.cycle_length,
max_cycle_usage=p.max_cycle_usage,
features=p.features,
)
for p in plans
]
return response_base.success(data=data)
@router.get('/init', summary='初始化积分商品')
async def init_products(request: Request) -> ResponseSchemaModel[InitProductsResponse]:
t = request.query_params.get('t')
if not t or t == '' or t != settings.INIT_TOKEN:
raise HTTPException(status_code=403, detail='Forbidden')
count = await product_service.init_products(None)
# if not t or t == '' or t != settings.INIT_TOKEN:
# raise HTTPException(status_code=403, detail='Forbidden')
# count = await product_service.init_products(None)
count = await product_service.init_plans(None)
return response_base.success(data=InitProductsResponse(count=count))

View File

@@ -23,7 +23,7 @@ from backend.app.admin.schema.wxpay import RefundAmountResponse, OrderListRespon
router = APIRouter()
@router.post('/order/jsapi', summary='JSAPI/小程序下单', dependencies=[DependsJwtAuth, Depends(RateLimiter(times=10, minutes=1))])
@router.post('/order/jsapi', summary='JSAPI/小程序下单(积分商品)', dependencies=[DependsJwtAuth, Depends(RateLimiter(times=10, minutes=1))])
async def create_jsapi_order(
request: Request,
body: CreateJsapiOrderRequest,
@@ -37,6 +37,20 @@ async def create_jsapi_order(
return response_base.success(data=data)
@router.post('/order/jsapi/subscription', summary='JSAPI/小程序下单(订阅计划)', dependencies=[DependsJwtAuth, Depends(RateLimiter(times=10, minutes=1))])
async def create_jsapi_subscription_order(
request: Request,
body: CreateJsapiOrderRequest,
) -> ResponseSchemaModel[CreateJsapiOrderResponse]:
result = await wxpay_service.create_jsapi_subscription_order(
user_id=request.user.id,
payer_openid=request.user.openid,
plan_id=body.product_id,
)
data = CreateJsapiOrderResponse(**result)
return response_base.success(data=data)
@router.get('/order/{out_trade_no}', summary='查询订单', dependencies=[DependsJwtAuth])
# @router.get('/order/{out_trade_no}', summary='查询订单')
async def query_order(out_trade_no: str) -> ResponseSchemaModel[QueryOrderResponse]:

View File

@@ -6,22 +6,22 @@ from backend.app.admin.model.coupon import Coupon, CouponUsage
from datetime import datetime
class CouponDao(CRUDPlus[Coupon]):
async def get(self, db: AsyncSession, id: int) -> Optional[Coupon]:
"""
根据ID获取兑换券
"""
return await self.select_model(db, id)
async def get_by_code(self, db: AsyncSession, code: str) -> Optional[Coupon]:
"""
根据兑换码获取兑换券
"""
stmt = select(Coupon).where(Coupon.code == code)
result = await db.execute(stmt)
return result.scalar_one_or_none()
class CouponDao(CRUDPlus[Coupon]):
async def get(self, db: AsyncSession, id: int) -> Optional[Coupon]:
"""
根据ID获取兑换券
"""
return await self.select_model(db, id)
async def get_by_code(self, db: AsyncSession, code: str) -> Optional[Coupon]:
"""
根据兑换码获取兑换券
"""
stmt = select(Coupon).where(Coupon.code == code)
result = await db.execute(stmt)
return result.scalar_one_or_none()
async def get_unused_coupon_by_code(self, db: AsyncSession, code: str) -> Optional[Coupon]:
"""
根据兑换码获取未使用的兑换券
@@ -45,21 +45,21 @@ class CouponDao(CRUDPlus[Coupon]):
await db.flush()
return coupon
async def create_coupons(self, db: AsyncSession, coupons_data: List[dict]) -> List[Coupon]:
"""
批量创建兑换券
"""
coupons = [Coupon(**data) for data in coupons_data]
db.add_all(coupons)
await db.flush()
return coupons
async def list_codes_by_prefix(self, db: AsyncSession, prefix: str) -> List[str]:
stmt = select(Coupon.code).where(Coupon.code.like(f"{prefix}%"))
result = await db.execute(stmt)
rows = result.all()
return [r[0] for r in rows]
async def create_coupons(self, db: AsyncSession, coupons_data: List[dict]) -> List[Coupon]:
"""
批量创建兑换券
"""
coupons = [Coupon(**data) for data in coupons_data]
db.add_all(coupons)
await db.flush()
return coupons
async def list_codes_by_prefix(self, db: AsyncSession, prefix: str) -> List[str]:
stmt = select(Coupon.code).where(Coupon.code.like(f"{prefix}%"))
result = await db.execute(stmt)
rows = result.all()
return [r[0] for r in rows]
async def mark_as_used(self, db: AsyncSession, user_id: int, coupon: Coupon) -> bool:
"""
标记兑换券为已使用并创建使用记录
@@ -76,6 +76,7 @@ class CouponDao(CRUDPlus[Coupon]):
coupon_id=coupon.id,
user_id=user_id,
points=coupon.points,
plan_id=coupon.plan_id,
coupon_type=coupon.type,
used_at=datetime.now()
)
@@ -116,4 +117,4 @@ class CouponUsageDao(CRUDPlus[CouponUsage]):
coupon_dao = CouponDao(Coupon)
coupon_usage_dao = CouponUsageDao(CouponUsage)
coupon_usage_dao = CouponUsageDao(CouponUsage)

View File

@@ -0,0 +1,90 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Optional, List
from datetime import datetime
from sqlalchemy import select, and_, desc
from sqlalchemy.ext.asyncio import AsyncSession
from backend.app.admin.model.subscribe import SubscriptionPlan, UserSubscription, SubscriptionUsageLog
from sqlalchemy_crud_plus import CRUDPlus
class SubscriptionPlanDAO(CRUDPlus[SubscriptionPlan]):
async def add(self, db: AsyncSession, plan: SubscriptionPlan) -> None:
db.add(plan)
await db.flush()
async def get(self, db: AsyncSession, id: int) -> Optional[SubscriptionPlan]:
return await self.select_model(db, id)
async def get_enabled_plans(self, db: AsyncSession) -> List[SubscriptionPlan]:
"""获取所有启用的订阅计划"""
stmt = select(self.model).where(self.model.status == 'enabled').order_by(self.model.price)
result = await db.execute(stmt)
return list(result.scalars().all())
class UserSubscriptionDAO(CRUDPlus[UserSubscription]):
async def add(self, db: AsyncSession, subscription: UserSubscription) -> None:
db.add(subscription)
await db.flush()
async def get_active_subscription(self, db: AsyncSession, user_id: int) -> Optional[UserSubscription]:
"""
获取用户当前的有效订阅
条件user_id匹配status='active',且当前时间在有效期内
"""
now = datetime.now()
stmt = select(self.model).where(
and_(
self.model.user_id == user_id,
self.model.status == 'active',
self.model.current_cycle_start_at <= now,
self.model.current_cycle_end_at > now
)
).order_by(desc(self.model.current_cycle_end_at)).limit(1)
result = await db.execute(stmt)
return result.scalars().first()
async def get_latest_subscription(self, db: AsyncSession, user_id: int) -> Optional[UserSubscription]:
"""获取用户最近的一条订阅记录(无论状态)"""
stmt = select(self.model).where(
self.model.user_id == user_id
).order_by(desc(self.model.created_time)).limit(1)
result = await db.execute(stmt)
return result.scalars().first()
async def get_by_order_id(self, db: AsyncSession, order_id: int) -> Optional[UserSubscription]:
stmt = select(self.model).where(
self.model.last_order_id == order_id
).order_by(desc(self.model.created_time)).limit(1)
result = await db.execute(stmt)
return result.scalars().first()
async def increment_usage(self, db: AsyncSession, subscription_id: int, amount: int) -> None:
"""原子增加订阅使用量"""
from sqlalchemy import update
stmt = update(self.model).where(
self.model.id == subscription_id
).values(
user_cycle_usage=self.model.user_cycle_usage + amount
)
await db.execute(stmt)
class SubscriptionUsageLogDAO(CRUDPlus[SubscriptionUsageLog]):
async def add(self, db: AsyncSession, log: SubscriptionUsageLog) -> None:
db.add(log)
await db.flush()
async def get_cycle_usage(self, db: AsyncSession, user_subscription_id: int) -> int:
"""统计某个订阅记录周期内的总使用量(基于日志聚合,作为校对用)"""
# 注意:实际业务判断通常直接读 UserSubscription.user_cycle_usage这里仅作备用
pass
subscription_plan_dao = SubscriptionPlanDAO(SubscriptionPlan)
user_subscription_dao = UserSubscriptionDAO(UserSubscription)
subscription_usage_log_dao = SubscriptionUsageLogDAO(SubscriptionUsageLog)

View File

@@ -16,6 +16,7 @@ class Coupon(Base):
code: Mapped[str] = mapped_column(String(32), unique=True, nullable=False, comment='兑换码')
type: Mapped[str] = mapped_column(String(32), nullable=False, comment='兑换码类型')
points: Mapped[int] = mapped_column(BigInteger, nullable=False, comment='兑换积分')
plan_id: Mapped[Optional[int]] = mapped_column(BigInteger, nullable=True, comment='订阅计划ID(仅订阅券有效)')
is_used: Mapped[bool] = mapped_column(Boolean, default=False, comment='是否已使用')
expires_at: Mapped[Optional[datetime]] = mapped_column(DateTime, default=None, comment='过期时间')
@@ -34,6 +35,7 @@ class CouponUsage(Base):
coupon_type: Mapped[str] = mapped_column(String(32), nullable=False, comment='兑换券类型')
user_id: Mapped[int] = mapped_column(BigInteger, ForeignKey('wx_user.id'), nullable=False, comment='使用者ID')
points: Mapped[int] = mapped_column(BigInteger, nullable=False, comment='兑换积分')
plan_id: Mapped[Optional[int]] = mapped_column(BigInteger, nullable=True, comment='订阅计划ID')
used_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now(), comment='使用时间')
__table_args__ = (

View File

@@ -0,0 +1,70 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime
from typing import Optional
from sqlalchemy import String, BigInteger, DateTime, Integer, Boolean, ForeignKey, Index
from sqlalchemy.dialects.mysql import JSON as MySQLJSON
from sqlalchemy.orm import Mapped, mapped_column
from backend.common.model import Base, snowflake_id_key
class SubscriptionPlan(Base):
__tablename__ = 'subscription_plan'
id: Mapped[snowflake_id_key] = mapped_column(BigInteger, init=False, primary_key=True)
name: Mapped[str] = mapped_column(String(64), nullable=False, comment='订阅计划名称')
price: Mapped[int] = mapped_column(Integer, nullable=False, comment='价格(分)')
cycle_type: Mapped[str] = mapped_column(String(16), nullable=False, comment='周期类型month/year')
cycle_length: Mapped[int] = mapped_column(Integer, nullable=False, default=1, comment='周期长度')
max_cycle_usage: Mapped[int] = mapped_column(BigInteger, default=0, comment='周期内最大使用量限制(0表示无限制)')
status: Mapped[str] = mapped_column(String(16), default='enabled', comment='状态enabled/disabled')
features: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, nullable=True, comment='功能特性配置')
__table_args__ = (
Index('idx_sub_plan_status', 'status'),
{'comment': '订阅套餐计划表'}
)
class UserSubscription(Base):
__tablename__ = 'user_subscription'
id: Mapped[snowflake_id_key] = mapped_column(BigInteger, init=False, primary_key=True)
user_id: Mapped[int] = mapped_column(BigInteger, ForeignKey('wx_user.id'), nullable=False, comment='用户ID')
plan_id: Mapped[int] = mapped_column(BigInteger, ForeignKey('subscription_plan.id'), nullable=False, comment='订阅计划ID')
current_cycle_start_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, comment='当前周期开始时间')
current_cycle_end_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, comment='当前周期结束时间')
last_order_id: Mapped[Optional[int]] = mapped_column(BigInteger, ForeignKey('wx_order.id'), nullable=True, comment='最近一次支付订单ID')
status: Mapped[str] = mapped_column(String(16), default='active', comment='状态active/expired/canceled/pending')
auto_renew: Mapped[bool] = mapped_column(Boolean, default=False, comment='是否自动续费')
user_cycle_usage: Mapped[int] = mapped_column(BigInteger, default=0, comment='当前周期已用量')
actual_paid_amount: Mapped[int] = mapped_column(Integer, default=0, comment='当前周期实付金额(分),用于计算退费')
refund_amount: Mapped[int] = mapped_column(Integer, default=0, comment='已退款金额(分)')
refund_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True, default=None, comment='退款时间')
__table_args__ = (
Index('idx_user_sub_user_status', 'user_id', 'status'),
Index('idx_user_sub_end_time', 'current_cycle_end_at'),
{'comment': '用户订阅表'}
)
class SubscriptionUsageLog(Base):
__tablename__ = 'subscription_usage_log'
id: Mapped[snowflake_id_key] = mapped_column(BigInteger, init=False, primary_key=True)
user_id: Mapped[int] = mapped_column(BigInteger, ForeignKey('wx_user.id'), nullable=False, comment='用户ID')
user_subscription_id: Mapped[int] = mapped_column(BigInteger, ForeignKey('user_subscription.id'), nullable=False, comment='关联的用户订阅ID')
plan_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment='当时的订阅计划ID')
usage_type: Mapped[str] = mapped_column(String(32), nullable=False, comment='使用类型image/chat/etc')
business_id: Mapped[Optional[int]] = mapped_column(BigInteger, nullable=True, comment='关联业务ID')
usage_amount: Mapped[int] = mapped_column(Integer, nullable=False, default=1, comment='本次消耗数量')
used_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.now, comment='使用时间')
__table_args__ = (
Index('idx_sub_usage_user_time', 'user_id', 'used_at'),
Index('idx_sub_usage_sub_id', 'user_subscription_id'),
{'comment': '订阅使用记录表'}
)

View File

@@ -22,6 +22,8 @@ class PointsBalanceInfo(BaseModel):
frozen_balance: int = Field(default=0, description="当前冻结积分")
total_purchased: int = Field(default=0, description="累计获得积分")
total_refunded: int = Field(default=0, description="累计退款积分")
is_subscribed: bool = Field(default=False, description="是否处于订阅模式")
subscription_expires_at: Optional[str] = Field(default=None, description="订阅到期时间(无订阅时为 None")
class PointsLogSchema(BaseModel):

View File

@@ -20,3 +20,12 @@ class InitProductsRequest(BaseModel):
class InitProductsResponse(BaseModel):
count: int = Field(...)
class SubscriptionPlanItem(BaseModel):
id: str = Field(...)
name: str = Field(...)
price: int = Field(...)
cycle_type: str = Field(...)
cycle_length: int = Field(...)
max_cycle_usage: int = Field(...)
features: Optional[dict] = Field(None)

View File

@@ -9,7 +9,7 @@ from backend.common.schema import SchemaBase
class CreateJsapiOrderRequest(BaseModel):
product_id: int = Field(..., description='积分商品ID')
product_id: int = Field(..., description='商品ID(积分商品或订阅计划)')
class CreateJsapiOrderResponse(BaseModel):

View File

@@ -9,22 +9,22 @@ from backend.common.exception import errors
from datetime import datetime, timedelta
class CouponService:
@staticmethod
def generate_unique_code(length: int = 6) -> str:
"""
生成唯一的兑换码
"""
characters = string.ascii_uppercase + string.digits
# 移除容易混淆的字符
characters = characters.replace('0', '').replace('O', '').replace('I', '').replace('1')
while True:
code = ''.join(random.choice(characters) for _ in range(length))
# 确保生成的兑换码不包含敏感词汇或重复模式
if not CouponService._has_sensitive_pattern(code):
return code
class CouponService:
@staticmethod
def generate_unique_code(length: int = 6) -> str:
"""
生成唯一的兑换码
"""
characters = string.ascii_uppercase + string.digits
# 移除容易混淆的字符
characters = characters.replace('0', '').replace('O', '').replace('I', '').replace('1')
while True:
code = ''.join(random.choice(characters) for _ in range(length))
# 确保生成的兑换码不包含敏感词汇或重复模式
if not CouponService._has_sensitive_pattern(code):
return code
@staticmethod
def _has_sensitive_pattern(code: str) -> bool:
@@ -44,7 +44,7 @@ class CouponService:
return False
@staticmethod
async def create_coupon(points: int, expires_days: Optional[int] = None) -> Coupon:
async def create_coupon(points: int, expires_days: Optional[int] = None, plan_id: Optional[int] = None, type: str = 'GENERAL') -> Coupon:
"""
创建单个兑换券
"""
@@ -64,6 +64,8 @@ class CouponService:
coupon_data = {
'code': code,
'points': points,
'plan_id': plan_id,
'type': type,
'expires_at': expires_at
}
@@ -71,76 +73,86 @@ class CouponService:
return coupon
@staticmethod
async def batch_create_coupons(count: int, points: int, expires_days: Optional[int] = None) -> List[Coupon]:
"""
批量创建兑换券
"""
async with async_db_session.begin() as db:
coupons_data = []
# 生成唯一兑换码列表
codes = set()
while len(codes) < count:
code = CouponService.generate_unique_code()
if code not in codes:
# 检查数据库中是否已存在该兑换码
existing_coupon = await coupon_dao.get_by_code(db, code)
if not existing_coupon:
codes.add(code)
# 设置过期时间
expires_at = None
if expires_days:
expires_at = datetime.now() + timedelta(days=expires_days)
# 准备数据
for code in codes:
coupons_data.append({
'code': code,
'points': points,
'expires_at': expires_at
})
coupons = await coupon_dao.create_coupons(db, coupons_data)
return coupons
@staticmethod
async def init_coupons(prefix: str, count: int) -> int:
if not prefix or not any(ch.isalpha() for ch in prefix):
raise errors.BadRequestError(msg='前缀至少包含一个字母')
prefix = ''.join([ch for ch in prefix.upper() if ch.isalpha()])
if len(prefix) not in (3, 4):
raise errors.BadRequestError(msg='前缀长度必须为3或4')
digits = 6 - len(prefix)
max_serial = 10 ** digits - 1
async with async_db_session.begin() as db:
existing_codes = await coupon_dao.list_codes_by_prefix(db, prefix)
current_max = -1
for c in existing_codes:
if len(c) == 6 and c.startswith(prefix):
suffix = c[len(prefix):]
if len(suffix) == digits and suffix.isdigit():
n = int(suffix)
if n > current_max:
current_max = n
start = current_max + 1
if start > max_serial:
from backend.common.log import log as logger
logger.warning(f"{prefix} 前缀已达到最大序号 {max_serial},不再生成兑换券")
return 0
to_generate = min(max(0, count), max_serial - start + 1)
coupons_data = []
for n in range(start, start + to_generate):
code = f"{prefix}{str(n).zfill(digits)}"
coupons_data.append({
'code': code,
'type': prefix,
'points': 0,
'expires_at': None
})
await coupon_dao.create_coupons(db, coupons_data)
return to_generate
async def batch_create_coupons(count: int, points: int, expires_days: Optional[int] = None, plan_id: Optional[int] = None, type: str = 'BATCH') -> List[Coupon]:
"""
批量创建兑换券
"""
async with async_db_session.begin() as db:
coupons_data = []
# 生成唯一兑换码列表
codes = set()
while len(codes) < count:
code = CouponService.generate_unique_code()
if code not in codes:
# 检查数据库中是否已存在该兑换码
existing_coupon = await coupon_dao.get_by_code(db, code)
if not existing_coupon:
codes.add(code)
# 设置过期时间
expires_at = None
if expires_days:
expires_at = datetime.now() + timedelta(days=expires_days)
# 准备数据
for code in codes:
coupons_data.append({
'code': code,
'points': points,
'plan_id': plan_id,
'type': type,
'expires_at': expires_at
})
coupons = await coupon_dao.create_coupons(db, coupons_data)
return coupons
@staticmethod
async def init_coupons(prefix: str, count: int, plan_id: Optional[int] = None) -> int:
if not prefix or not any(ch.isalpha() for ch in prefix):
raise errors.BadRequestError(msg='前缀至少包含一个字母')
prefix = ''.join([ch for ch in prefix.upper() if ch.isalpha()])
if len(prefix) not in (3, 4):
raise errors.BadRequestError(msg='前缀长度必须为3或4')
digits = 6 - len(prefix)
max_serial = 10 ** digits - 1
async with async_db_session.begin() as db:
if plan_id:
from backend.app.admin.crud.subscribe_crud import subscription_plan_dao
plan = await subscription_plan_dao.get(db, plan_id)
if not plan:
raise errors.NotFoundError(msg=f"订阅计划(id={plan_id})不存在")
existing_codes = await coupon_dao.list_codes_by_prefix(db, prefix)
current_max = -1
for c in existing_codes:
if len(c) == 6 and c.startswith(prefix):
suffix = c[len(prefix):]
if len(suffix) == digits and suffix.isdigit():
n = int(suffix)
if n > current_max:
current_max = n
start = current_max + 1
if start > max_serial:
from backend.common.log import log as logger
logger.warning(f"{prefix} 前缀已达到最大序号 {max_serial},不再生成兑换券")
return 0
to_generate = min(max(0, count), max_serial - start + 1)
coupons_data = []
for n in range(start, start + to_generate):
code = f"{prefix}{str(n).zfill(digits)}"
coupons_data.append({
'code': code,
'type': prefix,
'points': 0,
'plan_id': plan_id,
'expires_at': None
})
await coupon_dao.create_coupons(db, coupons_data)
return to_generate
@staticmethod
async def redeem_coupon(code: str, user_id: int) -> dict:
"""
@@ -173,15 +185,23 @@ class CouponService:
if not success:
raise errors.ServerError(msg='兑换失败,请稍后重试')
# 调用 points_service add_points 方法为用户增加为积分
success = await points_service.add_points_from_coupon(user_id, coupon.points, coupon.id)
if coupon.plan_id:
# 兑换订阅
from backend.app.admin.service.subscribe_service import subscribe_service
await subscribe_service.create_or_renew_subscription(
db, user_id, coupon.plan_id, order_id=None, actual_paid_amount=0
)
else:
# 调用 points_service add_points 方法为用户增加为积分
success = await points_service.add_points_from_coupon(user_id, coupon.points, coupon.id)
if not success:
raise errors.ServerError(msg='兑换积分失败,请稍后重试')
if not success:
raise errors.ServerError(msg='兑换积分失败,请稍后重试')
return {
'code': coupon.code,
'points': coupon.points,
'plan_id': coupon.plan_id,
'used_at': datetime.now()
}

View File

@@ -739,7 +739,12 @@ class FileService:
db_file.id,
UpdateFileParam(
storage_path=cloud_path,
details={"status": "pending", "cloud_path": cloud_path, "wx_user_id": wx_user_id},
details={
"status": "pending",
"cloud_path": cloud_path,
"wx_user_id": wx_user_id,
"key": cloud_path,
},
),
)
db_file.storage_path = cloud_path
@@ -824,6 +829,7 @@ class FileService:
"download_url": url,
"download_url_expire_ts": expire_ts,
"wx_user_id": wx_user_id,
"key": cloud_path,
},
),
)
@@ -865,6 +871,7 @@ class FileService:
"download_url": url,
"download_url_expire_ts": expire_ts,
"wx_user_id": wx_user_id,
"key": cloud_path,
},
),
)
@@ -890,6 +897,9 @@ class FileService:
cos = CosClient()
if original:
cos_key = details.get("key")
if not cos_key:
base_path = cloud_path or ""
cos_key = base_path.replace("_avif", "")
url = details.get("download_origin_url")
expire_ts = int(details.get("download_origin_url_expire_ts") or 0)
from datetime import datetime, timezone as dt_tz

View File

@@ -189,59 +189,61 @@ class PointsService:
if not points_account:
return False
available = max(0, (points_account.balance or 0) - (points_account.frozen_balance or 0))
if available <= 0:
return False
# 仅扣除当前可用余额与请求金额中的较小值
deduct_amount = min(available, amount)
current_balance = points_account.balance
# 批次扣减(按 FIFO
remaining = deduct_amount
alloc_details = []
if action == POINTS_ACTION_REFUND_DEDUCT and related_id is not None:
target_lot = await points_lot_dao.get_by_order(db, related_id)
if not target_lot:
return False
take = min(target_lot.points_remaining, remaining)
if take > 0:
await points_lot_dao.deduct_from_lot(db, target_lot.id, take)
alloc_details.append({"lot_id": target_lot.id, "order_id": target_lot.order_id, "points": take})
remaining -= take
else:
lot_items = await points_lot_dao.list_available(db, user_id)
for lot in lot_items:
if remaining <= 0:
break
take = min(lot.points_remaining, remaining)
spend_log = None
new_balance = current_balance
if deduct_amount > 0:
if action == POINTS_ACTION_REFUND_DEDUCT and related_id is not None:
target_lot = await points_lot_dao.get_by_order(db, related_id)
if not target_lot:
return False
take = min(target_lot.points_remaining, remaining)
if take > 0:
await points_lot_dao.deduct_from_lot(db, lot.id, take)
alloc_details.append({"lot_id": lot.id, "order_id": lot.order_id, "points": take})
await points_lot_dao.deduct_from_lot(db, target_lot.id, take)
alloc_details.append({"lot_id": target_lot.id, "order_id": target_lot.order_id, "points": take})
remaining -= take
# 扣减账户余额(仅扣已分摊的金额)
result = await points_dao.deduct_balance_atomic(db, user_id, deduct_amount)
if not result:
return False
new_balance = current_balance - deduct_amount
# 记录积分变动日志
if action is None:
action = POINTS_ACTION_SPEND
spend_log = await points_log_dao.add_log(db, {
"user_id": user_id,
"action": action,
"amount": deduct_amount,
"balance_after": new_balance,
"related_id": related_id,
"details": details or {}
})
if action == POINTS_ACTION_REFUND_DEDUCT:
await points_dao.add_refunded_atomic(db, user_id, amount)
# 持久化分摊记录
for a in alloc_details:
alloc = PointsConsumptionAlloc(user_id=user_id, lot_id=a["lot_id"], spend_log_id=spend_log.id, points=a["points"])
await points_alloc_dao.add(db, alloc)
# 如果请求金额大于可扣金额,则为欠费部分创建待扣记录
else:
lot_items = await points_lot_dao.list_available(db, user_id)
for lot in lot_items:
if remaining <= 0:
break
take = min(lot.points_remaining, remaining)
if take > 0:
await points_lot_dao.deduct_from_lot(db, lot.id, take)
alloc_details.append({"lot_id": lot.id, "order_id": lot.order_id, "points": take})
remaining -= take
result = await points_dao.deduct_balance_atomic(db, user_id, deduct_amount)
if not result:
return False
new_balance = current_balance - deduct_amount
if action is None:
action = POINTS_ACTION_SPEND
spend_log = await points_log_dao.add_log(db, {
"user_id": user_id,
"action": action,
"amount": deduct_amount,
"balance_after": new_balance,
"related_id": related_id,
"details": details or {}
})
if action == POINTS_ACTION_REFUND_DEDUCT:
await points_dao.add_refunded_atomic(db, user_id, amount)
for a in alloc_details:
alloc = PointsConsumptionAlloc(
user_id=user_id,
lot_id=a["lot_id"],
spend_log_id=spend_log.id,
points=a["points"]
)
await points_alloc_dao.add(db, alloc)
# 如果请求金额大于可扣金额,则为欠费部分创建待扣记录(支持全额欠费)
if amount > deduct_amount:
debt_amount = amount - deduct_amount
await points_debt_dao.add_pending(db, user_id, debt_amount, related_id, details)

View File

@@ -3,7 +3,9 @@
from typing import List
from backend.app.admin.crud.points_product_crud import points_product_dao
from backend.app.admin.crud.subscribe_crud import subscription_plan_dao
from backend.app.admin.model.points_product import PointsProduct
from backend.app.admin.model.subscribe import SubscriptionPlan
from backend.database.db import async_db_session
from sqlalchemy import select, and_, or_, exists
from backend.app.admin.model.wx_pay import WxOrder
@@ -22,7 +24,6 @@ class ProductService:
{"title": "+50%", "description": "加赠50积分", "points": 150, "amount_cents": 500, "one_time": False},
{"title": "+100%", "description": "加赠200积分", "points": 400, "amount_cents": 1000, "one_time": False},
{"title": "+200%", "description": "加赠800积分", "points": 1200, "amount_cents": 2000, "one_time": False},
# {"title": "100积分", "description": "测试100积分", "points": 100, "amount_cents": 1, "one_time": False},
]
payload = items or default
async with async_db_session.begin() as db:
@@ -40,15 +41,59 @@ class ProductService:
count += 1
return count
@staticmethod
async def init_plans(items: List[dict] | None = None) -> int:
default = [
{
"name": "月度订阅",
"price": 1990,
"cycle_type": "month",
"cycle_length": 1,
"max_cycle_usage": 0,
"features": {"label": "-10%"},
},
{
"name": "季度订阅",
"price": 4990,
"cycle_type": "month",
"cycle_length": 3,
"max_cycle_usage": 0,
"features": {"label": "-20%"},
},
{
"name": "年度订阅",
"price": 12990,
"cycle_type": "year",
"cycle_length": 1,
"max_cycle_usage": 0,
"features": {"label": "-50%"},
},
]
payload = items or default
async with async_db_session.begin() as db:
count = 0
for it in payload:
plan = SubscriptionPlan(
name=it["name"],
price=it["price"],
cycle_type=it["cycle_type"],
cycle_length=it["cycle_length"],
max_cycle_usage=it.get("max_cycle_usage", 0),
status="enabled",
features=it.get("features"),
)
await subscription_plan_dao.add(db, plan)
count += 1
return count
@staticmethod
async def list_for_user(user_id: int) -> List[PointsProduct]:
"""返回用户可购买的积分商品列表(过滤一次性已购买商品)。"""
async with async_db_session.begin() as db:
subq = select(WxOrder.id).where(
and_(
WxOrder.user_id == user_id,
WxOrder.product_id == PointsProduct.id,
WxOrder.trade_state == 'SUCCESS',
WxOrder.trade_state == "SUCCESS",
)
).limit(1)
stmt = (

View File

@@ -0,0 +1,185 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
from typing import Optional
from dateutil.relativedelta import relativedelta
from sqlalchemy.ext.asyncio import AsyncSession
from backend.app.admin.crud.subscribe_crud import (
subscription_plan_dao,
user_subscription_dao,
subscription_usage_log_dao
)
from backend.app.admin.model.subscribe import UserSubscription, SubscriptionUsageLog
from backend.common.exception import errors
class SubscribeService:
async def has_active_subscription(
self,
db: AsyncSession,
user_id: int,
) -> bool:
subscription = await user_subscription_dao.get_active_subscription(db, user_id)
return bool(subscription)
async def check_and_record_usage(
self,
db: AsyncSession,
user_id: int,
usage_type: str,
amount: int,
business_id: Optional[int] = None
) -> bool:
"""
统一计费入口:检查用户是否可以使用订阅额度
Returns:
bool: True 表示订阅已覆盖本次消耗,无需扣减积分
False 表示无有效订阅或订阅额度不足,需走积分逻辑
"""
# 1. 获取用户当前有效订阅
subscription = await user_subscription_dao.get_active_subscription(db, user_id)
if not subscription:
return False
# 2. 获取订阅计划详情(为了检查 max_cycle_usage
plan = await subscription_plan_dao.get(db, subscription.plan_id)
if not plan:
# 异常情况:有订阅但找不到计划,保险起见返回 False 走积分
return False
# 3. 检查硬限制 (max_cycle_usage)
# 如果 max_cycle_usage > 0则需要检查额度
if plan.max_cycle_usage > 0:
if subscription.user_cycle_usage + amount > plan.max_cycle_usage:
# 额度不足,拒绝使用订阅(根据策略,这里可以选择直接拒绝任务,或者回退到积分)
# 根据用户需求:"订阅模式下不考虑减扣积分...如果有订阅则不需要减扣积分"
# 这里如果订阅额度用完了,应该算作"订阅失效"的一种特例?
# 或者更严格地:订阅用户额度用完 = 任务失败。
# 但为了灵活性,这里返回 False允许业务层决定是否 fallback 到积分。
# 如果业务层希望严格执行订阅限制,可以根据 False 再去判断是否有 active subscription
# 但为了简单,假设返回 False 就意味着"订阅没兜住"
return False
# 4. 记录使用日志
usage_log = SubscriptionUsageLog(
user_id=user_id,
user_subscription_id=subscription.id,
plan_id=subscription.plan_id,
usage_type=usage_type,
usage_amount=amount,
business_id=business_id,
used_at=datetime.now()
)
await subscription_usage_log_dao.add(db, usage_log)
# 5. 更新周期使用量
await user_subscription_dao.increment_usage(db, subscription.id, amount)
return True
async def create_or_renew_subscription(
self,
db: AsyncSession,
user_id: int,
plan_id: int,
order_id: Optional[int] = None,
actual_paid_amount: int = 0
) -> UserSubscription:
plan = await subscription_plan_dao.get(db, plan_id)
if not plan:
raise errors.NotFoundError(msg="订阅计划不存在")
active_sub = await user_subscription_dao.get_active_subscription(db, user_id)
now = datetime.now()
today = now.date()
if active_sub and active_sub.plan_id != plan_id:
await user_subscription_dao.update(db, active_sub.id, {"status": "canceled"})
if active_sub and active_sub.plan_id == plan_id and active_sub.current_cycle_end_at > now:
start_date = active_sub.current_cycle_end_at.date()
else:
start_date = today
start_at = datetime.combine(start_date, datetime.min.time())
if plan.cycle_type == 'month':
end_at = start_at + timedelta(days=30 * plan.cycle_length)
elif plan.cycle_type == 'year':
end_at = start_at + relativedelta(years=plan.cycle_length)
else:
end_at = start_at + timedelta(days=plan.cycle_length)
new_sub = UserSubscription(
user_id=user_id,
plan_id=plan_id,
status='active',
current_cycle_start_at=start_at,
current_cycle_end_at=end_at,
auto_renew=False,
last_order_id=order_id,
user_cycle_usage=0,
actual_paid_amount=actual_paid_amount,
refund_amount=0
)
await user_subscription_dao.add(db, new_sub)
return new_sub
async def compute_refund_amount_for_order(
self,
db: AsyncSession,
user_id: int,
order_id: int
) -> dict:
sub = await user_subscription_dao.get_by_order_id(db, order_id)
if not sub or sub.user_id != user_id:
raise errors.NotFoundError(msg="当前无有效订阅")
now = datetime.now()
total_delta = sub.current_cycle_end_at - sub.current_cycle_start_at
total_seconds = total_delta.total_seconds()
if total_seconds <= 0:
refund_val = 0
else:
remaining_delta = sub.current_cycle_end_at - now
remaining_seconds = max(0, remaining_delta.total_seconds())
refund_val = int(sub.actual_paid_amount * (remaining_seconds / total_seconds))
return {
"refund_amount": refund_val,
"subscription_id": sub.id
}
async def cancel_subscription_for_order(
self,
db: AsyncSession,
user_id: int,
order_id: int,
refund_amount: Optional[int] = None
) -> dict:
sub = await user_subscription_dao.get_by_order_id(db, order_id)
if not sub:
raise errors.NotFoundError(msg="当前无有效订阅")
now = datetime.now()
if refund_amount is None:
data = await self.compute_refund_amount_for_order(db, user_id, order_id)
refund_val = data["refund_amount"]
else:
refund_val = refund_amount
await user_subscription_dao.update(
db,
sub.id,
{
"status": "canceled",
"refund_amount": refund_val,
"refund_at": now
}
)
return {
"refund_amount": refund_val,
"subscription_id": sub.id
}
subscribe_service = SubscribeService()

View File

@@ -132,6 +132,63 @@ class WxPayService:
'paySign': pay_sign,
}
@staticmethod
async def create_jsapi_subscription_order(user_id: int, payer_openid: str, plan_id: int) -> dict:
async with async_db_session.begin() as db:
from backend.utils.snowflake import snowflake
generated_id = snowflake.generate()
from backend.app.admin.crud.subscribe_crud import subscription_plan_dao
plan = await subscription_plan_dao.get(db, plan_id)
if not plan or plan.status != 'enabled':
raise RuntimeError('订阅计划不可用')
order = WxOrder(
user_id=user_id,
out_trade_no=str(generated_id),
description=f"订阅 {plan.name}",
amount_cents=plan.price,
payer_openid=payer_openid,
trade_state='NOTPAY',
product_id=plan_id,
points=None,
)
order.id = generated_id
await wx_order_dao.add(db, order)
await db.flush()
await db.refresh(order)
notify_url = f"{settings.SERVER_HOST}:{settings.SERVER_PORT}{settings.FASTAPI_API_V1_PATH}/wxpay/notify"
wxpay = WxPayService._build_wxpay_instance(notify_url)
payer = {"openid": payer_openid}
result = WxPayService._safe_call(
wxpay.pay,
description=f"订阅 {plan.name}",
out_trade_no=str(order.id),
amount={"total": plan.price, "currency": "CNY"},
pay_type=WeChatPayType.JSAPI,
payer=payer,
)
data = WxPayService._parse_result(result)
prepay_id = data.get('prepay_id') or ''
if prepay_id:
await wx_order_dao.set_prepay_id(db, order.id, prepay_id)
app_id = settings.WX_SP_APPID or settings.WX_APPID
timestamp = str(int(datetime.now().timestamp()))
nonce_str = str(order.id)
package = f"prepay_id={prepay_id}"
sign_type = "RSA"
pay_sign = wxpay.sign([app_id, timestamp, nonce_str, package])
return {
'out_trade_no': str(order.id),
'prepay_id': prepay_id,
'trade_state': order.trade_state,
'appId': app_id,
'timeStamp': timestamp,
'nonceStr': nonce_str,
'package': package,
'signType': sign_type,
'paySign': pay_sign,
}
@staticmethod
async def query_order(out_trade_no: str) -> dict:
async with async_db_session.begin() as db:
@@ -207,7 +264,9 @@ class WxPayService:
'refundable_amount_cents': 0,
'amount_per_point': 0,
}
return await WxPayService._compute_refund_amount_for_order(db, order)
if order.points and order.points > 0:
return await WxPayService._compute_refund_amount_for_order(db, order)
return await WxPayService._compute_subscription_refund_amount(db, order)
@staticmethod
async def _compute_refund_amount_for_order(db, order: WxOrder) -> dict:
@@ -240,6 +299,34 @@ class WxPayService:
'amount_per_point': amt_per_point,
}
@staticmethod
async def _compute_subscription_refund_amount(db, order: WxOrder) -> dict:
from backend.app.admin.crud.subscribe_crud import user_subscription_dao
from datetime import datetime as dt
sub = await user_subscription_dao.get_by_order_id(db, order.id)
if not sub:
return {
'order_id': str(order.id),
'refundable_points': 0,
'refundable_amount_cents': 0,
'amount_per_point': 0,
}
now = dt.now()
total_delta = sub.current_cycle_end_at - sub.current_cycle_start_at
total_seconds = total_delta.total_seconds()
if total_seconds <= 0:
refundable_amount = 0
else:
remaining_delta = sub.current_cycle_end_at - now
remaining_seconds = max(0, remaining_delta.total_seconds())
refundable_amount = int(sub.actual_paid_amount * (remaining_seconds / total_seconds))
return {
'order_id': str(order.id),
'refundable_points': 0,
'refundable_amount_cents': refundable_amount,
'amount_per_point': 0,
}
@staticmethod
async def list_orders_for_user(user_id: int, page: int, size: int) -> dict:
async with async_db_session() as db:
@@ -311,17 +398,23 @@ class WxPayService:
async def create_refund(user_id: int, out_trade_no: str, amount_cents: int, reason: Optional[str] = None) -> dict:
async with async_db_session.begin() as db:
from backend.utils.snowflake import snowflake
from backend.app.admin.service.points_service import points_service
# 使用退款记录主键 id 作为 out_refund_no
generated_id = snowflake.generate()
compute = await WxPayService.compute_refund_amount_for_out_trade(out_trade_no)
max_refund = int(compute.get('refundable_amount_cents') or 0)
amt_per_point = float(compute.get('amount_per_point') or 0)
final_amount = min(amount_cents or 0, max_refund)
points_to_freeze = int(final_amount / amt_per_point) if amt_per_point > 0 else 0
order_obj = await wx_order_dao.get_by_out_trade_no(db, out_trade_no)
if order_obj:
if not order_obj:
raise RuntimeError('订单不存在')
if order_obj.points and order_obj.points > 0:
from backend.app.admin.service.points_service import points_service
compute = await WxPayService._compute_refund_amount_for_order(db, order_obj)
max_refund = int(compute.get('refundable_amount_cents') or 0)
amt_per_point = float(compute.get('amount_per_point') or 0)
final_amount = min(amount_cents or 0, max_refund)
points_to_freeze = int(final_amount / amt_per_point) if amt_per_point > 0 else 0
await points_service.freeze_points_for_order(db, order_obj.user_id, order_obj.id, points_to_freeze)
else:
from backend.app.admin.service.subscribe_service import subscribe_service
data = await subscribe_service.compute_refund_amount_for_order(db, user_id, order_obj.id)
max_refund = int(data.get('refund_amount') or 0)
final_amount = min(amount_cents or 0, max_refund)
refund = WxRefund(
user_id=user_id,
out_trade_no=out_trade_no,
@@ -426,22 +519,43 @@ class WxPayService:
event_type: str | None = None) -> None:
async with async_db_session.begin() as db:
order = await wx_order_dao.get(db, order_id)
if not order or not order.points or order.points <= 0:
if not order:
return
if getattr(order, 'points_granted', False):
return
from backend.app.admin.crud.points_crud import points_log_dao
from backend.common.const import POINTS_ACTION_RECHARGE
exists = await points_log_dao.has_log_by_related(db, order.user_id, order.id, POINTS_ACTION_RECHARGE)
if exists:
await wx_order_dao.update_model(db, order_id, {'points_granted': True})
return
from backend.app.admin.service.points_service import points_service
try:
ok = await points_service.add_points_from_order_with_db(db, order.user_id, order.id, order.points, order.amount_cents)
if ok:
if order.points and order.points > 0:
if getattr(order, 'points_granted', False):
return
from backend.app.admin.crud.points_crud import points_log_dao
from backend.common.const import POINTS_ACTION_RECHARGE
exists = await points_log_dao.has_log_by_related(db, order.user_id, order.id, POINTS_ACTION_RECHARGE)
if exists:
await wx_order_dao.update_model(db, order_id, {'points_granted': True})
# 成功发放后记录统一兼容的日志
return
from backend.app.admin.service.points_service import points_service
try:
ok = await points_service.add_points_from_order_with_db(db, order.user_id, order.id, order.points, order.amount_cents)
if ok:
await wx_order_dao.update_model(db, order_id, {'points_granted': True})
log = WxPayNotifyLog(
out_trade_no=order.out_trade_no,
event_type=event_type or ('QUERY.SUCCESS' if source == 'query' else 'TRANSACTION.SUCCESS'),
verified=verified,
raw_text=raw_text,
raw_json=raw_json,
)
await wx_pay_notify_dao.add(db, log)
except Exception as e:
logging.error(f"Grant points task failed for order {order_id}: {e}")
else:
from backend.app.admin.crud.subscribe_crud import user_subscription_dao, subscription_plan_dao
from backend.app.admin.service.subscribe_service import subscribe_service
plan = await subscription_plan_dao.get(db, order.product_id) if order.product_id else None
if not plan or plan.status != 'enabled':
return
exists_sub = await user_subscription_dao.get_by_order_id(db, order.id)
if exists_sub:
return
try:
await subscribe_service.create_or_renew_subscription(db, order.user_id, order.product_id, order.id, order.amount_cents)
log = WxPayNotifyLog(
out_trade_no=order.out_trade_no,
event_type=event_type or ('QUERY.SUCCESS' if source == 'query' else 'TRANSACTION.SUCCESS'),
@@ -450,8 +564,8 @@ class WxPayService:
raw_json=raw_json,
)
await wx_pay_notify_dao.add(db, log)
except Exception as e:
logging.error(f"Grant points task failed for order {order_id}: {e}")
except Exception as e:
logging.error(f"Grant subscription task failed for order {order_id}: {e}")
@staticmethod
async def handle_refund_notify(raw_body: bytes, timestamp: str | None = None, nonce: str | None = None, signature: str | None = None, serial: str | None = None) -> dict:
@@ -519,47 +633,52 @@ class WxPayService:
verified: bool = True, event_type: str | None = None) -> None:
async with async_db_session.begin() as db:
order = await wx_order_dao.get_by_out_trade_no(db, out_trade_no)
if not order or not order.points or order.points <= 0:
if not order:
return
refund = await wx_refund_dao.get_by_out_refund_no(db, out_refund_no)
if not refund:
return
if getattr(refund, 'points_deducted', False):
return
from backend.app.admin.service.points_service import points_service
try:
# 计算尚未解冻的冻结积分(本订单维度)
from backend.app.admin.model.points import PointsLog
from sqlalchemy import select, func
s_freeze = await db.execute(
select(func.coalesce(func.sum(PointsLog.amount), 0)).where(
PointsLog.user_id == order.user_id,
PointsLog.related_id == order.id,
PointsLog.action == 'refund_freeze'
if order.points and order.points > 0:
from backend.app.admin.service.points_service import points_service
try:
from backend.app.admin.model.points import PointsLog
from sqlalchemy import select, func
s_freeze = await db.execute(
select(func.coalesce(func.sum(PointsLog.amount), 0)).where(
PointsLog.user_id == order.user_id,
PointsLog.related_id == order.id,
PointsLog.action == 'refund_freeze'
)
)
)
s_unfreeze = await db.execute(
select(func.coalesce(func.sum(PointsLog.amount), 0)).where(
PointsLog.user_id == order.user_id,
PointsLog.related_id == order.id,
PointsLog.action.in_(['refund_unfreeze','refund_deduct'])
s_unfreeze = await db.execute(
select(func.coalesce(func.sum(PointsLog.amount), 0)).where(
PointsLog.user_id == order.user_id,
PointsLog.related_id == order.id,
PointsLog.action.in_(['refund_unfreeze','refund_deduct'])
)
)
)
frozen_points = int(s_freeze.scalar() or 0) - int(s_unfreeze.scalar() or 0)
if frozen_points > 0:
# 完成时解冻并扣减对应积分(按冻结量)
await points_service.unfreeze_points_for_order(db, order.user_id, order.id, frozen_points, deduct=True)
await wx_refund_dao.update_model(db, refund.id, {'points_deducted': True})
log = WxPayNotifyLog(
out_trade_no=out_refund_no,
event_type=event_type or ('REFUND.QUERY.SUCCESS' if source == 'query' else 'REFUND.SUCCESS'),
verified=verified,
raw_text=raw_text,
raw_json=raw_json,
)
await wx_pay_notify_dao.add(db, log)
except Exception as e:
logging.error(f"Deduct points task failed for refund {out_refund_no}: {e}")
frozen_points = int(s_freeze.scalar() or 0) - int(s_unfreeze.scalar() or 0)
if frozen_points > 0:
await points_service.unfreeze_points_for_order(db, order.user_id, order.id, frozen_points, deduct=True)
except Exception as e:
logging.error(f"Deduct points task failed for refund {out_refund_no}: {e}")
else:
from backend.app.admin.service.subscribe_service import subscribe_service
try:
await subscribe_service.cancel_subscription_for_order(db, order.user_id, order.id, refund.amount_cents)
except Exception as e:
logging.error(f"Cancel subscription task failed for refund {out_refund_no}: {e}")
await wx_refund_dao.update_model(db, refund.id, {'points_deducted': True})
log = WxPayNotifyLog(
out_trade_no=out_refund_no,
event_type=event_type or ('REFUND.QUERY.SUCCESS' if source == 'query' else 'REFUND.SUCCESS'),
verified=verified,
raw_text=raw_text,
raw_json=raw_json,
)
await wx_pay_notify_dao.add(db, log)
@staticmethod
async def download_bill(bill_date: str, bill_type: str = 'ALL') -> dict:

View File

@@ -44,6 +44,7 @@ from backend.app.admin.model.dict import YdDictLanguage, YdDictType, DictCategor
from backend.app.admin.service.yd_dict_service import yd_dict_service
from backend.app.admin.service.points_service import points_service
from backend.app.admin.service.dict_service import dict_service
from backend.app.admin.service.subscribe_service import subscribe_service
from backend.database.redis import redis_client
from backend.app.admin.schema.wx import DictLevel
@@ -195,8 +196,9 @@ class ImageService:
type = params.type
dict_level = DictLevel.LEVEL1.value
# 检查用户积分是否足够(现在积分没有过期概念)
if not await points_service.check_sufficient_points(current_user.id, IMAGE_RECOGNITION_COST):
async with async_db_session.begin() as db:
has_sub = await subscribe_service.has_active_subscription(db, current_user.id)
if not has_sub and not await points_service.check_sufficient_points(current_user.id, IMAGE_RECOGNITION_COST):
raise errors.ForbiddenError(
msg=f'积分不足,请获取积分后继续使用'
)
@@ -331,23 +333,30 @@ class ImageService:
if not result:
raise Exception("Failed to initialize image text")
# Step 4: Deduct user points
task = await image_task_dao.get(db, task_id)
if task:
image = await image_dao.get(db, task.image_id)
if image:
total_tokens = task.result.get("token_usage", {}).get("total_tokens", 0)
points = math.ceil(max(total_tokens, 1)/1000) * IMAGE_RECOGNITION_COST
points_deducted = await points_service.deduct_points_with_db(
user_id=task.user_id,
amount=math.ceil(points),
db=db,
related_id=image.id,
details={"task_id": task_id},
action=POINTS_ACTION_IMAGE_RECOGNITION
used_sub = await subscribe_service.check_and_record_usage(
db,
task.user_id,
"image_recognition",
math.ceil(points),
business_id=image.id,
)
if not points_deducted:
logger.warning(f"Insufficient points for user {task.user_id}: balance is zero, cannot deduct for task {task_id}")
if not used_sub:
points_deducted = await points_service.deduct_points_with_db(
user_id=task.user_id,
amount=math.ceil(points),
db=db,
related_id=image.id,
details={"task_id": task_id},
action=POINTS_ACTION_IMAGE_RECOGNITION
)
if not points_deducted:
logger.warning(f"Insufficient points for user {task.user_id}: balance is zero, cannot deduct for task {task_id}")
# Step 5: Update task status to completed
await ImageService._update_task_status_with_db(task_id, ImageTaskStatus.COMPLETED, db)

View File

@@ -8,6 +8,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from backend.app.ai.model.image_task import ImageTaskStatus, ImageProcessingTask
from backend.app.ai.crud.image_task_crud import image_task_dao
from backend.app.admin.service.points_service import points_service
from backend.app.admin.service.subscribe_service import subscribe_service
from backend.app.ai.service.rate_limit_service import rate_limit_service
from backend.database.db import background_db_session
from backend.common.const import LLM_CHAT_COST
@@ -78,26 +79,36 @@ class ImageTaskService:
token_cost = units * LLM_CHAT_COST
total_deduct = token_cost + extra_points
# Use ref_id as the related_id for points record
points_deducted = await points_service.deduct_points_with_db(
user_id=task.user_id,
amount=total_deduct,
db=db,
related_id=task.ref_id,
details={
"task_id": task_id,
"ref_type": task.ref_type,
"token_usage": total_tokens,
"token_cost": token_cost,
"extra_points": extra_points,
**extra_details
},
action=task.ref_type
)
if not points_deducted:
raise Exception("Failed to deduct points")
used_sub = False
if total_deduct > 0:
used_sub = await subscribe_service.check_and_record_usage(
db,
task.user_id,
task.ref_type,
total_deduct,
business_id=task.ref_id,
)
if not used_sub and total_deduct > 0:
points_deducted = await points_service.deduct_points_with_db(
user_id=task.user_id,
amount=total_deduct,
db=db,
related_id=task.ref_id,
details={
"task_id": task_id,
"ref_type": task.ref_type,
"token_usage": total_tokens,
"token_cost": token_cost,
"extra_points": extra_points,
**extra_details
},
action=task.ref_type
)
if not points_deducted:
raise Exception("Failed to deduct points")
# If result doesn't have token_usage, we might want to add it,
# but let's assume processor handles result structure.

View File

@@ -17,8 +17,14 @@ from backend.app.admin.service.audit_log_service import audit_log_service
from backend.app.ai.service.rate_limit_service import rate_limit_service, SPEECH_ASSESSMENT_SERVICE
from backend.database.db import async_db_session
from backend.middleware.tencent_cloud import TencentCloud
from backend.common.const import SPEECH_ASSESSMENT_COST, POINTS_ACTION_SPEECH_ASSESSMENT
from backend.common.const import (
SPEECH_ASSESSMENT_COST,
POINTS_ACTION_SPEECH_ASSESSMENT,
TTS_STANDARD_AUDIO_COST,
POINTS_ACTION_TTS_STANDARD_AUDIO,
)
from backend.app.admin.service.points_service import points_service
from backend.app.admin.service.subscribe_service import subscribe_service
from backend.core.conf import settings
from backend.middleware.qwen import Qwen
@@ -66,6 +72,10 @@ class RecordingService:
if recording:
return recording.file_id
# 未找到则按需生成
async with async_db_session.begin() as db:
has_sub = await subscribe_service.has_active_subscription(db, user_id)
if not has_sub and not await points_service.check_sufficient_points(user_id, TTS_STANDARD_AUDIO_COST):
raise errors.ForbiddenError(msg='积分不足,请获取积分后继续使用')
image_text = await image_text_service.get_text_by_id(text_id)
if not image_text:
return None
@@ -93,6 +103,25 @@ class RecordingService:
async with async_db_session() as db:
recording = await recording_dao.get_standard_by_text_id(db, text_id)
if recording:
async with async_db_session.begin() as billing_db:
used_sub = await subscribe_service.check_and_record_usage(
billing_db,
user_id,
"tts_standard_audio",
TTS_STANDARD_AUDIO_COST,
business_id=recording.id,
)
if not used_sub:
points_deducted = await points_service.deduct_points_with_db(
user_id=user_id,
amount=TTS_STANDARD_AUDIO_COST,
db=billing_db,
related_id=recording.id,
details={"recording_id": recording.id, "text_id": text_id},
action=POINTS_ACTION_TTS_STANDARD_AUDIO,
)
if not points_deducted:
logger.warning(f"Failed to deduct points for user {user_id} for TTS standard audio")
return recording.file_id
return None
@@ -104,7 +133,10 @@ class RecordingService:
recording = await recording_dao.get_standard_by_ref(db, 'qa_question', question_id)
if recording:
return recording.file_id
async with async_db_session.begin() as db:
has_sub = await subscribe_service.has_active_subscription(db, user_id)
if not has_sub and not await points_service.check_sufficient_points(user_id, TTS_STANDARD_AUDIO_COST):
raise errors.ForbiddenError(msg='积分不足,请获取积分后继续使用')
# 2. Get question content
from backend.app.ai.crud.qa_crud import qa_question_dao
async with async_db_session() as db:
@@ -142,6 +174,25 @@ class RecordingService:
async with async_db_session() as db:
recording = await recording_dao.get_standard_by_ref(db, 'qa_question', question_id)
if recording:
async with async_db_session.begin() as billing_db:
used_sub = await subscribe_service.check_and_record_usage(
billing_db,
user_id,
"tts_standard_audio",
TTS_STANDARD_AUDIO_COST,
business_id=recording.id,
)
if not used_sub:
points_deducted = await points_service.deduct_points_with_db(
user_id=user_id,
amount=TTS_STANDARD_AUDIO_COST,
db=billing_db,
related_id=recording.id,
details={"recording_id": recording.id, "question_id": question_id},
action=POINTS_ACTION_TTS_STANDARD_AUDIO,
)
if not points_deducted:
logger.warning(f"Failed to deduct points for user {user_id} for TTS question audio")
return recording.file_id
return None
@@ -417,47 +468,46 @@ class RecordingService:
ref_text = image_text.content
image_id = image_text.image_id
# 检查录音记录是否存在
recording = await self.get_recording_by_file_id(file_id)
if not recording:
# 如果不存在创建新的录音记录并存储ref_text和image_id
try:
recording_id = await self.create_recording_record(file_id, ref_text, image_id, image_text_id, 1, user_id)
# 重新获取recording对象
recording = await self.get_recording_by_file_id(file_id)
if not recording:
raise RuntimeError(f"Failed to create recording record for file_id {file_id}")
except Exception as e:
raise RuntimeError(f"Failed to create recording record for file_id {file_id}: {str(e)}")
# 检查用户积分是否足够(现在积分没有过期概念)
if not await points_service.check_sufficient_points(user_id, SPEECH_ASSESSMENT_COST):
async with async_db_session.begin() as db:
has_sub = await subscribe_service.has_active_subscription(db, user_id)
if not has_sub and not await points_service.check_sufficient_points(user_id, SPEECH_ASSESSMENT_COST):
raise errors.ForbiddenError(msg='积分不足,请获取积分后继续使用')
try:
# 调用腾讯云SOE API进行语音评估
result = await self.tencent_cloud.assessment_speech(file_id, ref_text, str(recording.id), image_id, user_id)
# 保存完整的识别结果到details字段中
details = {"assessment": result}
# 更新录音记录的details字段
success = await self.update_recording_details(recording.id, details)
if not success:
raise RuntimeError(f"Failed to update recording details for file_id {file_id}")
# 扣减用户积分
async with async_db_session.begin() as db:
points_deducted = await points_service.deduct_points_with_db(
user_id=user_id,
amount=SPEECH_ASSESSMENT_COST,
db=db,
related_id=recording.id,
details={"recording_id": recording.id},
action=POINTS_ACTION_SPEECH_ASSESSMENT
used_sub = await subscribe_service.check_and_record_usage(
db,
user_id,
"speech_assessment",
SPEECH_ASSESSMENT_COST,
business_id=recording.id,
)
if not points_deducted:
logger.warning(f"Failed to deduct points for user {user_id} for speech assessment")
if not used_sub:
points_deducted = await points_service.deduct_points_with_db(
user_id=user_id,
amount=SPEECH_ASSESSMENT_COST,
db=db,
related_id=recording.id,
details={"recording_id": recording.id},
action=POINTS_ACTION_SPEECH_ASSESSMENT
)
if not points_deducted:
logger.warning(f"Failed to deduct points for user {user_id} for speech assessment")
# 计算耗时
duration = time.time() - start_time
@@ -506,7 +556,9 @@ class RecordingService:
raise RuntimeError(f"Failed to create recording record for file_id {file_id}")
except Exception as e:
raise RuntimeError(f"Failed to create recording record for file_id {file_id}: {str(e)}")
if not await points_service.check_sufficient_points(user_id, SPEECH_ASSESSMENT_COST):
async with async_db_session.begin() as db:
has_sub = await subscribe_service.has_active_subscription(db, user_id)
if not has_sub and not await points_service.check_sufficient_points(user_id, SPEECH_ASSESSMENT_COST):
raise RuntimeError('积分不足,请获取积分后继续使用')
try:
result = await self.tencent_cloud.assessment_speech(file_id, ref_text, str(recording.id), image_id, user_id)
@@ -515,14 +567,22 @@ class RecordingService:
if not success:
raise RuntimeError(f"Failed to update recording details for file_id {file_id}")
async with async_db_session.begin() as db:
await points_service.deduct_points_with_db(
user_id=user_id,
amount=SPEECH_ASSESSMENT_COST,
db=db,
related_id=recording.id,
details={"recording_id": recording.id},
action=POINTS_ACTION_SPEECH_ASSESSMENT
used_sub = await subscribe_service.check_and_record_usage(
db,
user_id,
"speech_assessment",
SPEECH_ASSESSMENT_COST,
business_id=recording.id,
)
if not used_sub:
await points_service.deduct_points_with_db(
user_id=user_id,
amount=SPEECH_ASSESSMENT_COST,
db=db,
related_id=recording.id,
details={"recording_id": recording.id},
action=POINTS_ACTION_SPEECH_ASSESSMENT
)
duration = time.time() - start_time
if background_tasks:
self._log_audit(background_tasks, file_id, ref_text, result, duration, status_code, user_id, image_id, 0)

View File

@@ -5,12 +5,14 @@ IMAGE_RECOGNITION_COST = 1 # 1000 / 1
SPEECH_ASSESSMENT_COST = 1
LLM_CHAT_COST = 1
IMAGE_GENERATION_COST = 20
TTS_STANDARD_AUDIO_COST = 1
QWEN_TOKEN_COST = 0.002
# Points action types
POINTS_ACTION_SYSTEM_GIFT = "system_gift"
POINTS_ACTION_IMAGE_RECOGNITION = "image_recognition"
POINTS_ACTION_SPEECH_ASSESSMENT = "speech_assessment"
POINTS_ACTION_TTS_STANDARD_AUDIO = "tts_standard_audio"
POINTS_ACTION_RECHARGE = "recharge"
POINTS_ACTION_COUPON = "coupon"
POINTS_ACTION_SPEND = "spend"