diff --git a/config.py b/config.py index 364239c..8f795f8 100644 --- a/config.py +++ b/config.py @@ -1,105 +1,11 @@ -import os -from pathlib import Path -from types import SimpleNamespace # 导入SimpleNamespace -from typing import Any, Dict - -import yaml -from dotenv import load_dotenv -from uvicorn.server import logger +from pydantic_settings import BaseSettings,SettingsConfigDict -class EnvConfig: - def __init__(self): - # 加载环境变量 - if os.path.exists(".env"): - if load_dotenv(".env"): - logger.info("[激活配置] .env 配置加载成功 ✅") - else: - # 将 exit(1) 更改为 warning,避免程序直接退出 - logger.warning( - "[激活配置] 环境变量加载失败,请检查 .env 文件内容是否正确" - ) - else: - logger.warning("[激活配置] .env 文件不存在,将不加载环境变量") - - def _parse_bool(self, value): - """将字符串形式的布尔值转换为布尔类型""" - if isinstance(value, str): - if value.lower() == "true": - return True - elif value.lower() == "false": - return False - return value - - def __getattr__(self, name): - value = os.getenv(name) - if value is not None: - return self._parse_bool(value) - default_name = f"DEFAULT_{name}" - default_value = os.getenv(default_name) - if default_value is not None: - return self._parse_bool(default_value) - return None - - def get(self, name, default=None): - value = os.getenv(name) - if value is not None: - return self._parse_bool(value) - return default - - -class YamlConfig: - def __init__(self, yaml_file: str = "config.yaml"): - self._yaml_file = Path(yaml_file) - self._load_settings() - - def _load_settings(self) -> None: - if not self._yaml_file.exists(): - raise FileNotFoundError(f"[激活配置] 配置文件 {self._yaml_file} 不存在") - - with open(self._yaml_file, "r", encoding="utf-8") as f: - config_data = yaml.safe_load(f) or {} - - self._set_attributes(config_data) - logger.info(f"[激活配置] config.yaml 配置加载成功 ✅") - - def _set_attributes(self, config_data: Dict[str, Any]) -> None: - # 使用 SimpleNamespace 更好地处理嵌套配置,使其可以通过属性访问 - for key, value in config_data.items(): - if isinstance(value, dict): - setattr(self, key, SimpleNamespace(**value)) - else: - setattr(self, key, value) - - # 移除 _from_dict 方法,因为 SimpleNamespace 可以直接从字典创建 - - def __repr__(self) -> str: - attrs = [] - for key in sorted(self.__dict__.keys()): - if not key.startswith("_"): - value = getattr(self, key) - attrs.append(f"{key}={repr(value)}") - return f"Settings({', '.join(attrs)})" - - def reload(self) -> None: - if self._yaml_file is None: - raise RuntimeError("无法重新加载,此实例是从字典创建的") - self._load_settings() - - -# 创建配置实例 -env_config = EnvConfig() -yaml_config = None -try: - yaml_config = YamlConfig() -except FileNotFoundError: - logger.warning("[激活配置] 配置文件 config.yaml 不存在,将不加载配置") - - -class Setting: - def __init__(self): - self.env = env_config - self.config = yaml_config - - -setting = Setting() +class Settings(BaseSettings): + model_config = SettingsConfigDict(env_file=".env" , env_prefix="WNZS_") + PGSQL: str = "" + WECOM_CORPID: str = "" + WECOM_CORPSECRET: str = "" + WECOM_APP_TOKEN: str = "" + WECOM_APP_ENCODING_AES_KEY: str = "" + \ No newline at end of file diff --git a/lifespan.py b/lifespan.py index 71dd55f..daadb86 100644 --- a/lifespan.py +++ b/lifespan.py @@ -19,7 +19,7 @@ def init_scheduler(app : FastAPI): def active_config(): logger.info(f"[激活配置] 加载配置 ⚙️") - from config import setting # noqa + from config import Settings # noqa def import_router(app: FastAPI): diff --git a/model/__init__.py b/model/__init__.py index 3ede093..6509439 100644 --- a/model/__init__.py +++ b/model/__init__.py @@ -1,8 +1,8 @@ from sqlmodel import Session, SQLModel, create_engine -from config import setting +from config import Settings -PGSQL = setting.env.PGSQL or "sqlite:///database.db" +PGSQL = Settings().PGSQL engine = create_engine(str(PGSQL)) diff --git a/pyproject.toml b/pyproject.toml index 9f7d658..a2550b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ dependencies = [ "xmltodict>=1.0.2", "psycopg2-binary>=2.9.11", "fastscheduler[fastapi]>=0.1.2", + "pydantic-settings>=2.11.0", ] [dependency-groups] diff --git a/service/__init__.py b/service/__init__.py new file mode 100644 index 0000000..23270a6 --- /dev/null +++ b/service/__init__.py @@ -0,0 +1,9 @@ +from service.wecom import Wecom +from config import Settings +from utils.sing import SingletonProvider + +# 获取单例函数 + +get_wecom = SingletonProvider(lambda: Wecom( + Settings().WECOM_CORPID,Settings().WECOM_CORPSECRET +)) \ No newline at end of file diff --git a/service/wecom/__init__.py b/service/wecom/__init__.py new file mode 100644 index 0000000..6645ddb --- /dev/null +++ b/service/wecom/__init__.py @@ -0,0 +1,5 @@ +from wecom_sdk import Wecom + +__all__ = [ + "Wecom" +] \ No newline at end of file diff --git a/utils/sing.py b/utils/sing.py new file mode 100644 index 0000000..599a388 --- /dev/null +++ b/utils/sing.py @@ -0,0 +1,22 @@ +from typing import Callable, Generic, TypeVar + +T = TypeVar("T") + + +class SingletonProvider(Generic[T]): + def __init__(self, factory: Callable[[], T]): + self._factory = factory + self._instance: T | None = None + + def __call__(self) -> T: + if self._instance is None: + self._instance = self._factory() + return self._instance + + def reset(self) -> None: + """重置单例(测试 / 热重载用)""" + self._instance = None + + def warmup(self) -> T: + """提前初始化""" + return self() diff --git a/utils/wxcom/__init__.py b/utils/wxcom/__init__.py index a28a915..666963f 100644 --- a/utils/wxcom/__init__.py +++ b/utils/wxcom/__init__.py @@ -1,8 +1,7 @@ -from .wx_com import wecom_service,wxcpt +from .wx_com import wxcpt from .wx_utils import get_request_params,decrypt_message,extract_message_content __all__ = [ - "wecom_service", "wxcpt", "get_request_params", "decrypt_message", diff --git a/utils/wxcom/modules/contact.py b/utils/wxcom/modules/contact.py deleted file mode 100644 index f875809..0000000 --- a/utils/wxcom/modules/contact.py +++ /dev/null @@ -1,41 +0,0 @@ -from wecom_sdk.exceptions.general import SDKException -from wecom_sdk.modules.base import WecomBaseClient -from wecom_sdk.utils.requests import HttpxRequest - -class WecomContactClient(WecomBaseClient): - - async def get_contact_list(self , userid: str): - """ - 获取联系人列表 - @param userid: 用户id - - @return: 联系人列表 - """ - url = self.BASE_URL + "/externalcontact/list" - params = {"access_token": await self.access_token , "userid": userid} - resp = await HttpxRequest.post(url=url, params=params) - - if resp.errcode == 0: - return resp.external_contact_list - else: - raise SDKException(resp.errcode, resp.errmsg) - - async def get_contact_detail(self , external_userid: str , cursor : None | str = None): - """ - 获取联系人详情 - @param userid: 用户id - @param external_userid: 外部联系人id - @param cursor: 分页游标 - - @return: 联系人详情 - """ - url = self.BASE_URL + "/externalcontact/get" - params = {"access_token": await self.access_token , "external_userid": external_userid } - params.update({"cursor": cursor} if cursor else {}) - - resp = await HttpxRequest.post(url=url, params=params) - - if resp.get("errcode") == 0: - return resp.get("external_contact" , {}) - else: - raise SDKException(resp.errcode, resp.errmsg) \ No newline at end of file diff --git a/utils/wxcom/schemas/contact.py b/utils/wxcom/schemas/contact.py deleted file mode 100644 index eb78ff6..0000000 --- a/utils/wxcom/schemas/contact.py +++ /dev/null @@ -1,66 +0,0 @@ -from wecom_sdk.schemas.base import BaseSchema -from typing import List, Optional - -class TextAttr(BaseSchema): - value: str - -class WebAttr(BaseSchema): - url: str - title: str - -class MiniProgramAttr(BaseSchema): - appid: str - pagepath: str - title: str - -class ExternalAttr(BaseSchema): - type: int - name: str - text: Optional[TextAttr] = None - web: Optional[WebAttr] = None - miniprogram: Optional[MiniProgramAttr] = None - -class ExternalProfile(BaseSchema): - external_attr: List[ExternalAttr] - -class ExternalContact(BaseSchema): - external_userid: str - name: str - position: Optional[str] = None - avatar: Optional[str] = None - corp_name: Optional[str] = None - corp_full_name: Optional[str] = None - type: int - gender: int - unionid: Optional[str] = None - external_profile: Optional[ExternalProfile] = None - -class Tag(BaseSchema): - group_name: str - tag_name: str - tag_id: Optional[str] = None - type: int - -class WechatChannels(BaseSchema): - nickname: str - source: int - -class FollowUser(BaseSchema): - userid: str - remark: Optional[str] = None - description: Optional[str] = None - createtime: int - tags: Optional[List[Tag]] = None - remark_corp_name: Optional[str] = None - remark_mobiles: Optional[List[str]] = None - oper_userid: str - add_way: int - state: Optional[str] = None - wechat_channels: Optional[WechatChannels] = None - -class ContactResponse(BaseSchema): - errcode: int - errmsg: str - external_contact: ExternalContact - follow_user: List[FollowUser] - next_cursor: Optional[str] = None diff --git a/utils/wxcom/wx_com.py b/utils/wxcom/wx_com.py index bbe0acc..2918f65 100644 --- a/utils/wxcom/wx_com.py +++ b/utils/wxcom/wx_com.py @@ -1,13 +1,6 @@ from uvicorn.server import logger -from wecom_sdk import Wecom -from utils.wxcom.modules.contact import WecomContactClient - -from config import setting -from .WXBizMsgCrypt3 import WXBizMsgCrypt - - -class WecomPro(Wecom , WecomContactClient): - pass +from config import Settings +from utils.wxcom.WXBizMsgCrypt3 import WXBizMsgCrypt def get_wxcpt(): @@ -20,46 +13,22 @@ def get_wxcpt(): try: # 验证企业微信配置是否完整 required_configs = [ - setting.env.WECOM_APP_TOKEN, - setting.env.WECOM_APP_ENCODING_AES_KEY, - setting.env.WECOM_CORPID + Settings().WECOM_APP_TOKEN, + Settings().WECOM_APP_ENCODING_AES_KEY, + Settings().WECOM_CORPID ] + + if not all(required_configs): raise ValueError("企业微信配置不完整") return WXBizMsgCrypt( - setting.env.WECOM_APP_TOKEN, # 设置的Token - setting.env.WECOM_APP_ENCODING_AES_KEY, # 设置密钥 - setting.env.WECOM_CORPID # 企业ID + Settings().WECOM_APP_TOKEN, # 设置的Token + Settings().WECOM_APP_ENCODING_AES_KEY, # 设置密钥 + Settings().WECOM_CORPID # 企业ID ) except Exception as e: logger.error(f"初始化WXBizMsgCrypt失败: {str(e)}") raise - -def get_wecom_service(): - """ - 初始化并返回 Wecom 服务实例 - - :param setting_env: 配置环境对象,包含企业微信相关配置 - :return: Wecom 服务实例 - """ - try: - # 验证企业微信配置是否完整 - required_configs = [ - setting.env.WECOM_CORPID, - setting.env.WECOM_CORPSECRET - ] - if not all(required_configs): - raise ValueError("企业微信配置不完整") - - return WecomPro( - corpid=setting.env.WECOM_CORPID, - corpsecret=setting.env.WECOM_CORPSECRET - ) - except Exception as e: - logger.error(f"初始化Wecom服务失败: {str(e)}") - raise - -wecom_service = get_wecom_service() wxcpt = get_wxcpt() diff --git a/uv.lock b/uv.lock index c94abf7..9cb7bac 100644 --- a/uv.lock +++ b/uv.lock @@ -1331,6 +1331,7 @@ dependencies = [ { name = "pickledb" }, { name = "psycopg2-binary" }, { name = "pydantic" }, + { name = "pydantic-settings" }, { name = "pyjwt" }, { name = "pyminio" }, { name = "pytest" }, @@ -1352,6 +1353,7 @@ requires-dist = [ { name = "pickledb", specifier = ">=1.3.2" }, { name = "psycopg2-binary", specifier = ">=2.9.11" }, { name = "pydantic", specifier = "<2.10" }, + { name = "pydantic-settings", specifier = ">=2.11.0" }, { name = "pyjwt", specifier = ">=2.10.1" }, { name = "pyminio", specifier = ">=0.3.1" }, { name = "pytest", specifier = ">=8.4.1" },