diff --git a/backend/app/admin/api/router.py b/backend/app/admin/api/router.py index acb3c63..8946b65 100755 --- a/backend/app/admin/api/router.py +++ b/backend/app/admin/api/router.py @@ -9,12 +9,14 @@ from backend.app.admin.api.v1.audit_log import router as audit_log_router from backend.app.admin.api.v1.coupon import router as coupon_router from backend.app.admin.api.v1.points import router as points_router from backend.app.admin.api.v1.wxpay import router as wxpay_router +from backend.app.admin.api.v1.product import router as product_router from backend.core.conf import settings v1 = APIRouter(prefix=settings.FASTAPI_API_V1_PATH) v1.include_router(wx_router, prefix='/wx', tags=['微信服务']) v1.include_router(wxpay_router, prefix='/wxpay', tags=['微信支付']) +v1.include_router(product_router, prefix='/product', tags=['积分商品']) v1.include_router(file_router, prefix='/file', tags=['文件服务']) v1.include_router(dict_router, prefix='/dict', tags=['字典服务']) v1.include_router(audit_log_router, prefix='/audit', tags=['审计日志服务']) diff --git a/backend/app/admin/api/v1/product.py b/backend/app/admin/api/v1/product.py new file mode 100644 index 0000000..4c526a5 --- /dev/null +++ b/backend/app/admin/api/v1/product.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +from fastapi import APIRouter, Depends, Request + +from backend.app.admin.schema.product import ProductItem, InitProductsRequest, InitProductsResponse +from backend.app.admin.service.product_service import product_service +from backend.app.admin.crud.wx_order_crud import wx_order_dao +from backend.common.response.response_schema import response_base, ResponseSchemaModel +from backend.common.security.jwt import DependsJwtAuth +from backend.database.db import async_db_session + + +router = APIRouter() + + +@router.get('/list', summary='获取可购买积分商品列表', dependencies=[DependsJwtAuth]) +async def list_products(request: Request) -> ResponseSchemaModel[list[ProductItem]]: + items = await product_service.list_for_user(request.user.id) + data = [ + ProductItem( + id=str(p.id), + title=p.title, + description=p.description, + points=p.points, + amount_cents=p.amount_cents, + one_time=p.one_time, + ) + for p in items + ] + return response_base.success(data=data) + + +@router.post('/init', summary='初始化积分商品') +async def init_products(body: InitProductsRequest) -> ResponseSchemaModel[InitProductsResponse]: + count = await product_service.init_products([i.model_dump() for i in (body.items or [])]) + return response_base.success(data=InitProductsResponse(count=count)) diff --git a/backend/app/admin/api/v1/wxpay.py b/backend/app/admin/api/v1/wxpay.py index b086415..5273901 100644 --- a/backend/app/admin/api/v1/wxpay.py +++ b/backend/app/admin/api/v1/wxpay.py @@ -29,25 +29,25 @@ async def create_jsapi_order( result = await wxpay_service.create_jsapi_order( user_id=request.user.id, payer_openid=request.user.openid, - amount_cents=body.amount_cents, - description=body.description, + product_id=body.product_id, ) data = CreateJsapiOrderResponse(**result) return response_base.success(data=data) # @router.get('/order/{out_trade_no}', summary='查询订单', dependencies=[DependsJwtAuth]) -# async def query_order(out_trade_no: str) -> ResponseSchemaModel[QueryOrderResponse]: -# result = await wxpay_service.query_order(out_trade_no) -# data = QueryOrderResponse(**result) -# return response_base.success(data=data) +@router.get('/order/{out_trade_no}', summary='查询订单') +async def query_order(out_trade_no: str) -> ResponseSchemaModel[QueryOrderResponse]: + result = await wxpay_service.query_order(out_trade_no) + data = QueryOrderResponse(**result) + return response_base.success(data=data) -# @router.post('/order/{out_trade_no}/close', summary='关闭订单', dependencies=[DependsJwtAuth]) -# async def close_order(out_trade_no: str) -> ResponseSchemaModel[CloseOrderResponse]: -# result = await wxpay_service.close_order(out_trade_no) -# data = CloseOrderResponse(**result) -# return response_base.success(data=data) +@router.post('/order/{out_trade_no}/close', summary='关闭订单', dependencies=[DependsJwtAuth]) +async def close_order(out_trade_no: str) -> ResponseSchemaModel[CloseOrderResponse]: + result = await wxpay_service.close_order(out_trade_no) + data = CloseOrderResponse(**result) + return response_base.success(data=data) @router.post('/refund', summary='申请退款', dependencies=[DependsJwtAuth]) @@ -75,6 +75,7 @@ async def query_refund(out_refund_no: str) -> ResponseSchemaModel[QueryRefundRes @router.post('/notify', summary='支付回调通知') async def pay_notify(request: Request) -> ResponseSchemaModel[dict]: raw = await request.body() + print(raw) timestamp = request.headers.get('Wechatpay-Timestamp', '') nonce = request.headers.get('Wechatpay-Nonce', '') signature = request.headers.get('Wechatpay-Signature', '') @@ -83,6 +84,17 @@ async def pay_notify(request: Request) -> ResponseSchemaModel[dict]: return response_base.success(data=result) +@router.post('/notify/refund', summary='退款回调通知') +async def refund_notify(request: Request) -> ResponseSchemaModel[dict]: + raw = await request.body() + timestamp = request.headers.get('Wechatpay-Timestamp', '') + nonce = request.headers.get('Wechatpay-Nonce', '') + signature = request.headers.get('Wechatpay-Signature', '') + serial = request.headers.get('Wechatpay-Serial', '') + result = await wxpay_service.handle_refund_notify(raw, timestamp, nonce, signature, serial) + return response_base.success(data=result) + + # @router.get('/bill', summary='下载交易账单', dependencies=[DependsJwtAuth]) # async def download_bill(bill_date: str, bill_type: str = 'ALL') -> ResponseSchemaModel[DownloadBillResponse]: # result = await wxpay_service.download_bill(bill_date, bill_type) diff --git a/backend/app/admin/crud/points_crud.py b/backend/app/admin/crud/points_crud.py index 99a1635..a966595 100644 --- a/backend/app/admin/crud/points_crud.py +++ b/backend/app/admin/crud/points_crud.py @@ -109,6 +109,15 @@ class PointsLogDao(CRUDPlus[PointsLog]): await db.flush() return log + async def has_log_by_related(self, db: AsyncSession, user_id: int, related_id: int, action: str) -> bool: + stmt = select(PointsLog.id).where( + PointsLog.user_id == user_id, + PointsLog.related_id == related_id, + PointsLog.action == action, + ).limit(1) + result = await db.execute(stmt) + return result.scalar() is not None + points_dao = PointsDao(Points) -points_log_dao = PointsLogDao(PointsLog) \ No newline at end of file +points_log_dao = PointsLogDao(PointsLog) diff --git a/backend/app/admin/crud/points_product_crud.py b/backend/app/admin/crud/points_product_crud.py new file mode 100644 index 0000000..649095f --- /dev/null +++ b/backend/app/admin/crud/points_product_crud.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy_crud_plus import CRUDPlus + +from backend.app.admin.model.points_product import PointsProduct + + +class PointsProductCRUD(CRUDPlus[PointsProduct]): + async def add(self, db: AsyncSession, product: PointsProduct) -> None: + db.add(product) + + async def get_enabled_list(self, db: AsyncSession) -> list[PointsProduct]: + stmt = select(self.model).where(self.model.enabled == True).order_by(self.model.amount_cents) + result = await db.execute(stmt) + return list(result.scalars().all()) + + async def get_by_id(self, db: AsyncSession, pid: int) -> PointsProduct | None: + return await self.select_model(db, pid) + +points_product_dao: PointsProductCRUD = PointsProductCRUD(PointsProduct) + diff --git a/backend/app/admin/crud/wx_order_crud.py b/backend/app/admin/crud/wx_order_crud.py index 665e215..b802abb 100644 --- a/backend/app/admin/crud/wx_order_crud.py +++ b/backend/app/admin/crud/wx_order_crud.py @@ -8,6 +8,10 @@ from backend.app.admin.model.wx_pay import WxOrder class WxOrderCRUD(CRUDPlus[WxOrder]): + + async def get(self, db: AsyncSession, order_id: int) -> WxOrder | None: + return await self.select_model(db, order_id) + async def add(self, db: AsyncSession, order: WxOrder) -> None: db.add(order) @@ -28,5 +32,13 @@ class WxOrderCRUD(CRUDPlus[WxOrder]): result = await db.execute(stmt) return result.rowcount -wx_order_dao: WxOrderCRUD = WxOrderCRUD(WxOrder) + async def has_successful_purchase(self, db: AsyncSession, user_id: int, product_id: int) -> bool: + stmt = select(self.model).where( + self.model.user_id == user_id, + self.model.product_id == product_id, + self.model.trade_state == 'SUCCESS' + ).limit(1) + result = await db.execute(stmt) + return result.scalar() is not None +wx_order_dao: WxOrderCRUD = WxOrderCRUD(WxOrder) diff --git a/backend/app/admin/model/__init__.py b/backend/app/admin/model/__init__.py index 40b9fc5..8379285 100755 --- a/backend/app/admin/model/__init__.py +++ b/backend/app/admin/model/__init__.py @@ -8,3 +8,4 @@ from backend.app.admin.model.coupon import Coupon, CouponUsage from backend.app.admin.model.points import Points, PointsLog from backend.app.ai.model import Image from backend.app.admin.model.wx_pay import WxOrder, WxRefund, WxPayNotifyLog +from backend.app.admin.model.points_product import PointsProduct diff --git a/backend/app/admin/model/points_product.py b/backend/app/admin/model/points_product.py new file mode 100644 index 0000000..47af267 --- /dev/null +++ b/backend/app/admin/model/points_product.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +from typing import Optional + +from sqlalchemy import String, BigInteger, Integer, Boolean +from sqlalchemy.orm import Mapped, mapped_column + +from backend.common.model import snowflake_id_key, Base + + +class PointsProduct(Base): + __tablename__ = 'points_product' + + id: Mapped[snowflake_id_key] = mapped_column(BigInteger, init=False, primary_key=True) + title: Mapped[str] = mapped_column(String(64), nullable=False, comment='商品标题') + points: Mapped[int] = mapped_column(Integer, nullable=False, comment='积分数量') + amount_cents: Mapped[int] = mapped_column(Integer, nullable=False, comment='价格(分)') + description: Mapped[Optional[str]] = mapped_column(String(255), default=None, comment='描述') + one_time: Mapped[bool] = mapped_column(Boolean, default=False, comment='是否每个用户限购一次') + enabled: Mapped[bool] = mapped_column(Boolean, default=True, comment='是否启用') + diff --git a/backend/app/admin/model/wx_pay.py b/backend/app/admin/model/wx_pay.py index d835a1e..efc7ad1 100644 --- a/backend/app/admin/model/wx_pay.py +++ b/backend/app/admin/model/wx_pay.py @@ -21,6 +21,9 @@ class WxOrder(Base): prepay_id: Mapped[Optional[str]] = mapped_column(String(64), default=None, index=True, comment='预支付ID') transaction_id: Mapped[Optional[str]] = mapped_column(String(64), default=None, index=True, comment='微信支付交易单号') trade_state: Mapped[str] = mapped_column(String(16), default='NOTPAY', index=True, comment='订单状态') + product_id: Mapped[Optional[int]] = mapped_column(BigInteger, default=None, index=True, comment='商品ID') + points: Mapped[Optional[int]] = mapped_column(Integer, default=None, comment='购买积分数量') + points_granted: Mapped[bool] = mapped_column(Boolean, default=False, comment='积分已发放') __table_args__ = ( Index('idx_wx_order_user', 'user_id', 'trade_state'), @@ -40,6 +43,7 @@ class WxRefund(Base): status: Mapped[str] = mapped_column(String(16), default='PROCESSING', index=True, comment='退款状态') reason: Mapped[Optional[str]] = mapped_column(String(128), default=None, comment='退款原因') raw: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, comment='原始返回数据') + points_deducted: Mapped[bool] = mapped_column(Boolean, default=False, comment='退款已扣回积分') __table_args__ = ( Index('idx_wx_refund_trade', 'out_trade_no', 'status'), diff --git a/backend/app/admin/schema/product.py b/backend/app/admin/schema/product.py new file mode 100644 index 0000000..c4ad19d --- /dev/null +++ b/backend/app/admin/schema/product.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +from pydantic import BaseModel, Field +from typing import Optional, List + + +class ProductItem(BaseModel): + id: str = Field(...) + title: str = Field(...) + description: Optional[str] = Field(None) + points: int = Field(...) + amount_cents: int = Field(...) + one_time: bool = Field(False) + + +class InitProductsRequest(BaseModel): + items: Optional[List[ProductItem]] = None + + +class InitProductsResponse(BaseModel): + count: int = Field(...) + diff --git a/backend/app/admin/schema/wxpay.py b/backend/app/admin/schema/wxpay.py index d977bf1..1542252 100644 --- a/backend/app/admin/schema/wxpay.py +++ b/backend/app/admin/schema/wxpay.py @@ -8,8 +8,7 @@ from backend.common.schema import SchemaBase class CreateJsapiOrderRequest(BaseModel): - amount_cents: int = Field(..., description='订单金额(分)') - description: str = Field(..., description='订单描述') + product_id: int = Field(..., description='积分商品ID') class CreateJsapiOrderResponse(BaseModel): diff --git a/backend/app/admin/service/points_service.py b/backend/app/admin/service/points_service.py index 4b424d0..0456caa 100644 --- a/backend/app/admin/service/points_service.py +++ b/backend/app/admin/service/points_service.py @@ -196,6 +196,11 @@ class PointsService: return True + @staticmethod + async def deduct_points(user_id: int, amount: int, related_id: Optional[int] = None, details: Optional[dict] = None, action: Optional[str] = None) -> bool: + async with async_db_session.begin() as db: + return await PointsService.deduct_points_with_db(user_id, amount, db, related_id, details, action) + @staticmethod async def initialize_user_points(user_id: int, db: AsyncSession = None) -> Points: """ @@ -234,4 +239,4 @@ class PointsService: } -points_service:PointsService = PointsService() \ No newline at end of file +points_service:PointsService = PointsService() diff --git a/backend/app/admin/service/product_service.py b/backend/app/admin/service/product_service.py new file mode 100644 index 0000000..697982f --- /dev/null +++ b/backend/app/admin/service/product_service.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +from typing import List + +from backend.app.admin.crud.points_product_crud import points_product_dao +from backend.app.admin.model.points_product import PointsProduct +from backend.database.db import async_db_session +from sqlalchemy import select, and_, or_, exists +from backend.app.admin.model.wx_pay import WxOrder + + +class ProductService: + @staticmethod + async def list_enabled() -> List[PointsProduct]: + async with async_db_session.begin() as db: + return await points_product_dao.get_enabled_list(db) + + @staticmethod + async def init_products(items: List[dict] | None = None) -> int: + default = [ + {"title": "100积分", "description": "充值100积分", "points": 100, "amount_cents": 1, "one_time": False}, + # {"title": "300积分", "description": "充值300积分", "points": 300, "amount_cents": 300, "one_time": False}, + {"title": "1000积分首购", "description": "首购礼包,仅限一次", "points": 1000, "amount_cents": 1, "one_time": True}, + ] + payload = items or default + async with async_db_session.begin() as db: + count = 0 + for it in payload: + product = PointsProduct( + title=it["title"], + description=it.get("description"), + points=it["points"], + amount_cents=it["amount_cents"], + one_time=bool(it.get("one_time", False)), + enabled=True, + ) + await points_product_dao.add(db, product) + count += 1 + return count + + @staticmethod + async def list_for_user(user_id: int) -> List[PointsProduct]: + """返回用户可购买的积分商品列表(过滤一次性已购买商品)。""" + async with async_db_session.begin() as db: + subq = select(WxOrder.id).where( + and_( + WxOrder.user_id == user_id, + WxOrder.product_id == PointsProduct.id, + WxOrder.trade_state == 'SUCCESS', + ) + ).limit(1) + stmt = ( + select(PointsProduct) + .where(PointsProduct.enabled == True) + .where(or_(PointsProduct.one_time == False, ~exists(subq))) + .order_by(PointsProduct.amount_cents) + ) + result = await db.execute(stmt) + return list(result.scalars().all()) + + +product_service: ProductService = ProductService() diff --git a/backend/app/admin/service/wxpay_service.py b/backend/app/admin/service/wxpay_service.py index f226f4e..9de8fce 100755 --- a/backend/app/admin/service/wxpay_service.py +++ b/backend/app/admin/service/wxpay_service.py @@ -3,6 +3,7 @@ from datetime import datetime import json import logging import httpx +import asyncio from backend.app.admin.model.wx_pay import WxOrder, WxRefund, WxPayNotifyLog from backend.app.admin.crud.wx_order_crud import wx_order_dao @@ -10,7 +11,6 @@ from backend.app.admin.crud.wx_refund_crud import wx_refund_dao from backend.app.admin.crud.wx_pay_notify_crud import wx_pay_notify_dao from backend.core.conf import settings from backend.database.db import async_db_session -from backend.utils.wechatpay_v3 import decrypt_resource, _verify_header_signature from wechatpayv3 import WeChatPay, WeChatPayType @@ -70,17 +70,28 @@ class WxPayService: return result @staticmethod - async def create_jsapi_order(user_id: int, amount_cents: int, description: str, payer_openid: str) -> dict: + async def create_jsapi_order(user_id: int, payer_openid: str, product_id: int) -> dict: async with async_db_session.begin() as db: from backend.utils.snowflake import snowflake generated_id = snowflake.generate() + from backend.app.admin.crud.points_product_crud import points_product_dao + product = await points_product_dao.get_by_id(db, product_id) + if not product or not product.enabled: + raise RuntimeError('商品不可用') + # 限购一次检查 + from backend.app.admin.crud.wx_order_crud import wx_order_dao as order_dao_check + if product.one_time: + if await order_dao_check.has_successful_purchase(db, user_id, product_id): + raise RuntimeError('该商品限购一次,已购买') order = WxOrder( user_id=user_id, out_trade_no=str(generated_id), - description=description, - amount_cents=amount_cents, + description=product.title, + amount_cents=product.amount_cents, payer_openid=payer_openid, trade_state='NOTPAY', + product_id=product_id, + points=product.points, ) order.id = generated_id await wx_order_dao.add(db, order) @@ -93,9 +104,9 @@ class WxPayService: payer = {"openid": payer_openid} result = WxPayService._safe_call( wxpay.pay, - description=description, + description=product.title, out_trade_no=str(order.id), - amount={"total": amount_cents, "currency": "CNY"}, + amount={"total": product.amount_cents, "currency": "CNY"}, pay_type=WeChatPayType.JSAPI, payer=payer, ) @@ -127,15 +138,28 @@ class WxPayService: order = await wx_order_dao.get_by_out_trade_no(db, out_trade_no) if not order: return {} - notify_url = f"{settings.SERVER_HOST}:{settings.SERVER_PORT}{settings.FASTAPI_API_V1_PATH}/wxpay/notify" + # notify_url = f"{settings.SERVER_HOST}:{settings.SERVER_PORT}{settings.FASTAPI_API_V1_PATH}/wxpay/notify" + notify_url = f"https://app.xhzone.cn:{settings.SERVER_PORT}{settings.FASTAPI_API_V1_PATH}/wxpay/notify" wxpay = WxPayService._build_wxpay_instance(notify_url) result = WxPayService._safe_call(wxpay.query, out_trade_no=out_trade_no) + print(result) data = WxPayService._parse_result(result) trade_state = data.get('trade_state', 'NOTPAY') transaction_id = data.get('transaction_id') await wx_order_dao.update_trade_state(db, order.id, trade_state) if transaction_id: await wx_order_dao.set_transaction(db, order.id, transaction_id) + if trade_state == 'SUCCESS': + asyncio.create_task( + WxPayService._grant_points_if_needed( + order.id, + source='query', + raw_json=data, + raw_text=None, + verified=True, + event_type='TRANSACTION.SUCCESS', + ) + ) return { 'out_trade_no': out_trade_no, 'transaction_id': transaction_id, @@ -202,6 +226,8 @@ class WxPayService: refund = await wx_refund_dao.get_by_out_refund_no(db, out_refund_no) if not refund: return {} + if refund.status == 'SUCCESS': + asyncio.create_task(WxPayService._deduct_points_for_refund(refund.out_trade_no, out_refund_no)) return { 'out_refund_no': refund.out_refund_no, 'refund_id': refund.refund_id, @@ -225,23 +251,115 @@ class WxPayService: decrypted = decrypt_resource(resource.get('associated_data'), resource.get('nonce'), resource.get('ciphertext'), apiv3) out_trade_no = decrypted.get('out_trade_no') or '' transaction_id = decrypted.get('transaction_id') - log = WxPayNotifyLog( - out_trade_no=out_trade_no, - event_type=event_type, - verified=verified, - raw_text=text, - raw_json=body_json, - ) - await wx_pay_notify_dao.add(db, log) - if verified and out_trade_no: order = await wx_order_dao.get_by_out_trade_no(db, out_trade_no) if order: await wx_order_dao.update_trade_state(db, order.id, 'SUCCESS') if 'transaction_id' in locals() and transaction_id: await wx_order_dao.set_transaction(db, order.id, transaction_id) + asyncio.create_task( + WxPayService._grant_points_if_needed( + order.id, + source='callback', + raw_json=body_json, + raw_text=text, + verified=verified, + event_type=event_type or 'TRANSACTION.SUCCESS', + ) + ) return {'verified': verified} + @staticmethod + async def _grant_points_if_needed(order_id: int, source: str | None = None, raw_json: dict | None = None, + raw_text: str | None = None, verified: bool = True, + event_type: str | None = None) -> None: + async with async_db_session.begin() as db: + order = await wx_order_dao.get(db, order_id) + if not order or not order.points or order.points <= 0: + return + if getattr(order, 'points_granted', False): + return + from backend.app.admin.crud.points_crud import points_log_dao + from backend.common.const import POINTS_ACTION_RECHARGE + exists = await points_log_dao.has_log_by_related(db, order.user_id, order.id, POINTS_ACTION_RECHARGE) + if exists: + await wx_order_dao.update_model(db, order_id, {'points_granted': True}) + return + from backend.app.admin.service.points_service import points_service + try: + ok = await points_service.add_points(order.user_id, order.points, related_id=order.id, details={"product_id": order.product_id, "order_id": order.id}, action=None) + if ok: + await wx_order_dao.update_model(db, order_id, {'points_granted': True}) + # 成功发放后记录统一兼容的日志 + log = WxPayNotifyLog( + out_trade_no=order.out_trade_no, + event_type=event_type or ('QUERY.SUCCESS' if source == 'query' else 'TRANSACTION.SUCCESS'), + verified=verified, + raw_text=raw_text, + raw_json=raw_json, + ) + await wx_pay_notify_dao.add(db, log) + except Exception as e: + logging.error(f"Grant points task failed for order {order_id}: {e}") + + @staticmethod + async def handle_refund_notify(raw_body: bytes, timestamp: str | None = None, nonce: str | None = None, + signature: str | None = None, serial: str | None = None) -> dict: + async with async_db_session.begin() as db: + text = raw_body.decode('utf-8') if isinstance(raw_body, (bytes, bytearray)) else str(raw_body) + verified = False + if timestamp and nonce and signature: + from backend.utils.wechatpay_v3 import _verify_header_signature + verified = _verify_header_signature(timestamp, nonce, text, settings.WX_PAY_PLATFORM_CERT_PATH) + body_json = json.loads(text) + resource = body_json.get('resource') or {} + out_refund_no = '' + status = '' + decrypted = None + if verified and resource: + from backend.utils.wechatpay_v3 import decrypt_resource + apiv3 = settings.WX_PAY_APIV3_KEY + decrypted = decrypt_resource(resource.get('associated_data'), resource.get('nonce'), resource.get('ciphertext'), apiv3) + out_refund_no = decrypted.get('out_refund_no') or '' + status = decrypted.get('status') or '' + log = WxPayNotifyLog( + out_trade_no=out_refund_no, + event_type='REFUND', + verified=verified, + raw_text=text, + raw_json=body_json, + ) + await wx_pay_notify_dao.add(db, log) + if verified and decrypted and status == 'SUCCESS': + out_trade_no = decrypted.get('out_trade_no') + asyncio.create_task(WxPayService._deduct_points_for_refund(out_trade_no, out_refund_no)) + return {'verified': verified} + + @staticmethod + async def _deduct_points_for_refund(out_trade_no: str, out_refund_no: str) -> None: + async with async_db_session.begin() as db: + order = await wx_order_dao.get_by_out_trade_no(db, out_trade_no) + if not order or not order.points or order.points <= 0: + return + refund = await wx_refund_dao.get_by_out_refund_no(db, out_refund_no) + if not refund: + return + if getattr(refund, 'points_deducted', False): + return + from backend.app.admin.crud.points_crud import points_log_dao + from backend.common.const import POINTS_ACTION_SPEND + exists = await points_log_dao.has_log_by_related(db, order.user_id, refund.id, POINTS_ACTION_SPEND) + if exists: + await wx_refund_dao.update_model(db, refund.id, {'points_deducted': True}) + return + from backend.app.admin.service.points_service import points_service + try: + ok = await points_service.deduct_points(order.user_id, order.points, related_id=refund.id, details={"order_id": order.id, "product_id": order.product_id}, action=POINTS_ACTION_SPEND) + if ok: + await wx_refund_dao.update_model(db, refund.id, {'points_deducted': True}) + except Exception as e: + logging.error(f"Deduct points task failed for refund {out_refund_no}: {e}") + @staticmethod async def download_bill(bill_date: str, bill_type: str = 'ALL') -> dict: notify_url = f"{settings.SERVER_HOST}:{settings.SERVER_PORT}{settings.FASTAPI_API_V1_PATH}/wxpay/notify" diff --git a/backend/core/conf.py b/backend/core/conf.py index e8365ab..ed3fa75 100755 --- a/backend/core/conf.py +++ b/backend/core/conf.py @@ -113,6 +113,7 @@ class Settings(BaseSettings): f'{FASTAPI_API_V1_PATH}/auth/login', f'{FASTAPI_API_V1_PATH}/auth/logout', f'{FASTAPI_API_V1_PATH}/wxpay/notify', + f'{FASTAPI_API_V1_PATH}/wxpay/notify/refund', ] # JWT