import time from typing import Dict, Tuple, Union import xmltodict from fastapi import HTTPException, Request from uvicorn.server import logger from .wx_com import wxcpt async def get_request_params(request: Request) -> Tuple[bytes, str, str, str]: """获取请求参数并验证""" body = await request.body() msg_signature = request.query_params.get("msg_signature") timestamp = request.query_params.get("timestamp") nonce = request.query_params.get("nonce") if not all([msg_signature, timestamp, nonce]): raise HTTPException(status_code=400, detail="缺少必要的参数") # logger.info( # f"收到消息推送: msg_signature={msg_signature}, timestamp={timestamp}, nonce={nonce}" # ) return body, msg_signature, timestamp, nonce def decrypt_message(body: bytes, msg_signature: str, timestamp: str, nonce: str) -> dict: """解密消息""" ret, sMsg = wxcpt.DecryptMsg(body, msg_signature, timestamp, nonce) if ret != 0: logger.error(f"消息解密失败,错误码: {ret}") raise HTTPException(status_code=400, detail="消息解密失败") xml_dict = xmltodict.parse(sMsg) # logger.info(f"解密后的消息内容: {xml_dict}") return xml_dict def extract_message_content( xml_dict: Dict, ) -> Tuple[str, str, str, str, Union[Dict[str, Union[str, None]], str, None], str, str]: """提取消息内容,支持多种消息类型""" xml_content = xml_dict["xml"] to_user_name = xml_content.get("ToUserName") from_user_name = xml_content.get("FromUserName") create_time = xml_content.get("CreateTime") msg_type = xml_content.get("MsgType") msg_id = xml_content.get("MsgId") agent_id = xml_content.get("AgentID") message_data = {} if msg_type == "text": message_data["Content"] = xml_content.get("Content") logger.info(f"收到文本消息: {message_data['Content']}") elif msg_type == "image": message_data["PicUrl"] = xml_content.get("PicUrl") message_data["MediaId"] = xml_content.get("MediaId") logger.info(f"收到图片消息,媒体ID: {message_data['MediaId']}") elif msg_type == "voice": message_data["MediaId"] = xml_content.get("MediaId") message_data["Format"] = xml_content.get("Format") logger.info(f"收到语音消息,媒体ID: {message_data['MediaId']}") elif msg_type == "video": message_data["MediaId"] = xml_content.get("MediaId") message_data["ThumbMediaId"] = xml_content.get("ThumbMediaId") logger.info(f"收到视频消息,媒体ID: {message_data['MediaId']}") elif msg_type == "location": message_data["Location_X"] = xml_content.get("Location_X") message_data["Location_Y"] = xml_content.get("Location_Y") message_data["Scale"] = xml_content.get("Scale") message_data["Label"] = xml_content.get("Label") message_data["AppType"] = xml_content.get("AppType") logger.info( f"收到位置消息,位置: {message_data['Location_X']}, {message_data['Location_Y']}" ) elif msg_type == "link": message_data["Title"] = xml_content.get("Title") message_data["Description"] = xml_content.get("Description") message_data["Url"] = xml_content.get("Url") message_data["PicUrl"] = xml_content.get("PicUrl") logger.info(f"收到链接消息,标题: {message_data['Title']}") else: message_data = xml_content.get("Content") logger.info(f"收到未知类型消息: {message_data}") return { "ToUserName": to_user_name, "FromUserName": from_user_name, "CreateTime": create_time, "MsgType": msg_type, "MsgId": msg_id, "AgentID": agent_id, **message_data, }