添加mcp配置

This commit is contained in:
2026-01-15 17:34:22 +08:00
parent 0e4a85551f
commit d3bf64ae61
21 changed files with 1601 additions and 275 deletions

View File

@@ -0,0 +1,78 @@
from datetime import datetime, timedelta
from wecom.exceptions.general import SDKException
from wecom.schemas.token import (
AccessTokenInfo,
AccessTokenParams,
)
from wecom.utils.requests import HttpxRequest
BASE_URL: str = "https://qyapi.weixin.qq.com/cgi-bin"
class WecomBaseClient:
BASE_URL: str = BASE_URL
def __init__(self, corpid: str, corpsecret: str):
"""
企业微信SDK
@param corpid: 企业ID
@param corpsecret: 应用的凭证密钥
每个应用有独立的secret获取到的access_token只能本应用使用所以每个应用的access_token应该分开来获取
"""
self.corpid = corpid
self.corpsecret = corpsecret
self._access_token = None
self.access_token_valid_time = None
@property
async def access_token(self) -> str:
"""企业微信SDK的access_token"""
if (
self.access_token_valid_time
and datetime.now() < self.access_token_valid_time
):
return self._access_token
await self.__get_access_token()
return self._access_token
@access_token.setter
def access_token(self, value: str):
self._access_token = value
async def __get_access_token(self, refresh: bool = False) -> str:
"""
获取access_token
access_token的有效期通过返回的expires_in来传达正常情况下为7200秒2小时有效期内重复获取返回相同结果过期后获取会返回新的access_token。
由于企业微信每个应用的access_token是彼此独立的所以进行缓存时需要区分应用来进行存储。
详细说明https://work.weixin.qq.com/api/doc/90000/90135/91039
@return: access_token: str 或 None
"""
if (
not refresh
and self.access_token_valid_time
and datetime.now() < self.access_token_valid_time
):
return self.access_token
url = self.BASE_URL + "/gettoken"
params = AccessTokenParams(
corpid=self.corpid, corpsecret=self.corpsecret
).model_dump()
resp = AccessTokenInfo(**await HttpxRequest.get(url=url, params=params))
if resp.errcode == 0:
self.access_token_valid_time = datetime.now() + timedelta(
seconds=resp.expires_in
)
self.access_token = resp.access_token
return resp.access_token
else:
raise SDKException(resp.errcode, resp.errmsg)

View File

@@ -0,0 +1,82 @@
from wecom.exceptions.general import SDKException
from wecom.modules.base import WecomBaseClient
from wecom.schemas.departments import (
CreateDepartmentInfo,
CreateDepartmentParams,
DepartmentInfo,
UpdateDepartmentInfo,
UpdateDepartmentParams,
)
from wecom.utils.requests import HttpxRequest
class WecomDepartmentClient(WecomBaseClient):
async def create_departments(self, data: CreateDepartmentParams) -> int:
"""
创建部门
@param data: 创建部门的参数
@return: 部门id
"""
url = self.BASE_URL + "/department/create"
params = {"access_token": await self.access_token}
resp = CreateDepartmentInfo(
**await HttpxRequest.post(url=url, params=params, json=data)
)
if resp.errcode == 0:
return resp.id
else:
raise SDKException(resp.errcode, resp.errmsg)
async def delete_departments(self, id: int) -> bool:
"""
删除部门
@param id: 部门id
@return: 删除状态(Boolean)
"""
url = self.BASE_URL + "/department/delete"
params = {"access_token": await self.access_token, "id": id}
resp = await HttpxRequest.get(url=url, params=params)
if resp.errcode == 0:
return True
else:
raise SDKException(resp.errcode, resp.errmsg)
async def update_departments(self, data: UpdateDepartmentParams) -> bool:
"""
更新部门
@param data: 更新部门的参数
@return: 更新状态(Boolean)
"""
url = self.BASE_URL + "/department/update"
params = {"access_token": await self.access_token}
resp = UpdateDepartmentInfo(
**await HttpxRequest.post(url=url, params=params, json=data)
)
if resp.errcode == 0:
return True
else:
raise SDKException(resp.errcode, resp.errmsg)
async def get_departments(self, id: int = None) -> list[DepartmentInfo]:
"""
获取部门列表
@param id: 部门id。获取指定部门及其下的子部门。
如果不填,默认获取全量组织架构
@return: 部门列表
"""
url = self.BASE_URL + "/department/list"
params = {"access_token": await self.access_token, "id": id}
resp = DepartmentInfo(**await HttpxRequest.get(url=url, params=params))
if resp.errcode == 0:
return resp.department
else:
raise SDKException(resp.errcode, resp.errmsg)

View File

@@ -0,0 +1,60 @@
from typing import Literal
from wecom.exceptions.general import SDKException
from wecom.modules.base import WecomBaseClient
from wecom.schemas.message import (
MessageParams,
RecallMessageInfo,
RecallMessageParams,
SendMessageInfo,
)
from wecom.utils.requests import HttpxRequest
class WecomMessageClient(WecomBaseClient):
async def send_message(
self,
data: MessageParams,
) -> str:
"""
企业微信发送消息
@param data: 发送消息的参数
各类消息的参数详情 https://developer.work.weixin.qq.com/document/path/90236
@return: 消息ID
"""
url = self.BASE_URL + "/message/send"
params = {"access_token": await self.access_token}
data = data.model_dump()
resp = SendMessageInfo(
**await HttpxRequest.post(url=url, params=params, json=data)
)
if resp.errcode == 0:
return resp.msgid
else:
raise SDKException(resp.errcode, resp.errmsg)
async def recall_message(self, data: RecallMessageParams) -> bool:
"""
企业微信撤回消息
@param msgid: 消息ID
@return: 撤回状态(Boolean)
"""
data = data.model_dump()
url = self.BASE_URL + "/message/recall"
params = {"access_token": await self.access_token}
resp = RecallMessageInfo(
**await HttpxRequest.post(url=url, params=params, json=data)
)
if resp.errcode == 0:
return True
else:
raise SDKException(resp.errcode, resp.errmsg)

View File

@@ -0,0 +1,10 @@
from wecom.modules.base import WecomBaseClient
from wecom.modules.department import WecomDepartmentClient
from wecom.modules.message import WecomMessageClient
from wecom.modules.users import WecomUsersClient
class Wecom(
WecomDepartmentClient, WecomUsersClient, WecomMessageClient, WecomBaseClient
):
pass

View File

@@ -0,0 +1,95 @@
from wecom.exceptions.general import SDKException
from wecom.modules.base import WecomBaseClient
from wecom.schemas.departments import DepartmentInfo
from wecom.schemas.users import (
DepartmentUserDetailInfo,
DepartmentUserInfo,
UserInfo,
)
from wecom.utils.requests import HttpxRequest
class WecomUsersClient(WecomBaseClient):
async def get_user(self, userid: str) -> dict:
"""
读取成员
@param userid: 成员UserID。对应管理端的账号企业内必须唯一。不区分大小写长度为1~64个字节
@return: 成员信息
"""
url = self.BASE_URL + "/user/get"
params = {"access_token": await self.access_token, "userid": userid}
resp = UserInfo(**await HttpxRequest.get(url=url, params=params))
if resp.errcode == 0:
return resp.model_dump(exclude={"errcode", "errmsg"})
else:
raise SDKException(resp.errcode, resp.errmsg)
async def get_user_in_department_detail(self, department_id: str) -> dict:
"""
读取部门成员完整信息
@param department_id: 获取的部门id
@return: 部门成员信息
"""
url = self.BASE_URL + "/user/list"
params = {
"access_token": await self.access_token,
"department_id": department_id,
}
resp = DepartmentUserDetailInfo(
**await HttpxRequest.get(url=url, params=params)
)
if resp.errcode == 0:
return resp.model_dump(exclude={"errcode", "errmsg"})
else:
raise SDKException(resp.errcode, resp.errmsg)
async def get_user_in_department(self, department_id: int) -> dict:
"""
读取部门成员简要信息
@param department_id: 获取的部门id
@return: 部门成员信息
"""
url = self.BASE_URL + "/user/simplelist"
params = {
"access_token": await self.access_token,
"department_id": department_id,
}
resp = DepartmentUserInfo(**await HttpxRequest.get(url=url, params=params))
if resp.errcode == 0:
return resp.model_dump(exclude={"errcode", "errmsg"})
else:
raise SDKException(resp.errcode, resp.errmsg)
@staticmethod
def convert_userid(userid: str, decrypt: bool = False):
"""
学工号/企业微信ID转换方法
@param userid: 学工号/企业微信ID
@param decrypt: 是否解密
@return: 转换后的学工号/企业微信ID
"""
if decrypt:
year = str(int(userid[10:12]) + 1945)
no = str(int(userid[2:9]) - 115342)
no = no[1:7]
userid = year + no
else:
userid = (
"8"
+ userid[2:3]
+ str(int(userid[-6:]) + 1115342)
+ userid[8:9]
+ str(int(userid[0:4]) - 1945)
)
return userid