Files
backend/backend/app/admin/service/wx_service.py
2025-11-21 11:28:22 +08:00

141 lines
5.6 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# backend/app/admin/service/wx_service.py
from sqlalchemy.orm import Session
from typing import Optional
from fastapi import Request, Response
from backend.app.admin.crud.wx_user_crud import wx_user_dao
from backend.app.admin.model.dict import DictCategory
from backend.app.admin.model.wx_user import WxUser
from backend.app.admin.schema.token import GetWxLoginToken
from backend.app.admin.schema.wx import DictLevel
from backend.app.admin.service.points_service import points_service
from backend.common.security.jwt import create_access_token, create_refresh_token
from backend.core.conf import settings
import logging
from backend.app.admin.service.wx_user_service import WxUserService
from backend.core.wx_integration import decrypt_wx_data
from backend.database.db import async_db_session
from backend.utils.timezone import timezone
class WxAuthService:
    """WeChat login and per-user dictionary-settings operations."""

    @staticmethod
    async def login(
        *,
        request: Request,
        response: Response,
        openid: str,
        session_key: str,
        encrypted_data: Optional[str] = None,
        iv: Optional[str] = None,
    ) -> GetWxLoginToken:
        """
        Handle the WeChat user login flow.

        1. Look the user up by ``openid``; create the user (with default
           dictionary settings and initialized points) when not found.
        2. Update the stored ``session_key`` of an existing user.
        3. Decrypt the user info when both ``encrypted_data`` and ``iv``
           are provided (failures are logged and do not abort the login).
        4. Create the access/refresh tokens and set the refresh-token cookie.

        :param request: incoming request; its client host is attached to the token
        :param response: outgoing response that receives the refresh-token cookie
        :param openid: WeChat openid identifying the user
        :param session_key: WeChat session key for this login
        :param encrypted_data: optional encrypted user-info payload
        :param iv: optional initialization vector for ``encrypted_data``
        :return: token data plus the user's dictionary level/category settings
        :raises Exception: any failure outside the decryption step is logged
            and re-raised
        """
        async with async_db_session.begin() as db:
            user = None
            try:
                # Find the user, or create one on first login.
                user = await wx_user_dao.get_by_openid(db, openid)
                if not user:
                    user = WxUser(
                        openid=openid,
                        session_key=session_key,
                        profile={
                            'dict_level': DictLevel.LEVEL1.value,
                            'dict_category': DictCategory.GENERAL.value,
                        },
                    )
                    await wx_user_dao.add(db, user)
                    # Flush and refresh so that user.id is available below.
                    await db.flush()
                    await db.refresh(user)
                    # Initialize user points.
                    await points_service.initialize_user_points(user_id=user.id, db=db)
                else:
                    await wx_user_dao.update_session_key(db, user.id, session_key)

                # Decrypt the user info (only when provided).
                if encrypted_data and iv:
                    try:
                        decrypted_data = decrypt_wx_data(
                            encrypted_data,
                            session_key,
                            iv,
                        )
                        # NOTE(review): this call is not awaited. If
                        # WxUserService.update_user_profile is a coroutine
                        # function it never actually runs -- confirm and add
                        # `await` if so.
                        WxUserService.update_user_profile(db, user.id, decrypted_data)
                    except Exception as e:
                        # Best effort: a decryption failure must not block login.
                        logging.warning(f"用户数据解密失败: {str(e)}")

                # Create the access token.
                access_token = await create_access_token(
                    user.id,
                    False,
                    # Extra info stored with the token; os/browser/device from
                    # request.state are currently not passed.
                    ip=request.client.host,
                )
                refresh_token = await create_refresh_token(
                    access_token.session_uuid, user.id, False
                )
                response.set_cookie(
                    key=settings.COOKIE_REFRESH_TOKEN_KEY,
                    value=refresh_token.refresh_token,
                    max_age=settings.COOKIE_REFRESH_TOKEN_EXPIRE_SECONDS,
                    expires=timezone.to_utc(refresh_token.refresh_token_expire_time),
                    httponly=True,
                )
            except Exception as e:
                # No explicit rollback here. The previous un-awaited
                # `db.rollback()` on the async session only created a coroutine
                # that was never run; rolling back on the re-raised exception
                # is left to the enclosing `async_db_session.begin()` block
                # (presumably a SQLAlchemy async sessionmaker -- confirm).
                logging.error(f"登录处理失败: {str(e)}")
                raise
            else:
                # Read the dictionary settings back from the user profile.
                dict_level = None
                dict_category = None
                if user and user.profile and isinstance(user.profile, dict):
                    dict_level = user.profile.get("dict_level")
                    # The profile stores the category under "dict_category"
                    # (see user creation above and update_user_settings).
                    dict_category = user.profile.get("dict_category")
                data = GetWxLoginToken(
                    access_token=access_token.access_token,
                    access_token_expire_time=access_token.access_token_expire_time,
                    session_uuid=access_token.session_uuid,
                    dict_level=dict_level,
                    dict_category=dict_category,
                )
                return data

    @staticmethod
    async def update_user_settings(
        *,
        user_id: int,
        dict_level: Optional[DictLevel] = None,
        dict_category: Optional[DictCategory] = None,
    ) -> None:
        """
        Update a user's dictionary settings.

        Only the settings that are passed (not ``None``) are written into the
        user's profile.

        :param user_id: id of the user to update
        :param dict_level: new dictionary level, or ``None`` to leave unchanged
        :param dict_category: new dictionary category, or ``None`` to leave unchanged
        :raises ValueError: when no user with ``user_id`` exists
        """
        async with async_db_session.begin() as db:
            user = await wx_user_dao.get(db, user_id)
            if not user:
                raise ValueError("用户不存在")

            # Start from an empty dict when the user has no profile yet.
            if not user.profile:
                user.profile = {}

            # Apply the dictionary settings that were provided.
            if dict_level is not None:
                user.profile["dict_level"] = dict_level.value
            if dict_category is not None:
                user.profile["dict_category"] = dict_category.value

            # Persist through the DAO's dedicated profile update; per the
            # original author's note it also refreshes updated_time
            # (not verifiable from this file).
            await wx_user_dao.update_user_profile(db, user_id, user.profile)
# Module-level WxAuthService instance; the class defines only static methods.
wx_service: WxAuthService = WxAuthService()