add wxpay

This commit is contained in:
Felix
2025-12-03 22:10:11 +08:00
parent f029e0a5fe
commit 4a9e0ce061
15 changed files with 359 additions and 32 deletions

View File

@@ -9,12 +9,14 @@ from backend.app.admin.api.v1.audit_log import router as audit_log_router
from backend.app.admin.api.v1.coupon import router as coupon_router
from backend.app.admin.api.v1.points import router as points_router
from backend.app.admin.api.v1.wxpay import router as wxpay_router
from backend.app.admin.api.v1.product import router as product_router
from backend.core.conf import settings
v1 = APIRouter(prefix=settings.FASTAPI_API_V1_PATH)
v1.include_router(wx_router, prefix='/wx', tags=['微信服务'])
v1.include_router(wxpay_router, prefix='/wxpay', tags=['微信支付'])
v1.include_router(product_router, prefix='/product', tags=['积分商品'])
v1.include_router(file_router, prefix='/file', tags=['文件服务'])
v1.include_router(dict_router, prefix='/dict', tags=['字典服务'])
v1.include_router(audit_log_router, prefix='/audit', tags=['审计日志服务'])

View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from fastapi import APIRouter, Depends, Request
from backend.app.admin.schema.product import ProductItem, InitProductsRequest, InitProductsResponse
from backend.app.admin.service.product_service import product_service
from backend.app.admin.crud.wx_order_crud import wx_order_dao
from backend.common.response.response_schema import response_base, ResponseSchemaModel
from backend.common.security.jwt import DependsJwtAuth
from backend.database.db import async_db_session
router = APIRouter()
@router.get('/list', summary='获取可购买积分商品列表', dependencies=[DependsJwtAuth])
async def list_products(request: Request) -> ResponseSchemaModel[list[ProductItem]]:
items = await product_service.list_for_user(request.user.id)
data = [
ProductItem(
id=str(p.id),
title=p.title,
description=p.description,
points=p.points,
amount_cents=p.amount_cents,
one_time=p.one_time,
)
for p in items
]
return response_base.success(data=data)
@router.post('/init', summary='初始化积分商品')
async def init_products(body: InitProductsRequest) -> ResponseSchemaModel[InitProductsResponse]:
count = await product_service.init_products([i.model_dump() for i in (body.items or [])])
return response_base.success(data=InitProductsResponse(count=count))

View File

@@ -29,25 +29,25 @@ async def create_jsapi_order(
result = await wxpay_service.create_jsapi_order(
user_id=request.user.id,
payer_openid=request.user.openid,
amount_cents=body.amount_cents,
description=body.description,
product_id=body.product_id,
)
data = CreateJsapiOrderResponse(**result)
return response_base.success(data=data)
# @router.get('/order/{out_trade_no}', summary='查询订单', dependencies=[DependsJwtAuth])
# async def query_order(out_trade_no: str) -> ResponseSchemaModel[QueryOrderResponse]:
# result = await wxpay_service.query_order(out_trade_no)
# data = QueryOrderResponse(**result)
# return response_base.success(data=data)
@router.get('/order/{out_trade_no}', summary='查询订单')
async def query_order(out_trade_no: str) -> ResponseSchemaModel[QueryOrderResponse]:
result = await wxpay_service.query_order(out_trade_no)
data = QueryOrderResponse(**result)
return response_base.success(data=data)
# @router.post('/order/{out_trade_no}/close', summary='关闭订单', dependencies=[DependsJwtAuth])
# async def close_order(out_trade_no: str) -> ResponseSchemaModel[CloseOrderResponse]:
# result = await wxpay_service.close_order(out_trade_no)
# data = CloseOrderResponse(**result)
# return response_base.success(data=data)
@router.post('/order/{out_trade_no}/close', summary='关闭订单', dependencies=[DependsJwtAuth])
async def close_order(out_trade_no: str) -> ResponseSchemaModel[CloseOrderResponse]:
result = await wxpay_service.close_order(out_trade_no)
data = CloseOrderResponse(**result)
return response_base.success(data=data)
@router.post('/refund', summary='申请退款', dependencies=[DependsJwtAuth])
@@ -75,6 +75,7 @@ async def query_refund(out_refund_no: str) -> ResponseSchemaModel[QueryRefundRes
@router.post('/notify', summary='支付回调通知')
async def pay_notify(request: Request) -> ResponseSchemaModel[dict]:
raw = await request.body()
print(raw)
timestamp = request.headers.get('Wechatpay-Timestamp', '')
nonce = request.headers.get('Wechatpay-Nonce', '')
signature = request.headers.get('Wechatpay-Signature', '')
@@ -83,6 +84,17 @@ async def pay_notify(request: Request) -> ResponseSchemaModel[dict]:
return response_base.success(data=result)
@router.post('/notify/refund', summary='退款回调通知')
async def refund_notify(request: Request) -> ResponseSchemaModel[dict]:
raw = await request.body()
timestamp = request.headers.get('Wechatpay-Timestamp', '')
nonce = request.headers.get('Wechatpay-Nonce', '')
signature = request.headers.get('Wechatpay-Signature', '')
serial = request.headers.get('Wechatpay-Serial', '')
result = await wxpay_service.handle_refund_notify(raw, timestamp, nonce, signature, serial)
return response_base.success(data=result)
# @router.get('/bill', summary='下载交易账单', dependencies=[DependsJwtAuth])
# async def download_bill(bill_date: str, bill_type: str = 'ALL') -> ResponseSchemaModel[DownloadBillResponse]:
# result = await wxpay_service.download_bill(bill_date, bill_type)

View File

@@ -109,6 +109,15 @@ class PointsLogDao(CRUDPlus[PointsLog]):
await db.flush()
return log
async def has_log_by_related(self, db: AsyncSession, user_id: int, related_id: int, action: str) -> bool:
stmt = select(PointsLog.id).where(
PointsLog.user_id == user_id,
PointsLog.related_id == related_id,
PointsLog.action == action,
).limit(1)
result = await db.execute(stmt)
return result.scalar() is not None
points_dao = PointsDao(Points)
points_log_dao = PointsLogDao(PointsLog)
points_log_dao = PointsLogDao(PointsLog)

View File

@@ -0,0 +1,23 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy_crud_plus import CRUDPlus
from backend.app.admin.model.points_product import PointsProduct
class PointsProductCRUD(CRUDPlus[PointsProduct]):
async def add(self, db: AsyncSession, product: PointsProduct) -> None:
db.add(product)
async def get_enabled_list(self, db: AsyncSession) -> list[PointsProduct]:
stmt = select(self.model).where(self.model.enabled == True).order_by(self.model.amount_cents)
result = await db.execute(stmt)
return list(result.scalars().all())
async def get_by_id(self, db: AsyncSession, pid: int) -> PointsProduct | None:
return await self.select_model(db, pid)
points_product_dao: PointsProductCRUD = PointsProductCRUD(PointsProduct)

View File

@@ -8,6 +8,10 @@ from backend.app.admin.model.wx_pay import WxOrder
class WxOrderCRUD(CRUDPlus[WxOrder]):
async def get(self, db: AsyncSession, order_id: int) -> WxOrder | None:
return await self.select_model(db, order_id)
async def add(self, db: AsyncSession, order: WxOrder) -> None:
db.add(order)
@@ -28,5 +32,13 @@ class WxOrderCRUD(CRUDPlus[WxOrder]):
result = await db.execute(stmt)
return result.rowcount
wx_order_dao: WxOrderCRUD = WxOrderCRUD(WxOrder)
async def has_successful_purchase(self, db: AsyncSession, user_id: int, product_id: int) -> bool:
stmt = select(self.model).where(
self.model.user_id == user_id,
self.model.product_id == product_id,
self.model.trade_state == 'SUCCESS'
).limit(1)
result = await db.execute(stmt)
return result.scalar() is not None
wx_order_dao: WxOrderCRUD = WxOrderCRUD(WxOrder)

View File

@@ -8,3 +8,4 @@ from backend.app.admin.model.coupon import Coupon, CouponUsage
from backend.app.admin.model.points import Points, PointsLog
from backend.app.ai.model import Image
from backend.app.admin.model.wx_pay import WxOrder, WxRefund, WxPayNotifyLog
from backend.app.admin.model.points_product import PointsProduct

View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Optional
from sqlalchemy import String, BigInteger, Integer, Boolean
from sqlalchemy.orm import Mapped, mapped_column
from backend.common.model import snowflake_id_key, Base
class PointsProduct(Base):
__tablename__ = 'points_product'
id: Mapped[snowflake_id_key] = mapped_column(BigInteger, init=False, primary_key=True)
title: Mapped[str] = mapped_column(String(64), nullable=False, comment='商品标题')
points: Mapped[int] = mapped_column(Integer, nullable=False, comment='积分数量')
amount_cents: Mapped[int] = mapped_column(Integer, nullable=False, comment='价格(分)')
description: Mapped[Optional[str]] = mapped_column(String(255), default=None, comment='描述')
one_time: Mapped[bool] = mapped_column(Boolean, default=False, comment='是否每个用户限购一次')
enabled: Mapped[bool] = mapped_column(Boolean, default=True, comment='是否启用')

View File

@@ -21,6 +21,9 @@ class WxOrder(Base):
prepay_id: Mapped[Optional[str]] = mapped_column(String(64), default=None, index=True, comment='预支付ID')
transaction_id: Mapped[Optional[str]] = mapped_column(String(64), default=None, index=True, comment='微信支付交易单号')
trade_state: Mapped[str] = mapped_column(String(16), default='NOTPAY', index=True, comment='订单状态')
product_id: Mapped[Optional[int]] = mapped_column(BigInteger, default=None, index=True, comment='商品ID')
points: Mapped[Optional[int]] = mapped_column(Integer, default=None, comment='购买积分数量')
points_granted: Mapped[bool] = mapped_column(Boolean, default=False, comment='积分已发放')
__table_args__ = (
Index('idx_wx_order_user', 'user_id', 'trade_state'),
@@ -40,6 +43,7 @@ class WxRefund(Base):
status: Mapped[str] = mapped_column(String(16), default='PROCESSING', index=True, comment='退款状态')
reason: Mapped[Optional[str]] = mapped_column(String(128), default=None, comment='退款原因')
raw: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, comment='原始返回数据')
points_deducted: Mapped[bool] = mapped_column(Boolean, default=False, comment='退款已扣回积分')
__table_args__ = (
Index('idx_wx_refund_trade', 'out_trade_no', 'status'),

View File

@@ -0,0 +1,22 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pydantic import BaseModel, Field
from typing import Optional, List
class ProductItem(BaseModel):
id: str = Field(...)
title: str = Field(...)
description: Optional[str] = Field(None)
points: int = Field(...)
amount_cents: int = Field(...)
one_time: bool = Field(False)
class InitProductsRequest(BaseModel):
items: Optional[List[ProductItem]] = None
class InitProductsResponse(BaseModel):
count: int = Field(...)

View File

@@ -8,8 +8,7 @@ from backend.common.schema import SchemaBase
class CreateJsapiOrderRequest(BaseModel):
amount_cents: int = Field(..., description='订单金额(分)')
description: str = Field(..., description='订单描述')
product_id: int = Field(..., description='积分商品ID')
class CreateJsapiOrderResponse(BaseModel):

View File

@@ -196,6 +196,11 @@ class PointsService:
return True
@staticmethod
async def deduct_points(user_id: int, amount: int, related_id: Optional[int] = None, details: Optional[dict] = None, action: Optional[str] = None) -> bool:
async with async_db_session.begin() as db:
return await PointsService.deduct_points_with_db(user_id, amount, db, related_id, details, action)
@staticmethod
async def initialize_user_points(user_id: int, db: AsyncSession = None) -> Points:
"""
@@ -234,4 +239,4 @@ class PointsService:
}
points_service:PointsService = PointsService()
points_service:PointsService = PointsService()

View File

@@ -0,0 +1,62 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import List
from backend.app.admin.crud.points_product_crud import points_product_dao
from backend.app.admin.model.points_product import PointsProduct
from backend.database.db import async_db_session
from sqlalchemy import select, and_, or_, exists
from backend.app.admin.model.wx_pay import WxOrder
class ProductService:
@staticmethod
async def list_enabled() -> List[PointsProduct]:
async with async_db_session.begin() as db:
return await points_product_dao.get_enabled_list(db)
@staticmethod
async def init_products(items: List[dict] | None = None) -> int:
default = [
{"title": "100积分", "description": "充值100积分", "points": 100, "amount_cents": 1, "one_time": False},
# {"title": "300积分", "description": "充值300积分", "points": 300, "amount_cents": 300, "one_time": False},
{"title": "1000积分首购", "description": "首购礼包,仅限一次", "points": 1000, "amount_cents": 1, "one_time": True},
]
payload = items or default
async with async_db_session.begin() as db:
count = 0
for it in payload:
product = PointsProduct(
title=it["title"],
description=it.get("description"),
points=it["points"],
amount_cents=it["amount_cents"],
one_time=bool(it.get("one_time", False)),
enabled=True,
)
await points_product_dao.add(db, product)
count += 1
return count
@staticmethod
async def list_for_user(user_id: int) -> List[PointsProduct]:
"""返回用户可购买的积分商品列表(过滤一次性已购买商品)。"""
async with async_db_session.begin() as db:
subq = select(WxOrder.id).where(
and_(
WxOrder.user_id == user_id,
WxOrder.product_id == PointsProduct.id,
WxOrder.trade_state == 'SUCCESS',
)
).limit(1)
stmt = (
select(PointsProduct)
.where(PointsProduct.enabled == True)
.where(or_(PointsProduct.one_time == False, ~exists(subq)))
.order_by(PointsProduct.amount_cents)
)
result = await db.execute(stmt)
return list(result.scalars().all())
product_service: ProductService = ProductService()

View File

@@ -3,6 +3,7 @@ from datetime import datetime
import json
import logging
import httpx
import asyncio
from backend.app.admin.model.wx_pay import WxOrder, WxRefund, WxPayNotifyLog
from backend.app.admin.crud.wx_order_crud import wx_order_dao
@@ -10,7 +11,6 @@ from backend.app.admin.crud.wx_refund_crud import wx_refund_dao
from backend.app.admin.crud.wx_pay_notify_crud import wx_pay_notify_dao
from backend.core.conf import settings
from backend.database.db import async_db_session
from backend.utils.wechatpay_v3 import decrypt_resource, _verify_header_signature
from wechatpayv3 import WeChatPay, WeChatPayType
@@ -70,17 +70,28 @@ class WxPayService:
return result
@staticmethod
async def create_jsapi_order(user_id: int, amount_cents: int, description: str, payer_openid: str) -> dict:
async def create_jsapi_order(user_id: int, payer_openid: str, product_id: int) -> dict:
async with async_db_session.begin() as db:
from backend.utils.snowflake import snowflake
generated_id = snowflake.generate()
from backend.app.admin.crud.points_product_crud import points_product_dao
product = await points_product_dao.get_by_id(db, product_id)
if not product or not product.enabled:
raise RuntimeError('商品不可用')
# 限购一次检查
from backend.app.admin.crud.wx_order_crud import wx_order_dao as order_dao_check
if product.one_time:
if await order_dao_check.has_successful_purchase(db, user_id, product_id):
raise RuntimeError('该商品限购一次,已购买')
order = WxOrder(
user_id=user_id,
out_trade_no=str(generated_id),
description=description,
amount_cents=amount_cents,
description=product.title,
amount_cents=product.amount_cents,
payer_openid=payer_openid,
trade_state='NOTPAY',
product_id=product_id,
points=product.points,
)
order.id = generated_id
await wx_order_dao.add(db, order)
@@ -93,9 +104,9 @@ class WxPayService:
payer = {"openid": payer_openid}
result = WxPayService._safe_call(
wxpay.pay,
description=description,
description=product.title,
out_trade_no=str(order.id),
amount={"total": amount_cents, "currency": "CNY"},
amount={"total": product.amount_cents, "currency": "CNY"},
pay_type=WeChatPayType.JSAPI,
payer=payer,
)
@@ -127,15 +138,28 @@ class WxPayService:
order = await wx_order_dao.get_by_out_trade_no(db, out_trade_no)
if not order:
return {}
notify_url = f"{settings.SERVER_HOST}:{settings.SERVER_PORT}{settings.FASTAPI_API_V1_PATH}/wxpay/notify"
# notify_url = f"{settings.SERVER_HOST}:{settings.SERVER_PORT}{settings.FASTAPI_API_V1_PATH}/wxpay/notify"
notify_url = f"https://app.xhzone.cn:{settings.SERVER_PORT}{settings.FASTAPI_API_V1_PATH}/wxpay/notify"
wxpay = WxPayService._build_wxpay_instance(notify_url)
result = WxPayService._safe_call(wxpay.query, out_trade_no=out_trade_no)
print(result)
data = WxPayService._parse_result(result)
trade_state = data.get('trade_state', 'NOTPAY')
transaction_id = data.get('transaction_id')
await wx_order_dao.update_trade_state(db, order.id, trade_state)
if transaction_id:
await wx_order_dao.set_transaction(db, order.id, transaction_id)
if trade_state == 'SUCCESS':
asyncio.create_task(
WxPayService._grant_points_if_needed(
order.id,
source='query',
raw_json=data,
raw_text=None,
verified=True,
event_type='TRANSACTION.SUCCESS',
)
)
return {
'out_trade_no': out_trade_no,
'transaction_id': transaction_id,
@@ -202,6 +226,8 @@ class WxPayService:
refund = await wx_refund_dao.get_by_out_refund_no(db, out_refund_no)
if not refund:
return {}
if refund.status == 'SUCCESS':
asyncio.create_task(WxPayService._deduct_points_for_refund(refund.out_trade_no, out_refund_no))
return {
'out_refund_no': refund.out_refund_no,
'refund_id': refund.refund_id,
@@ -225,23 +251,115 @@ class WxPayService:
decrypted = decrypt_resource(resource.get('associated_data'), resource.get('nonce'), resource.get('ciphertext'), apiv3)
out_trade_no = decrypted.get('out_trade_no') or ''
transaction_id = decrypted.get('transaction_id')
log = WxPayNotifyLog(
out_trade_no=out_trade_no,
event_type=event_type,
verified=verified,
raw_text=text,
raw_json=body_json,
)
await wx_pay_notify_dao.add(db, log)
if verified and out_trade_no:
order = await wx_order_dao.get_by_out_trade_no(db, out_trade_no)
if order:
await wx_order_dao.update_trade_state(db, order.id, 'SUCCESS')
if 'transaction_id' in locals() and transaction_id:
await wx_order_dao.set_transaction(db, order.id, transaction_id)
asyncio.create_task(
WxPayService._grant_points_if_needed(
order.id,
source='callback',
raw_json=body_json,
raw_text=text,
verified=verified,
event_type=event_type or 'TRANSACTION.SUCCESS',
)
)
return {'verified': verified}
@staticmethod
async def _grant_points_if_needed(order_id: int, source: str | None = None, raw_json: dict | None = None,
raw_text: str | None = None, verified: bool = True,
event_type: str | None = None) -> None:
async with async_db_session.begin() as db:
order = await wx_order_dao.get(db, order_id)
if not order or not order.points or order.points <= 0:
return
if getattr(order, 'points_granted', False):
return
from backend.app.admin.crud.points_crud import points_log_dao
from backend.common.const import POINTS_ACTION_RECHARGE
exists = await points_log_dao.has_log_by_related(db, order.user_id, order.id, POINTS_ACTION_RECHARGE)
if exists:
await wx_order_dao.update_model(db, order_id, {'points_granted': True})
return
from backend.app.admin.service.points_service import points_service
try:
ok = await points_service.add_points(order.user_id, order.points, related_id=order.id, details={"product_id": order.product_id, "order_id": order.id}, action=None)
if ok:
await wx_order_dao.update_model(db, order_id, {'points_granted': True})
# 成功发放后记录统一兼容的日志
log = WxPayNotifyLog(
out_trade_no=order.out_trade_no,
event_type=event_type or ('QUERY.SUCCESS' if source == 'query' else 'TRANSACTION.SUCCESS'),
verified=verified,
raw_text=raw_text,
raw_json=raw_json,
)
await wx_pay_notify_dao.add(db, log)
except Exception as e:
logging.error(f"Grant points task failed for order {order_id}: {e}")
@staticmethod
async def handle_refund_notify(raw_body: bytes, timestamp: str | None = None, nonce: str | None = None,
signature: str | None = None, serial: str | None = None) -> dict:
async with async_db_session.begin() as db:
text = raw_body.decode('utf-8') if isinstance(raw_body, (bytes, bytearray)) else str(raw_body)
verified = False
if timestamp and nonce and signature:
from backend.utils.wechatpay_v3 import _verify_header_signature
verified = _verify_header_signature(timestamp, nonce, text, settings.WX_PAY_PLATFORM_CERT_PATH)
body_json = json.loads(text)
resource = body_json.get('resource') or {}
out_refund_no = ''
status = ''
decrypted = None
if verified and resource:
from backend.utils.wechatpay_v3 import decrypt_resource
apiv3 = settings.WX_PAY_APIV3_KEY
decrypted = decrypt_resource(resource.get('associated_data'), resource.get('nonce'), resource.get('ciphertext'), apiv3)
out_refund_no = decrypted.get('out_refund_no') or ''
status = decrypted.get('status') or ''
log = WxPayNotifyLog(
out_trade_no=out_refund_no,
event_type='REFUND',
verified=verified,
raw_text=text,
raw_json=body_json,
)
await wx_pay_notify_dao.add(db, log)
if verified and decrypted and status == 'SUCCESS':
out_trade_no = decrypted.get('out_trade_no')
asyncio.create_task(WxPayService._deduct_points_for_refund(out_trade_no, out_refund_no))
return {'verified': verified}
@staticmethod
async def _deduct_points_for_refund(out_trade_no: str, out_refund_no: str) -> None:
async with async_db_session.begin() as db:
order = await wx_order_dao.get_by_out_trade_no(db, out_trade_no)
if not order or not order.points or order.points <= 0:
return
refund = await wx_refund_dao.get_by_out_refund_no(db, out_refund_no)
if not refund:
return
if getattr(refund, 'points_deducted', False):
return
from backend.app.admin.crud.points_crud import points_log_dao
from backend.common.const import POINTS_ACTION_SPEND
exists = await points_log_dao.has_log_by_related(db, order.user_id, refund.id, POINTS_ACTION_SPEND)
if exists:
await wx_refund_dao.update_model(db, refund.id, {'points_deducted': True})
return
from backend.app.admin.service.points_service import points_service
try:
ok = await points_service.deduct_points(order.user_id, order.points, related_id=refund.id, details={"order_id": order.id, "product_id": order.product_id}, action=POINTS_ACTION_SPEND)
if ok:
await wx_refund_dao.update_model(db, refund.id, {'points_deducted': True})
except Exception as e:
logging.error(f"Deduct points task failed for refund {out_refund_no}: {e}")
@staticmethod
async def download_bill(bill_date: str, bill_type: str = 'ALL') -> dict:
notify_url = f"{settings.SERVER_HOST}:{settings.SERVER_PORT}{settings.FASTAPI_API_V1_PATH}/wxpay/notify"

View File

@@ -113,6 +113,7 @@ class Settings(BaseSettings):
f'{FASTAPI_API_V1_PATH}/auth/login',
f'{FASTAPI_API_V1_PATH}/auth/logout',
f'{FASTAPI_API_V1_PATH}/wxpay/notify',
f'{FASTAPI_API_V1_PATH}/wxpay/notify/refund',
]
# JWT