This commit is contained in:
Felix
2025-12-04 09:55:18 +08:00
parent 4a9e0ce061
commit 1ab6232c8a
3 changed files with 79 additions and 61 deletions

View File

@@ -227,7 +227,22 @@ class WxPayService:
if not refund:
return {}
if refund.status == 'SUCCESS':
asyncio.create_task(WxPayService._deduct_points_for_refund(refund.out_trade_no, out_refund_no))
asyncio.create_task(
WxPayService._deduct_points_for_refund(
refund.out_trade_no,
out_refund_no,
'query',
refund.raw or {
'out_refund_no': refund.out_refund_no,
'out_trade_no': refund.out_trade_no,
'refund_id': refund.refund_id,
'status': refund.status,
},
None,
True,
'REFUND.QUERY.SUCCESS',
)
)
return {
'out_refund_no': refund.out_refund_no,
'refund_id': refund.refund_id,
@@ -322,21 +337,25 @@ class WxPayService:
decrypted = decrypt_resource(resource.get('associated_data'), resource.get('nonce'), resource.get('ciphertext'), apiv3)
out_refund_no = decrypted.get('out_refund_no') or ''
status = decrypted.get('status') or ''
log = WxPayNotifyLog(
out_trade_no=out_refund_no,
event_type='REFUND',
verified=verified,
raw_text=text,
raw_json=body_json,
)
await wx_pay_notify_dao.add(db, log)
if verified and decrypted and status == 'SUCCESS':
out_trade_no = decrypted.get('out_trade_no')
asyncio.create_task(WxPayService._deduct_points_for_refund(out_trade_no, out_refund_no))
asyncio.create_task(
WxPayService._deduct_points_for_refund(
out_trade_no,
out_refund_no,
'callback',
body_json,
text,
verified,
'REFUND.SUCCESS',
)
)
return {'verified': verified}
@staticmethod
async def _deduct_points_for_refund(out_trade_no: str, out_refund_no: str) -> None:
async def _deduct_points_for_refund(out_trade_no: str, out_refund_no: str, source: str | None = None,
raw_json: dict | None = None, raw_text: str | None = None,
verified: bool = True, event_type: str | None = None) -> None:
async with async_db_session.begin() as db:
order = await wx_order_dao.get_by_out_trade_no(db, out_trade_no)
if not order or not order.points or order.points <= 0:
@@ -357,6 +376,14 @@ class WxPayService:
ok = await points_service.deduct_points(order.user_id, order.points, related_id=refund.id, details={"order_id": order.id, "product_id": order.product_id}, action=POINTS_ACTION_SPEND)
if ok:
await wx_refund_dao.update_model(db, refund.id, {'points_deducted': True})
log = WxPayNotifyLog(
out_trade_no=out_refund_no,
event_type=event_type or ('REFUND.QUERY.SUCCESS' if source == 'query' else 'REFUND.SUCCESS'),
verified=verified,
raw_text=raw_text,
raw_json=raw_json,
)
await wx_pay_notify_dao.add(db, log)
except Exception as e:
logging.error(f"Deduct points task failed for refund {out_refund_no}: {e}")

View File

@@ -4,6 +4,7 @@
定时任务处理模块
"""
from datetime import datetime, timedelta
import logging
from sqlalchemy import select, and_, desc
from backend.app.admin.model.audit_log import AuditLog, DailySummary
from backend.app.ai.model.image import Image
@@ -13,17 +14,20 @@ from backend.app.admin.crud.daily_summary_crud import daily_summary_dao
from backend.app.admin.schema.audit_log import CreateDailySummaryParam
from backend.common.const import API_TYPE_RECOGNITION
from backend.database.db import async_db_session
from backend.utils.timezone import timezone
async def wx_user_index_history() -> None:
"""异步实现 wx_user_index_history 任务"""
# 计算前一天的时间范围
today = datetime.now().date()
yesterday = today - timedelta(days=1)
yesterday_start = datetime(yesterday.year, yesterday.month, yesterday.day)
yesterday_end = datetime(today.year, today.month, today.day)
# 计算前一天的时间范围(统一为 UTC避免时区误差
today_local = timezone.now().date()
yesterday = today_local - timedelta(days=1)
yesterday_start_naive = datetime(yesterday.year, yesterday.month, yesterday.day)
yesterday_end_naive = datetime(today_local.year, today_local.month, today_local.day)
yesterday_start = timezone.to_utc(yesterday_start_naive)
yesterday_end = timezone.to_utc(yesterday_end_naive)
async with async_db_session.begin() as db:
async with async_db_session() as db:
# 优化:通过 audit_log 表查询有相关记录的用户,避免遍历所有用户
# 先获取有前一天 recognition 记录的用户 ID 列表
user_ids_stmt = (
@@ -44,7 +48,7 @@ async def wx_user_index_history() -> None:
if not user_ids:
return # 没有用户有相关记录,直接返回
# 分批处理用户,避免一次性加载过多数据
# 分批处理用户,避免一次性加载过多数据,并缩小事务粒度
batch_size = 500
for i in range(0, len(user_ids), batch_size):
batch_user_ids = user_ids[i:i + batch_size]
@@ -92,7 +96,7 @@ async def wx_user_index_history() -> None:
# image_map 只存储 thumbnail_id 而不是整个 Image 对象
image_map = {row.id: row.thumbnail_id for row in images_result.fetchall()}
# 处理每个用户
# 处理每个用户(按用户提交事务,避免单个失败导致整体回滚)
for user_id, audit_logs in user_audit_logs.items():
user = users.get(user_id)
if not user:
@@ -113,33 +117,32 @@ async def wx_user_index_history() -> None:
else:
thumbnail_ids.append('')
# 创建或更新DailySummary记录
daily_summary_data = {
"user_id": user_id,
"image_ids": image_ids,
"thumbnail_ids": thumbnail_ids,
"summary_time": yesterday_start
}
# 检查是否已存在该用户当天的记录
existing_summary_stmt = (
select(DailySummary)
.where(
and_(
DailySummary.user_id == user_id,
DailySummary.summary_time == yesterday_start
)
)
)
existing_summary_result = await db.execute(existing_summary_stmt)
existing_summary = existing_summary_result.scalar_one_or_none()
if existing_summary:
# 更新现有记录
await daily_summary_dao.update_model(db, existing_summary.id, daily_summary_data)
else:
# 创建新记录
await daily_summary_dao.create_model(db, CreateDailySummaryParam(**daily_summary_data))
# 创建或更新 DailySummary 记录(用户级事务)
try:
async with async_db_session.begin() as wdb:
daily_summary_data = {
"user_id": user_id,
"image_ids": image_ids,
"thumbnail_ids": thumbnail_ids,
"summary_time": yesterday_start
}
# 提交批次更改
await db.commit()
existing_summary_stmt = (
select(DailySummary)
.where(
and_(
DailySummary.user_id == user_id,
DailySummary.summary_time >= yesterday_start,
DailySummary.summary_time < yesterday_end,
)
)
)
existing_summary_result = await wdb.execute(existing_summary_stmt)
existing_summary = existing_summary_result.scalar_one_or_none()
if existing_summary:
await daily_summary_dao.update_model(wdb, existing_summary.id, daily_summary_data)
else:
await daily_summary_dao.create_model(wdb, CreateDailySummaryParam(**daily_summary_data))
except Exception as e:
logging.error(f"Daily summary commit failed for user_id={user_id}: {e}")

View File

@@ -14,19 +14,7 @@ from wechatpayv3 import WeChatPay, WeChatPayType
app = register_app()
@app.get("/")
def read_root():
wxpay = WxPayService._build_wxpay_instance(f'https://app.xhzone.cn:80{settings.FASTAPI_API_V1_PATH}/wxpay/notif')
out_trade_no = ''.join(sample(ascii_letters + digits, 8))
description = 'demo-description'
amount = 100
code, message = wxpay.pay(
description=description,
out_trade_no=out_trade_no,
amount={'total': amount},
payer={"openid": 'onPN_1wptAzuLo16_0aO-iSHuxI0'},
pay_type=WeChatPayType.JSAPI
)
print({'code': code, 'message': message})
async def read_root():
return {"Hello": f"World"}