Merge commit 'd3bf64ae61cebccd20b2c587f08fbd9e3b4ceb79'

This commit is contained in:
2026-01-15 17:34:49 +08:00
33 changed files with 1723 additions and 528 deletions

3
.gitignore vendored
View File

@@ -14,4 +14,5 @@ wheels/
# prod file
.sqlite
fallback.log
database.db
database.db
fastscheduler_state.json

112
config.py
View File

@@ -1,105 +1,11 @@
import os
from pathlib import Path
from types import SimpleNamespace # 导入SimpleNamespace
from typing import Any, Dict
import yaml
from dotenv import load_dotenv
from uvicorn.server import logger
from pydantic_settings import BaseSettings,SettingsConfigDict
class EnvConfig:
def __init__(self):
# 加载环境变量
if os.path.exists(".env"):
if load_dotenv(".env"):
logger.info("[激活配置] .env 配置加载成功 ✅")
else:
# 将 exit(1) 更改为 warning避免程序直接退出
logger.warning(
"[激活配置] 环境变量加载失败,请检查 .env 文件内容是否正确"
)
else:
logger.warning("[激活配置] .env 文件不存在,将不加载环境变量")
def _parse_bool(self, value):
"""将字符串形式的布尔值转换为布尔类型"""
if isinstance(value, str):
if value.lower() == "true":
return True
elif value.lower() == "false":
return False
return value
def __getattr__(self, name):
value = os.getenv(name)
if value is not None:
return self._parse_bool(value)
default_name = f"DEFAULT_{name}"
default_value = os.getenv(default_name)
if default_value is not None:
return self._parse_bool(default_value)
return None
def get(self, name, default=None):
value = os.getenv(name)
if value is not None:
return self._parse_bool(value)
return default
class YamlConfig:
def __init__(self, yaml_file: str = "config.yaml"):
self._yaml_file = Path(yaml_file)
self._load_settings()
def _load_settings(self) -> None:
if not self._yaml_file.exists():
raise FileNotFoundError(f"[激活配置] 配置文件 {self._yaml_file} 不存在")
with open(self._yaml_file, "r", encoding="utf-8") as f:
config_data = yaml.safe_load(f) or {}
self._set_attributes(config_data)
logger.info(f"[激活配置] config.yaml 配置加载成功 ✅")
def _set_attributes(self, config_data: Dict[str, Any]) -> None:
# 使用 SimpleNamespace 更好地处理嵌套配置,使其可以通过属性访问
for key, value in config_data.items():
if isinstance(value, dict):
setattr(self, key, SimpleNamespace(**value))
else:
setattr(self, key, value)
# 移除 _from_dict 方法,因为 SimpleNamespace 可以直接从字典创建
def __repr__(self) -> str:
attrs = []
for key in sorted(self.__dict__.keys()):
if not key.startswith("_"):
value = getattr(self, key)
attrs.append(f"{key}={repr(value)}")
return f"Settings({', '.join(attrs)})"
def reload(self) -> None:
if self._yaml_file is None:
raise RuntimeError("无法重新加载,此实例是从字典创建的")
self._load_settings()
# 创建配置实例
env_config = EnvConfig()
yaml_config = None
try:
yaml_config = YamlConfig()
except FileNotFoundError:
logger.warning("[激活配置] 配置文件 config.yaml 不存在,将不加载配置")
class Setting:
def __init__(self):
self.env = env_config
self.config = yaml_config
setting = Setting()
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env" , env_prefix="WNZS_")
PGSQL: str = ""
WECOM_CORPID: str = ""
WECOM_CORPSECRET: str = ""
WECOM_APP_TOKEN: str = ""
WECOM_APP_ENCODING_AES_KEY: str = ""

View File

@@ -10,10 +10,16 @@ def init_database():
create_db_and_tables()
logger.info("[数据库] 数据库初始化完成 ✅")
def init_scheduler(app : FastAPI):
from scheduler import init_scheduler_router
logger.info("[定时任务] 初始化定时任务 📦")
init_scheduler_router(app)
logger.info("[定时任务] 定时任务初始化完成 ✅")
def active_config():
logger.info(f"[激活配置] 加载配置 ⚙️")
from config import setting # noqa
from config import Settings # noqa
def import_router(app: FastAPI):
@@ -23,7 +29,11 @@ def import_router(app: FastAPI):
app.include_router(router)
logger.info(f"[导入路由] 路由导入完成 ✅")
async def import_mcp_server(app: FastAPI):
logger.info(f"[导入MCP] 开始导入MCP 🛣️")
from mcps import create_mcp_app
app.mount("/app" , await create_mcp_app())
logger.info(f"[导入MCP] MCP导入完成 ✅")
@asynccontextmanager
async def lifespan(app: FastAPI):
@@ -31,5 +41,7 @@ async def lifespan(app: FastAPI):
active_config()
init_database()
import_router(app)
init_scheduler(app)
await import_mcp_server(app)
yield
logger.info(f"[生命周期] 应用关闭 🔧✅")

View File

@@ -2,9 +2,11 @@ from fastapi import FastAPI
from handler.exception import install as exception_install
from lifespan import lifespan
from mcps import create_mcp_app
app = FastAPI(lifespan=lifespan)
exception_install(app)
if __name__ == "__main__":

8
mcps/__init__.py Normal file
View File

@@ -0,0 +1,8 @@
from fastmcp import FastMCP
from mcps.test.test import weather_mcp
async def create_mcp_app():
main_mcp = FastMCP("MCP 主服务")
await main_mcp.import_server(weather_mcp , prefix="test")
return main_mcp.http_app()

3
mcps/test/test.py Normal file
View File

@@ -0,0 +1,3 @@
from fastmcp import FastMCP
weather_mcp = FastMCP(name="WeatherService")

View File

@@ -1,8 +1,8 @@
from sqlmodel import Session, SQLModel, create_engine
from config import setting
from config import Settings
PGSQL = setting.env.PGSQL or "sqlite:///database.db"
PGSQL = Settings().PGSQL
engine = create_engine(str(PGSQL))

View File

@@ -0,0 +1,22 @@
from sqlmodel import SQLModel, Field, Column, JSON
class Department(SQLModel, table = True):
did: int = Field(default=None, primary_key=True)
dname: str = Field(max_length=100)
name_en: str = Field(max_length=100)
department_leader: list[int] = Field(default=[], sa_column=Column(JSON))
parent_id: int = Field(default=0)
order: int = Field(default=0)
class Employee(SQLModel, table = True):
userid: int = Field(default=None, primary_key=True)
ename: str = Field(max_length=100)
dept_id: int = Field(foreign_key='Department.did')
open_userid: str = Field(max_length=100)

View File

@@ -3,7 +3,7 @@ name = "wecom-wnzs-adapter"
version = "0.1.0"
description = "企业微信万能助手适配器"
readme = "README.md"
requires-python = ">=3.12"
requires-python = ">=3.13"
dependencies = [
"casbin>=1.43.0",
"fastapi[standard]>=0.116.1",
@@ -16,11 +16,12 @@ dependencies = [
"sqlmodel>=0.0.24",
"uvicorn>=0.35.0",
"apscheduler>=3.11.0",
"pydantic<2.10",
"pickledb>=1.3.2",
"wecom-sdk>=1.0.0",
"xmltodict>=1.0.2",
"psycopg2-binary>=2.9.11",
"fastscheduler[fastapi]>=0.1.2",
"pydantic-settings>=2.11.0",
"fastmcp>=2.14.3",
]
[dependency-groups]

7
scheduler/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
from fastapi import FastAPI
from fastscheduler.fastapi_integration import create_scheduler_routes
from scheduler.scheduler import scheduler
def init_scheduler_router(app : FastAPI):
app.include_router(create_scheduler_routes(scheduler))
scheduler.start()

9
scheduler/scheduler.py Normal file
View File

@@ -0,0 +1,9 @@
from fastscheduler import FastScheduler
scheduler = FastScheduler(quiet=True)
@scheduler.every(10).seconds
def background_task():
print("Background work")

9
service/__init__.py Normal file
View File

@@ -0,0 +1,9 @@
from service.wecom import Wecom
from config import Settings
from utils.sing import SingletonProvider
# 获取单例函数
get_wecom = SingletonProvider(lambda: Wecom(
Settings().WECOM_CORPID,Settings().WECOM_CORPSECRET
))

View File

@@ -0,0 +1,4 @@
from .modules.mixin import Wecom
__VERSION__ = "1.0.0"
__AUTHOR__ = "Jasar Ayiken"

View File

@@ -0,0 +1,3 @@
from typing import Literal
MESSAGE_TYPES: Literal["text", "image", "voice", "video", "textcard", "news", "mpnews"]

View File

@@ -0,0 +1,14 @@
class SDKException(Exception):
def __init__(self, errcode: int, message: str):
"""
通用错误返回类,用于抛出请求错误时的异常
- 若请求返回的errcode不为0则抛出此异常
@param errcode: 错误码
@param message: 错误信息
"""
self.errcode = str(errcode)
self.message = message
def __str__(self):
return f"Error Occured: {self.errcode} - {self.message}"

View File

@@ -0,0 +1,78 @@
from datetime import datetime, timedelta
from wecom.exceptions.general import SDKException
from wecom.schemas.token import (
AccessTokenInfo,
AccessTokenParams,
)
from wecom.utils.requests import HttpxRequest
BASE_URL: str = "https://qyapi.weixin.qq.com/cgi-bin"
class WecomBaseClient:
BASE_URL: str = BASE_URL
def __init__(self, corpid: str, corpsecret: str):
"""
企业微信SDK
@param corpid: 企业ID
@param corpsecret: 应用的凭证密钥
每个应用有独立的secret获取到的access_token只能本应用使用所以每个应用的access_token应该分开来获取
"""
self.corpid = corpid
self.corpsecret = corpsecret
self._access_token = None
self.access_token_valid_time = None
@property
async def access_token(self) -> str:
"""企业微信SDK的access_token"""
if (
self.access_token_valid_time
and datetime.now() < self.access_token_valid_time
):
return self._access_token
await self.__get_access_token()
return self._access_token
@access_token.setter
def access_token(self, value: str):
self._access_token = value
async def __get_access_token(self, refresh: bool = False) -> str:
"""
获取access_token
access_token的有效期通过返回的expires_in来传达正常情况下为7200秒2小时有效期内重复获取返回相同结果过期后获取会返回新的access_token。
由于企业微信每个应用的access_token是彼此独立的所以进行缓存时需要区分应用来进行存储。
详细说明https://work.weixin.qq.com/api/doc/90000/90135/91039
@return: access_token: str 或 None
"""
if (
not refresh
and self.access_token_valid_time
and datetime.now() < self.access_token_valid_time
):
return self.access_token
url = self.BASE_URL + "/gettoken"
params = AccessTokenParams(
corpid=self.corpid, corpsecret=self.corpsecret
).model_dump()
resp = AccessTokenInfo(**await HttpxRequest.get(url=url, params=params))
if resp.errcode == 0:
self.access_token_valid_time = datetime.now() + timedelta(
seconds=resp.expires_in
)
self.access_token = resp.access_token
return resp.access_token
else:
raise SDKException(resp.errcode, resp.errmsg)

View File

@@ -0,0 +1,82 @@
from wecom.exceptions.general import SDKException
from wecom.modules.base import WecomBaseClient
from wecom.schemas.departments import (
CreateDepartmentInfo,
CreateDepartmentParams,
DepartmentInfo,
UpdateDepartmentInfo,
UpdateDepartmentParams,
)
from wecom.utils.requests import HttpxRequest
class WecomDepartmentClient(WecomBaseClient):
async def create_departments(self, data: CreateDepartmentParams) -> int:
"""
创建部门
@param data: 创建部门的参数
@return: 部门id
"""
url = self.BASE_URL + "/department/create"
params = {"access_token": await self.access_token}
resp = CreateDepartmentInfo(
**await HttpxRequest.post(url=url, params=params, json=data)
)
if resp.errcode == 0:
return resp.id
else:
raise SDKException(resp.errcode, resp.errmsg)
async def delete_departments(self, id: int) -> bool:
"""
删除部门
@param id: 部门id
@return: 删除状态(Boolean)
"""
url = self.BASE_URL + "/department/delete"
params = {"access_token": await self.access_token, "id": id}
resp = await HttpxRequest.get(url=url, params=params)
if resp.errcode == 0:
return True
else:
raise SDKException(resp.errcode, resp.errmsg)
async def update_departments(self, data: UpdateDepartmentParams) -> bool:
"""
更新部门
@param data: 更新部门的参数
@return: 更新状态(Boolean)
"""
url = self.BASE_URL + "/department/update"
params = {"access_token": await self.access_token}
resp = UpdateDepartmentInfo(
**await HttpxRequest.post(url=url, params=params, json=data)
)
if resp.errcode == 0:
return True
else:
raise SDKException(resp.errcode, resp.errmsg)
async def get_departments(self, id: int = None) -> list[DepartmentInfo]:
"""
获取部门列表
@param id: 部门id。获取指定部门及其下的子部门。
如果不填,默认获取全量组织架构
@return: 部门列表
"""
url = self.BASE_URL + "/department/list"
params = {"access_token": await self.access_token, "id": id}
resp = DepartmentInfo(**await HttpxRequest.get(url=url, params=params))
if resp.errcode == 0:
return resp.department
else:
raise SDKException(resp.errcode, resp.errmsg)

View File

@@ -0,0 +1,60 @@
from typing import Literal
from wecom.exceptions.general import SDKException
from wecom.modules.base import WecomBaseClient
from wecom.schemas.message import (
MessageParams,
RecallMessageInfo,
RecallMessageParams,
SendMessageInfo,
)
from wecom.utils.requests import HttpxRequest
class WecomMessageClient(WecomBaseClient):
async def send_message(
self,
data: MessageParams,
) -> str:
"""
企业微信发送消息
@param data: 发送消息的参数
各类消息的参数详情 https://developer.work.weixin.qq.com/document/path/90236
@return: 消息ID
"""
url = self.BASE_URL + "/message/send"
params = {"access_token": await self.access_token}
data = data.model_dump()
resp = SendMessageInfo(
**await HttpxRequest.post(url=url, params=params, json=data)
)
if resp.errcode == 0:
return resp.msgid
else:
raise SDKException(resp.errcode, resp.errmsg)
async def recall_message(self, data: RecallMessageParams) -> bool:
"""
企业微信撤回消息
@param msgid: 消息ID
@return: 撤回状态(Boolean)
"""
data = data.model_dump()
url = self.BASE_URL + "/message/recall"
params = {"access_token": await self.access_token}
resp = RecallMessageInfo(
**await HttpxRequest.post(url=url, params=params, json=data)
)
if resp.errcode == 0:
return True
else:
raise SDKException(resp.errcode, resp.errmsg)

View File

@@ -0,0 +1,10 @@
from wecom.modules.base import WecomBaseClient
from wecom.modules.department import WecomDepartmentClient
from wecom.modules.message import WecomMessageClient
from wecom.modules.users import WecomUsersClient
class Wecom(
WecomDepartmentClient, WecomUsersClient, WecomMessageClient, WecomBaseClient
):
pass

View File

@@ -0,0 +1,95 @@
from wecom.exceptions.general import SDKException
from wecom.modules.base import WecomBaseClient
from wecom.schemas.departments import DepartmentInfo
from wecom.schemas.users import (
DepartmentUserDetailInfo,
DepartmentUserInfo,
UserInfo,
)
from wecom.utils.requests import HttpxRequest
class WecomUsersClient(WecomBaseClient):
async def get_user(self, userid: str) -> dict:
"""
读取成员
@param userid: 成员UserID。对应管理端的账号企业内必须唯一。不区分大小写长度为1~64个字节
@return: 成员信息
"""
url = self.BASE_URL + "/user/get"
params = {"access_token": await self.access_token, "userid": userid}
resp = UserInfo(**await HttpxRequest.get(url=url, params=params))
if resp.errcode == 0:
return resp.model_dump(exclude={"errcode", "errmsg"})
else:
raise SDKException(resp.errcode, resp.errmsg)
async def get_user_in_department_detail(self, department_id: str) -> dict:
"""
读取部门成员完整信息
@param department_id: 获取的部门id
@return: 部门成员信息
"""
url = self.BASE_URL + "/user/list"
params = {
"access_token": await self.access_token,
"department_id": department_id,
}
resp = DepartmentUserDetailInfo(
**await HttpxRequest.get(url=url, params=params)
)
if resp.errcode == 0:
return resp.model_dump(exclude={"errcode", "errmsg"})
else:
raise SDKException(resp.errcode, resp.errmsg)
async def get_user_in_department(self, department_id: int) -> dict:
"""
读取部门成员简要信息
@param department_id: 获取的部门id
@return: 部门成员信息
"""
url = self.BASE_URL + "/user/simplelist"
params = {
"access_token": await self.access_token,
"department_id": department_id,
}
resp = DepartmentUserInfo(**await HttpxRequest.get(url=url, params=params))
if resp.errcode == 0:
return resp.model_dump(exclude={"errcode", "errmsg"})
else:
raise SDKException(resp.errcode, resp.errmsg)
@staticmethod
def convert_userid(userid: str, decrypt: bool = False):
"""
学工号/企业微信ID转换方法
@param userid: 学工号/企业微信ID
@param decrypt: 是否解密
@return: 转换后的学工号/企业微信ID
"""
if decrypt:
year = str(int(userid[10:12]) + 1945)
no = str(int(userid[2:9]) - 115342)
no = no[1:7]
userid = year + no
else:
userid = (
"8"
+ userid[2:3]
+ str(int(userid[-6:]) + 1115342)
+ userid[8:9]
+ str(int(userid[0:4]) - 1945)
)
return userid

View File

@@ -0,0 +1,13 @@
from pydantic import BaseModel, ConfigDict
class BaseSchema(BaseModel):
model_config = ConfigDict(
extra="ignore",
use_enum_values=True,
from_attributes=True,
validate_assignment=True,
populate_by_name=True,
coerce_numbers_to_str=True,
arbitrary_types_allowed=True,
)

View File

@@ -0,0 +1,58 @@
from typing import AnyStr, List
from wecom.schemas.base import BaseSchema
class CreateDepartmentParams(BaseSchema):
"""
创建部门
@param name: 部门名称。长度限制为1~32个字节字符不能包括\:?”<>
@param name_en: 英文名称
@param parentid: 父部门id。根部门id为1
@param order: 在父部门中的次序值。order值小的排序靠前。
@param id: 部门id整型。指定时必须大于1不指定时则自动生成
"""
name: str
name_en: str | None = None
parentid: int
order: int | None = None
id: int | None = None
class UpdateDepartmentParams(CreateDepartmentParams): ...
class UpdateDepartmentInfo(BaseSchema):
errcode: int
errmsg: AnyStr
class CreateDepartmentInfo(BaseSchema):
errcode: int
errmsg: AnyStr
id: int
class DepartmentInfo(BaseSchema):
"""
部门单体响应数据
"""
id: int
name: AnyStr
name_en: AnyStr | None = None
department_leader: List[str] | None = None
parentid: int | None = None
order: int | None = None
class DepartmentInfo(BaseSchema):
"""
部门整体响应数据
"""
errcode: int
errmsg: AnyStr
department: List[DepartmentInfo]

View File

@@ -0,0 +1,94 @@
from typing import AnyStr, Literal
from wecom.schemas.base import BaseSchema
class MessageParams(BaseSchema):
"""
发送消息参数
各类消息的参数详情 https://developer.work.weixin.qq.com/document/path/90236
根据msgtype的不同选择对应的消息内容填充即可
@param touser: 指定接收消息的成员成员ID列表多个接收者用|分隔最多支持1000个
@param toparty: 指定接收消息的部门部门ID列表多个接收者用|分隔最多支持100个。
@param totag: 指定接收消息的标签标签ID列表多个接收者用|分隔最多支持100个。
@param msgtype: 消息类型此时固定为text
@param agentid: 企业应用的id整型。企业内部开发可在应用的设置页面查看第三方服务商可通过接口 获取企业授权信息 获取该参数值
@param safe: 表示是否是保密消息0表示可对外分享1表示不能分享且内容显示水印默认为0
@param enable_id_trans: 表示是否开启id转译0表示否1表示是默认0。仅第三方应用需要用到企业自建应用可以忽略。
@param enable_duplicate_check: 表示是否开启重复消息检查0表示否1表示是默认0
@param duplicate_check_interval: 表示是否重复消息检查的时间间隔默认1800s最大不超过4小时
touser、toparty、totag不能同时为空后面不再强调
"""
touser: AnyStr | None = None
toparty: AnyStr | None = None
totag: AnyStr | None = None
msgtype: Literal[
"text", "image", "voice", "video", "textcard", "news", "mpnews", "markdown"
]
agentid: int
# 各种类型的消息内容
text: dict | None = None
voice: dict | None = None
video: dict | None = None
file: dict | None = None
textcard: dict | None = None
news: dict | None = None
mpnews: dict | None = None
markdown: dict | None = None
safe: int = 0
enable_id_trans: int = 0
enable_duplicate_check: int = 0
duplicate_check_interval: int = 1800
class SendMessageInfo(BaseSchema):
"""
发送消息响应数据
"""
errcode: int
errmsg: AnyStr
invaliduser: AnyStr | None = None
invalidparty: AnyStr | None = None
invalidtag: AnyStr | None = None
unlicenseduser: AnyStr | None = None
msgid: AnyStr | None = None
response_code: AnyStr | None = None
class SendMessageInvalid(BaseSchema):
"""
发送消息失败响应数据
"""
errmsg: AnyStr
invaliduser: AnyStr | None = None
invalidparty: AnyStr | None = None
invalidtag: AnyStr | None = None
unlicenseduser: AnyStr | None = None
class RecallMessageParams(BaseSchema):
"""
撤回消息请求参数
"""
msgid: AnyStr
class RecallMessageInfo(BaseSchema):
"""
撤回消息响应数据
"""
errcode: int
errmsg: AnyStr

View File

@@ -0,0 +1,36 @@
from wecom.schemas.base import BaseSchema
class AccessTokenParams(BaseSchema):
"""
获取access_token的参数
@param corpid: 企业ID
@param corpsecret: 应用的凭证密钥
"""
corpid: str
corpsecret: str
class AccessTokenInfo(BaseSchema):
"""
获取access_token的返回数据
@param errcode: 返回码 出错返回码为0表示成功非0表示调用失败
@param errmsg: 对返回码的文本描述内容
@param access_token: 获取到的凭证 最长为512字节
@param expires_in: 凭证的有效时间(秒)
"""
errcode: int
errmsg: str
access_token: str | None = None
expires_in: int | None = None
class AccessTokenInvalid(BaseSchema):
"""
获取access_token失败时的返回数据
@param errmsg: 错误信息
"""
errmsg: str

View File

@@ -0,0 +1,39 @@
from typing import AnyStr, List
from wecom.schemas.base import BaseSchema
class UserInfo(BaseSchema):
"""
用户单体响应数据
"""
errcode: int
errmsg: AnyStr
userid: AnyStr | None = None
name: AnyStr | None = None
department: List[int] | None = None
position: AnyStr | None = None
moblie: AnyStr | None = None
gender: int | None = None
email: AnyStr | None = None
status: int | None = None
class UserSimpleInfo(BaseSchema):
userid: AnyStr
name: AnyStr
department: List[int]
open_userid: AnyStr | None = None
class DepartmentUserInfo(BaseSchema):
errcode: int
errmsg: AnyStr
userlist: List[UserSimpleInfo]
class DepartmentUserDetailInfo(BaseSchema):
errcode: int
errmsg: AnyStr
userlist: List[UserInfo]

View File

View File

@@ -0,0 +1,42 @@
import httpx
class HttpxRequest:
@classmethod
async def get(
cls, url: str, params: dict | None = None, headers: dict | None = None
) -> dict:
"""
发送GET请求
@param url: 请求URL
@param params: 请求参数
@param headers: 请求头
@return: 响应内容
"""
async with httpx.AsyncClient() as client:
response = await client.get(url, params=params, headers=headers)
response.raise_for_status()
return response.json()
@classmethod
async def post(
cls,
url: str,
params: dict | None = None,
data: dict | None = None,
json: dict | None = None,
headers: dict | None = None,
) -> dict:
"""
发送POST请求
@param url: 请求URL
@param params: 请求参数
@param headers: 请求头
@return: 响应内容
"""
async with httpx.AsyncClient() as client:
response = await client.post(
url, params=params, data=data, json=json, headers=headers
)
response.raise_for_status()
return response.json()

22
utils/sing.py Normal file
View File

@@ -0,0 +1,22 @@
from typing import Callable, Generic, TypeVar
T = TypeVar("T")
class SingletonProvider(Generic[T]):
def __init__(self, factory: Callable[[], T]):
self._factory = factory
self._instance: T | None = None
def __call__(self) -> T:
if self._instance is None:
self._instance = self._factory()
return self._instance
def reset(self) -> None:
"""重置单例(测试 / 热重载用)"""
self._instance = None
def warmup(self) -> T:
"""提前初始化"""
return self()

View File

@@ -1,8 +1,7 @@
from .wx_com import wecom_service,wxcpt
from .wx_com import wxcpt
from .wx_utils import get_request_params,decrypt_message,extract_message_content
__all__ = [
"wecom_service",
"wxcpt",
"get_request_params",
"decrypt_message",

View File

@@ -1,41 +0,0 @@
from wecom_sdk.exceptions.general import SDKException
from wecom_sdk.modules.base import WecomBaseClient
from wecom_sdk.utils.requests import HttpxRequest
class WecomContactClient(WecomBaseClient):
async def get_contact_list(self , userid: str):
"""
获取联系人列表
@param userid: 用户id
@return: 联系人列表
"""
url = self.BASE_URL + "/externalcontact/list"
params = {"access_token": await self.access_token , "userid": userid}
resp = await HttpxRequest.post(url=url, params=params)
if resp.errcode == 0:
return resp.external_contact_list
else:
raise SDKException(resp.errcode, resp.errmsg)
async def get_contact_detail(self , external_userid: str , cursor : None | str = None):
"""
获取联系人详情
@param userid: 用户id
@param external_userid: 外部联系人id
@param cursor: 分页游标
@return: 联系人详情
"""
url = self.BASE_URL + "/externalcontact/get"
params = {"access_token": await self.access_token , "external_userid": external_userid }
params.update({"cursor": cursor} if cursor else {})
resp = await HttpxRequest.post(url=url, params=params)
if resp.get("errcode") == 0:
return resp.get("external_contact" , {})
else:
raise SDKException(resp.errcode, resp.errmsg)

View File

@@ -1,66 +0,0 @@
from wecom_sdk.schemas.base import BaseSchema
from typing import List, Optional
class TextAttr(BaseSchema):
value: str
class WebAttr(BaseSchema):
url: str
title: str
class MiniProgramAttr(BaseSchema):
appid: str
pagepath: str
title: str
class ExternalAttr(BaseSchema):
type: int
name: str
text: Optional[TextAttr] = None
web: Optional[WebAttr] = None
miniprogram: Optional[MiniProgramAttr] = None
class ExternalProfile(BaseSchema):
external_attr: List[ExternalAttr]
class ExternalContact(BaseSchema):
external_userid: str
name: str
position: Optional[str] = None
avatar: Optional[str] = None
corp_name: Optional[str] = None
corp_full_name: Optional[str] = None
type: int
gender: int
unionid: Optional[str] = None
external_profile: Optional[ExternalProfile] = None
class Tag(BaseSchema):
group_name: str
tag_name: str
tag_id: Optional[str] = None
type: int
class WechatChannels(BaseSchema):
nickname: str
source: int
class FollowUser(BaseSchema):
userid: str
remark: Optional[str] = None
description: Optional[str] = None
createtime: int
tags: Optional[List[Tag]] = None
remark_corp_name: Optional[str] = None
remark_mobiles: Optional[List[str]] = None
oper_userid: str
add_way: int
state: Optional[str] = None
wechat_channels: Optional[WechatChannels] = None
class ContactResponse(BaseSchema):
errcode: int
errmsg: str
external_contact: ExternalContact
follow_user: List[FollowUser]
next_cursor: Optional[str] = None

View File

@@ -1,13 +1,6 @@
from uvicorn.server import logger
from wecom_sdk import Wecom
from utils.wxcom.modules.contact import WecomContactClient
from config import setting
from .WXBizMsgCrypt3 import WXBizMsgCrypt
class WecomPro(Wecom , WecomContactClient):
pass
from config import Settings
from utils.wxcom.WXBizMsgCrypt3 import WXBizMsgCrypt
def get_wxcpt():
@@ -20,46 +13,22 @@ def get_wxcpt():
try:
# 验证企业微信配置是否完整
required_configs = [
setting.env.WECOM_APP_TOKEN,
setting.env.WECOM_APP_ENCODING_AES_KEY,
setting.env.WECOM_CORPID
Settings().WECOM_APP_TOKEN,
Settings().WECOM_APP_ENCODING_AES_KEY,
Settings().WECOM_CORPID
]
if not all(required_configs):
raise ValueError("企业微信配置不完整")
return WXBizMsgCrypt(
setting.env.WECOM_APP_TOKEN, # 设置的Token
setting.env.WECOM_APP_ENCODING_AES_KEY, # 设置密钥
setting.env.WECOM_CORPID # 企业ID
Settings().WECOM_APP_TOKEN, # 设置的Token
Settings().WECOM_APP_ENCODING_AES_KEY, # 设置密钥
Settings().WECOM_CORPID # 企业ID
)
except Exception as e:
logger.error(f"初始化WXBizMsgCrypt失败: {str(e)}")
raise
def get_wecom_service():
"""
初始化并返回 Wecom 服务实例
:param setting_env: 配置环境对象,包含企业微信相关配置
:return: Wecom 服务实例
"""
try:
# 验证企业微信配置是否完整
required_configs = [
setting.env.WECOM_CORPID,
setting.env.WECOM_CORPSECRET
]
if not all(required_configs):
raise ValueError("企业微信配置不完整")
return WecomPro(
corpid=setting.env.WECOM_CORPID,
corpsecret=setting.env.WECOM_CORPSECRET
)
except Exception as e:
logger.error(f"初始化Wecom服务失败: {str(e)}")
raise
wecom_service = get_wecom_service()
wxcpt = get_wxcpt()

1238
uv.lock generated

File diff suppressed because it is too large Load Diff