Files
wecom-wnzs-adapter/utils/wxcom/wx_utils.py
2026-01-14 17:58:25 +08:00

94 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
from typing import Dict, Tuple, Union
import xmltodict
from fastapi import HTTPException, Request
from uvicorn.server import logger
from .wx_com import wxcpt
async def get_request_params(request: Request) -> Tuple[bytes, str, str, str]:
    """Read the raw body and the signature-related query parameters.

    Args:
        request: Incoming request; its body and query string are read.

    Returns:
        A ``(body, msg_signature, timestamp, nonce)`` tuple.

    Raises:
        HTTPException: 400 if any of ``msg_signature``, ``timestamp`` or
            ``nonce`` is missing or empty in the query string.
    """
    body = await request.body()
    query = request.query_params
    msg_signature, timestamp, nonce = (
        query.get(key) for key in ("msg_signature", "timestamp", "nonce")
    )
    # All three values are required; an empty string counts as missing.
    if not (msg_signature and timestamp and nonce):
        raise HTTPException(status_code=400, detail="缺少必要的参数")
    return body, msg_signature, timestamp, nonce
def decrypt_message(body: bytes, msg_signature: str, timestamp: str, nonce: str) -> dict:
    """Decrypt a message body and parse the resulting XML into a dict.

    Args:
        body: Raw request body passed to ``wxcpt.DecryptMsg``.
        msg_signature: Signature value forwarded to ``wxcpt.DecryptMsg``.
        timestamp: Timestamp value forwarded to ``wxcpt.DecryptMsg``.
        nonce: Nonce value forwarded to ``wxcpt.DecryptMsg``.

    Returns:
        The result of ``xmltodict.parse`` applied to the decrypted payload.

    Raises:
        HTTPException: 400 if ``wxcpt.DecryptMsg`` reports a non-zero code.
    """
    error_code, plaintext = wxcpt.DecryptMsg(body, msg_signature, timestamp, nonce)
    if error_code == 0:
        return xmltodict.parse(plaintext)

    # Any non-zero code is treated as a decryption failure.
    logger.error(f"消息解密失败,错误码: {error_code}")
    raise HTTPException(status_code=400, detail="消息解密失败")
def extract_message_content(
    xml_dict: Dict,
) -> Dict[str, Union[str, None]]:
    """Flatten a parsed XML message into a single dict of known fields.

    The common header fields (``ToUserName``, ``FromUserName``,
    ``CreateTime``, ``MsgType``, ``MsgId``, ``AgentID``) are always present in
    the result. Type-specific fields are added depending on ``MsgType``:

    * ``text``: ``Content``
    * ``image``: ``PicUrl``, ``MediaId``
    * ``voice``: ``MediaId``, ``Format``
    * ``video``: ``MediaId``, ``ThumbMediaId``
    * ``location``: ``Location_X``, ``Location_Y``, ``Scale``, ``Label``,
      ``AppType``
    * ``link``: ``Title``, ``Description``, ``Url``, ``PicUrl``
    * any other type: ``Content`` (``None`` when the message has none)

    Args:
        xml_dict: Parsed message; must contain a top-level ``"xml"`` mapping.

    Returns:
        A dict with the header fields followed by the type-specific fields.
        Fields absent from the message map to ``None``.

    Raises:
        KeyError: If ``xml_dict`` has no ``"xml"`` key.
    """
    xml_content = xml_dict["xml"]
    msg_type = xml_content.get("MsgType")

    def pick(*fields: str) -> Dict[str, Union[str, None]]:
        # Missing fields become None instead of raising.
        return {name: xml_content.get(name) for name in fields}

    if msg_type == "text":
        message_data = pick("Content")
        logger.info(f"收到文本消息: {message_data['Content']}")
    elif msg_type == "image":
        message_data = pick("PicUrl", "MediaId")
        logger.info(f"收到图片消息媒体ID: {message_data['MediaId']}")
    elif msg_type == "voice":
        message_data = pick("MediaId", "Format")
        logger.info(f"收到语音消息媒体ID: {message_data['MediaId']}")
    elif msg_type == "video":
        message_data = pick("MediaId", "ThumbMediaId")
        logger.info(f"收到视频消息媒体ID: {message_data['MediaId']}")
    elif msg_type == "location":
        message_data = pick("Location_X", "Location_Y", "Scale", "Label", "AppType")
        logger.info(
            f"收到位置消息,位置: {message_data['Location_X']}, {message_data['Location_Y']}"
        )
    elif msg_type == "link":
        message_data = pick("Title", "Description", "Url", "PicUrl")
        logger.info(f"收到链接消息,标题: {message_data['Title']}")
    else:
        # Must stay a dict: it is unpacked with ** below, and a bare
        # str/None here would raise TypeError for every unrecognised type.
        message_data = pick("Content")
        logger.info(f"收到未知类型消息: {message_data['Content']}")

    header = pick(
        "ToUserName", "FromUserName", "CreateTime", "MsgType", "MsgId", "AgentID"
    )
    return {**header, **message_data}