Merge commit '64b263cc2f237a5b201ac13f4c7267dfb61a2963' into dev-scheduler

This commit is contained in:
2026-01-15 14:33:31 +08:00
12 changed files with 62 additions and 256 deletions

112
config.py
View File

@@ -1,105 +1,11 @@
import os
from pathlib import Path
from types import SimpleNamespace # 导入SimpleNamespace
from typing import Any, Dict
import yaml
from dotenv import load_dotenv
from uvicorn.server import logger
from pydantic_settings import BaseSettings,SettingsConfigDict
class EnvConfig:
def __init__(self):
# 加载环境变量
if os.path.exists(".env"):
if load_dotenv(".env"):
logger.info("[激活配置] .env 配置加载成功 ✅")
else:
# 将 exit(1) 更改为 warning避免程序直接退出
logger.warning(
"[激活配置] 环境变量加载失败,请检查 .env 文件内容是否正确"
)
else:
logger.warning("[激活配置] .env 文件不存在,将不加载环境变量")
def _parse_bool(self, value):
"""将字符串形式的布尔值转换为布尔类型"""
if isinstance(value, str):
if value.lower() == "true":
return True
elif value.lower() == "false":
return False
return value
def __getattr__(self, name):
value = os.getenv(name)
if value is not None:
return self._parse_bool(value)
default_name = f"DEFAULT_{name}"
default_value = os.getenv(default_name)
if default_value is not None:
return self._parse_bool(default_value)
return None
def get(self, name, default=None):
value = os.getenv(name)
if value is not None:
return self._parse_bool(value)
return default
class YamlConfig:
def __init__(self, yaml_file: str = "config.yaml"):
self._yaml_file = Path(yaml_file)
self._load_settings()
def _load_settings(self) -> None:
if not self._yaml_file.exists():
raise FileNotFoundError(f"[激活配置] 配置文件 {self._yaml_file} 不存在")
with open(self._yaml_file, "r", encoding="utf-8") as f:
config_data = yaml.safe_load(f) or {}
self._set_attributes(config_data)
logger.info(f"[激活配置] config.yaml 配置加载成功 ✅")
def _set_attributes(self, config_data: Dict[str, Any]) -> None:
# 使用 SimpleNamespace 更好地处理嵌套配置,使其可以通过属性访问
for key, value in config_data.items():
if isinstance(value, dict):
setattr(self, key, SimpleNamespace(**value))
else:
setattr(self, key, value)
# 移除 _from_dict 方法,因为 SimpleNamespace 可以直接从字典创建
def __repr__(self) -> str:
attrs = []
for key in sorted(self.__dict__.keys()):
if not key.startswith("_"):
value = getattr(self, key)
attrs.append(f"{key}={repr(value)}")
return f"Settings({', '.join(attrs)})"
def reload(self) -> None:
if self._yaml_file is None:
raise RuntimeError("无法重新加载,此实例是从字典创建的")
self._load_settings()
# 创建配置实例
env_config = EnvConfig()
yaml_config = None
try:
yaml_config = YamlConfig()
except FileNotFoundError:
logger.warning("[激活配置] 配置文件 config.yaml 不存在,将不加载配置")
class Setting:
def __init__(self):
self.env = env_config
self.config = yaml_config
setting = Setting()
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env" , env_prefix="WNZS_")
PGSQL: str = ""
WECOM_CORPID: str = ""
WECOM_CORPSECRET: str = ""
WECOM_APP_TOKEN: str = ""
WECOM_APP_ENCODING_AES_KEY: str = ""

View File

@@ -19,7 +19,7 @@ def init_scheduler(app : FastAPI):
def active_config():
logger.info(f"[激活配置] 加载配置 ⚙️")
from config import setting # noqa
from config import Settings # noqa
def import_router(app: FastAPI):

View File

@@ -1,8 +1,8 @@
from sqlmodel import Session, SQLModel, create_engine
from config import setting
from config import Settings
PGSQL = setting.env.PGSQL or "sqlite:///database.db"
PGSQL = Settings().PGSQL
engine = create_engine(str(PGSQL))

View File

@@ -22,6 +22,7 @@ dependencies = [
"xmltodict>=1.0.2",
"psycopg2-binary>=2.9.11",
"fastscheduler[fastapi]>=0.1.2",
"pydantic-settings>=2.11.0",
]
[dependency-groups]

9
service/__init__.py Normal file
View File

@@ -0,0 +1,9 @@
from service.wecom import Wecom
from config import Settings
from utils.sing import SingletonProvider
# 获取单例函数
get_wecom = SingletonProvider(lambda: Wecom(
Settings().WECOM_CORPID,Settings().WECOM_CORPSECRET
))

View File

@@ -0,0 +1,5 @@
from wecom_sdk import Wecom
__all__ = [
"Wecom"
]

22
utils/sing.py Normal file
View File

@@ -0,0 +1,22 @@
from typing import Callable, Generic, TypeVar
T = TypeVar("T")
class SingletonProvider(Generic[T]):
def __init__(self, factory: Callable[[], T]):
self._factory = factory
self._instance: T | None = None
def __call__(self) -> T:
if self._instance is None:
self._instance = self._factory()
return self._instance
def reset(self) -> None:
"""重置单例(测试 / 热重载用)"""
self._instance = None
def warmup(self) -> T:
"""提前初始化"""
return self()

View File

@@ -1,8 +1,7 @@
from .wx_com import wecom_service,wxcpt
from .wx_com import wxcpt
from .wx_utils import get_request_params,decrypt_message,extract_message_content
__all__ = [
"wecom_service",
"wxcpt",
"get_request_params",
"decrypt_message",

View File

@@ -1,41 +0,0 @@
from wecom_sdk.exceptions.general import SDKException
from wecom_sdk.modules.base import WecomBaseClient
from wecom_sdk.utils.requests import HttpxRequest
class WecomContactClient(WecomBaseClient):
async def get_contact_list(self , userid: str):
"""
获取联系人列表
@param userid: 用户id
@return: 联系人列表
"""
url = self.BASE_URL + "/externalcontact/list"
params = {"access_token": await self.access_token , "userid": userid}
resp = await HttpxRequest.post(url=url, params=params)
if resp.errcode == 0:
return resp.external_contact_list
else:
raise SDKException(resp.errcode, resp.errmsg)
async def get_contact_detail(self , external_userid: str , cursor : None | str = None):
"""
获取联系人详情
@param userid: 用户id
@param external_userid: 外部联系人id
@param cursor: 分页游标
@return: 联系人详情
"""
url = self.BASE_URL + "/externalcontact/get"
params = {"access_token": await self.access_token , "external_userid": external_userid }
params.update({"cursor": cursor} if cursor else {})
resp = await HttpxRequest.post(url=url, params=params)
if resp.get("errcode") == 0:
return resp.get("external_contact" , {})
else:
raise SDKException(resp.errcode, resp.errmsg)

View File

@@ -1,66 +0,0 @@
from wecom_sdk.schemas.base import BaseSchema
from typing import List, Optional
class TextAttr(BaseSchema):
value: str
class WebAttr(BaseSchema):
url: str
title: str
class MiniProgramAttr(BaseSchema):
appid: str
pagepath: str
title: str
class ExternalAttr(BaseSchema):
type: int
name: str
text: Optional[TextAttr] = None
web: Optional[WebAttr] = None
miniprogram: Optional[MiniProgramAttr] = None
class ExternalProfile(BaseSchema):
external_attr: List[ExternalAttr]
class ExternalContact(BaseSchema):
external_userid: str
name: str
position: Optional[str] = None
avatar: Optional[str] = None
corp_name: Optional[str] = None
corp_full_name: Optional[str] = None
type: int
gender: int
unionid: Optional[str] = None
external_profile: Optional[ExternalProfile] = None
class Tag(BaseSchema):
group_name: str
tag_name: str
tag_id: Optional[str] = None
type: int
class WechatChannels(BaseSchema):
nickname: str
source: int
class FollowUser(BaseSchema):
userid: str
remark: Optional[str] = None
description: Optional[str] = None
createtime: int
tags: Optional[List[Tag]] = None
remark_corp_name: Optional[str] = None
remark_mobiles: Optional[List[str]] = None
oper_userid: str
add_way: int
state: Optional[str] = None
wechat_channels: Optional[WechatChannels] = None
class ContactResponse(BaseSchema):
errcode: int
errmsg: str
external_contact: ExternalContact
follow_user: List[FollowUser]
next_cursor: Optional[str] = None

View File

@@ -1,13 +1,6 @@
from uvicorn.server import logger
from wecom_sdk import Wecom
from utils.wxcom.modules.contact import WecomContactClient
from config import setting
from .WXBizMsgCrypt3 import WXBizMsgCrypt
class WecomPro(Wecom , WecomContactClient):
pass
from config import Settings
from utils.wxcom.WXBizMsgCrypt3 import WXBizMsgCrypt
def get_wxcpt():
@@ -20,46 +13,22 @@ def get_wxcpt():
try:
# 验证企业微信配置是否完整
required_configs = [
setting.env.WECOM_APP_TOKEN,
setting.env.WECOM_APP_ENCODING_AES_KEY,
setting.env.WECOM_CORPID
Settings().WECOM_APP_TOKEN,
Settings().WECOM_APP_ENCODING_AES_KEY,
Settings().WECOM_CORPID
]
if not all(required_configs):
raise ValueError("企业微信配置不完整")
return WXBizMsgCrypt(
setting.env.WECOM_APP_TOKEN, # 设置的Token
setting.env.WECOM_APP_ENCODING_AES_KEY, # 设置密钥
setting.env.WECOM_CORPID # 企业ID
Settings().WECOM_APP_TOKEN, # 设置的Token
Settings().WECOM_APP_ENCODING_AES_KEY, # 设置密钥
Settings().WECOM_CORPID # 企业ID
)
except Exception as e:
logger.error(f"初始化WXBizMsgCrypt失败: {str(e)}")
raise
def get_wecom_service():
"""
初始化并返回 Wecom 服务实例
:param setting_env: 配置环境对象,包含企业微信相关配置
:return: Wecom 服务实例
"""
try:
# 验证企业微信配置是否完整
required_configs = [
setting.env.WECOM_CORPID,
setting.env.WECOM_CORPSECRET
]
if not all(required_configs):
raise ValueError("企业微信配置不完整")
return WecomPro(
corpid=setting.env.WECOM_CORPID,
corpsecret=setting.env.WECOM_CORPSECRET
)
except Exception as e:
logger.error(f"初始化Wecom服务失败: {str(e)}")
raise
wecom_service = get_wecom_service()
wxcpt = get_wxcpt()

2
uv.lock generated
View File

@@ -1331,6 +1331,7 @@ dependencies = [
{ name = "pickledb" },
{ name = "psycopg2-binary" },
{ name = "pydantic" },
{ name = "pydantic-settings" },
{ name = "pyjwt" },
{ name = "pyminio" },
{ name = "pytest" },
@@ -1352,6 +1353,7 @@ requires-dist = [
{ name = "pickledb", specifier = ">=1.3.2" },
{ name = "psycopg2-binary", specifier = ">=2.9.11" },
{ name = "pydantic", specifier = "<2.10" },
{ name = "pydantic-settings", specifier = ">=2.11.0" },
{ name = "pyjwt", specifier = ">=2.10.1" },
{ name = "pyminio", specifier = ">=0.3.1" },
{ name = "pytest", specifier = ">=8.4.1" },