diff --git a/src/models/__init__.py b/src/models/__init__.py index e69de29..b55e86b 100644 --- a/src/models/__init__.py +++ b/src/models/__init__.py @@ -0,0 +1,2 @@ +from .response import ResponseModel, ListResponseModel, TokenResponseModel, success_response, error_response +from .signature import SignatureRequest, SignatureResponse \ No newline at end of file diff --git a/src/models/signature.py b/src/models/signature.py new file mode 100644 index 0000000..5a1459d --- /dev/null +++ b/src/models/signature.py @@ -0,0 +1,26 @@ +from pydantic import BaseModel +from typing import Optional + + +class SignatureRequest(BaseModel): + """ + 签名请求模型 + + Attributes: + url (str): 当前页面的完整URL,不包含#及其后面部分 + """ + url: str + + +class SignatureResponse(BaseModel): + """ + 签名响应模型 + + Attributes: + timestamp (str): 时间戳 + nonceStr (str): 随机字符串 + signature (str): 签名 + """ + timestamp: str + nonceStr: str + signature: str \ No newline at end of file diff --git a/src/routers/wechat.py b/src/routers/wechat.py index 706e584..4423976 100644 --- a/src/routers/wechat.py +++ b/src/routers/wechat.py @@ -1,6 +1,7 @@ from fastapi import APIRouter, HTTPException, Depends from typing import Optional, Dict import os +import time from src.services.wechat import ( get_wechat_access_token, get_userid_by_mobile, @@ -9,9 +10,14 @@ from src.services.wechat import ( get_department_list, get_user_detail, get_customer_list, - get_external_contact_detail + get_external_contact_detail, + get_jsapi_ticket, + generate_signature, + generate_nonce_str, + get_corp_access_token ) from src.models.response import success_response, error_response +from src.models.signature import SignatureRequest, SignatureResponse from src.utils.auth import create_access_token # 创建路由实例 @@ -362,3 +368,79 @@ async def api_get_customer_detail(external_userid: str, cursor: str = ""): code=500 ) + +@wechat_router.post("/config-signature", summary="生成JS-SDK权限签名") +async def generate_config_signature(request: SignatureRequest): + """ + 生成JS-SDK使用权限签名 + + - **url**: 当前页面的完整URL,不包含#及其后面部分 + """ + try: + # 获取access_token + access_token = get_corp_access_token() + + # 获取jsapi_ticket + jsapi_ticket = get_jsapi_ticket(access_token, 'jsapi') + + # 生成签名参数 + timestamp = str(int(time.time())) + nonce_str = generate_nonce_str() + + # 生成签名 + signature = generate_signature(jsapi_ticket, nonce_str, timestamp, request.url) + + # 返回签名信息 + return success_response( + data={ + "timestamp": timestamp, + "nonceStr": nonce_str, + "signature": signature + }, + message="生成签名成功", + code=200 + ) + except Exception as e: + return error_response( + message=f"生成签名失败: {str(e)}", + code=500 + ) + + +@wechat_router.post("/agent-config-signature", summary="生成AgentConfig权限签名") +async def generate_agent_config_signature(request: SignatureRequest): + """ + 生成AgentConfig使用权限签名 + + - **url**: 当前页面的完整URL,不包含#及其后面部分 + """ + try: + # 获取access_token + access_token = get_corp_access_token() + + # 获取agent_config_ticket + agent_config_ticket = get_jsapi_ticket(access_token, 'agent_config') + + # 生成签名参数 + timestamp = str(int(time.time())) + nonce_str = generate_nonce_str() + + # 生成签名 + signature = generate_signature(agent_config_ticket, nonce_str, timestamp, request.url) + + # 返回签名信息 + return success_response( + data={ + "timestamp": timestamp, + "nonceStr": nonce_str, + "signature": signature + }, + message="生成AgentConfig签名成功", + code=200 + ) + except Exception as e: + return error_response( + message=f"生成AgentConfig签名失败: {str(e)}", + code=500 + ) + diff --git a/src/services/wechat.py b/src/services/wechat.py index 915bbc6..290ce95 100644 --- a/src/services/wechat.py +++ b/src/services/wechat.py @@ -1,7 +1,10 @@ import os import requests import time +import hashlib +import random from typing import Dict, Optional, Any +from datetime import datetime, timedelta # 企业微信 API 接口地址 # 获取 access_token 接口 ACCESS_URL = "http://146.56.202.222:12345/proxy/https://qyapi.weixin.qq.com/cgi-bin/gettoken" @@ -117,6 +120,74 @@ def get_userid_by_mobile(mobile: str) -> str: return data["userid"] +# JSAPI Ticket缓存 +access_token_cache = { + 'jsapi_ticket': None, + 'jsapi_expires_at': None, + 'agent_config_ticket': None, + 'agent_config_expires_at': None +} + + +def get_corp_access_token(): + """获取企业的access_token""" + return get_wechat_access_token() + + +def get_agent_access_token(): + """获取应用的access_token(如果需要的话)""" + # 这里可以根据实际需求实现获取应用access_token的逻辑 + # 如果应用和企业使用相同的access_token,则可以直接返回企业access_token + return get_wechat_access_token() + + +def generate_nonce_str(): + """生成随机字符串""" + return ''.join(random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(16)) + + +def get_jsapi_ticket(access_token, ticket_type='jsapi'): + """获取jsapi_ticket(带缓存)""" + now = datetime.now() + # 检查缓存 + if ticket_type == 'agent_config': + cache_key, expires_key = 'agent_config_ticket', 'agent_config_expires_at' + url = f'http://146.56.202.222:12345/proxy/https://qyapi.weixin.qq.com/cgi-bin/ticket/get?access_token={access_token}&type=agent_config' + else: + cache_key, expires_key = 'jsapi_ticket', 'jsapi_expires_at' + url = f'http://146.56.202.222:12345/proxy/https://qyapi.weixin.qq.com/cgi-bin/get_jsapi_ticket?access_token={access_token}' + + # 检查缓存是否有效 + if (access_token_cache[cache_key] and + access_token_cache[expires_key] and + now < access_token_cache[expires_key]): + print(f"使用缓存的{ticket_type}票据") + return access_token_cache[cache_key] + + # 获取新的ticket + with requests.Session() as session: + response = session.get(url, timeout=10) + data = response.json() + print(f"获取{ticket_type}票据响应:", data) + if data.get('errcode') == 0: + # 缓存ticket,设置过期时间为7200秒减去300秒(提前5分钟过期) + access_token_cache[cache_key] = data['ticket'] + access_token_cache[expires_key] = now + timedelta(seconds=7200 - 300) + return data['ticket'] + else: + raise Exception(f"获取{ticket_type}票据失败: {data}") + + +def generate_signature(ticket, nonce_str, timestamp, url): + """生成签名""" + # 按字典序排序 + params = {'jsapi_ticket': ticket, 'noncestr': nonce_str, 'timestamp': timestamp, 'url': url} + # 拼接字符串 + string1 = '&'.join([f'{k}={v}' for k, v in sorted(params.items())]) + # SHA1加密 + return hashlib.sha1(string1.encode('utf-8')).hexdigest() + + def send_textcard_message( touser: str, @@ -360,3 +431,5 @@ def get_external_contact_detail(external_userid: str, cursor: str = "") -> Dict: raise RuntimeError(f"获取外部联系人详情失败: {data.get('errmsg')}") return data + +