feat(wechat): 添加微信JS-SDK签名生成功能

实现微信JS-SDK和AgentConfig的签名生成功能,包括:
1. 新增签名请求和响应模型
2. 添加获取jsapi_ticket和生成签名的服务方法
3. 实现两个签名生成接口
4. 添加票据缓存机制提升性能
This commit is contained in:
2025-12-15 21:22:26 +08:00
parent b4bc89902d
commit 58b1a5433f
4 changed files with 184 additions and 1 deletions

View File

@@ -0,0 +1,2 @@
from .response import ResponseModel, ListResponseModel, TokenResponseModel, success_response, error_response
from .signature import SignatureRequest, SignatureResponse

26
src/models/signature.py Normal file
View File

@@ -0,0 +1,26 @@
from pydantic import BaseModel
from typing import Optional
class SignatureRequest(BaseModel):
"""
签名请求模型
Attributes:
url (str): 当前页面的完整URL不包含#及其后面部分
"""
url: str
class SignatureResponse(BaseModel):
"""
签名响应模型
Attributes:
timestamp (str): 时间戳
nonceStr (str): 随机字符串
signature (str): 签名
"""
timestamp: str
nonceStr: str
signature: str

View File

@@ -1,6 +1,7 @@
from fastapi import APIRouter, HTTPException, Depends from fastapi import APIRouter, HTTPException, Depends
from typing import Optional, Dict from typing import Optional, Dict
import os import os
import time
from src.services.wechat import ( from src.services.wechat import (
get_wechat_access_token, get_wechat_access_token,
get_userid_by_mobile, get_userid_by_mobile,
@@ -9,9 +10,14 @@ from src.services.wechat import (
get_department_list, get_department_list,
get_user_detail, get_user_detail,
get_customer_list, get_customer_list,
get_external_contact_detail get_external_contact_detail,
get_jsapi_ticket,
generate_signature,
generate_nonce_str,
get_corp_access_token
) )
from src.models.response import success_response, error_response from src.models.response import success_response, error_response
from src.models.signature import SignatureRequest, SignatureResponse
from src.utils.auth import create_access_token from src.utils.auth import create_access_token
# 创建路由实例 # 创建路由实例
@@ -362,3 +368,79 @@ async def api_get_customer_detail(external_userid: str, cursor: str = ""):
code=500 code=500
) )
@wechat_router.post("/config-signature", summary="生成JS-SDK权限签名")
async def generate_config_signature(request: SignatureRequest):
"""
生成JS-SDK使用权限签名
- **url**: 当前页面的完整URL不包含#及其后面部分
"""
try:
# 获取access_token
access_token = get_corp_access_token()
# 获取jsapi_ticket
jsapi_ticket = get_jsapi_ticket(access_token, 'jsapi')
# 生成签名参数
timestamp = str(int(time.time()))
nonce_str = generate_nonce_str()
# 生成签名
signature = generate_signature(jsapi_ticket, nonce_str, timestamp, request.url)
# 返回签名信息
return success_response(
data={
"timestamp": timestamp,
"nonceStr": nonce_str,
"signature": signature
},
message="生成签名成功",
code=200
)
except Exception as e:
return error_response(
message=f"生成签名失败: {str(e)}",
code=500
)
@wechat_router.post("/agent-config-signature", summary="生成AgentConfig权限签名")
async def generate_agent_config_signature(request: SignatureRequest):
"""
生成AgentConfig使用权限签名
- **url**: 当前页面的完整URL不包含#及其后面部分
"""
try:
# 获取access_token
access_token = get_corp_access_token()
# 获取agent_config_ticket
agent_config_ticket = get_jsapi_ticket(access_token, 'agent_config')
# 生成签名参数
timestamp = str(int(time.time()))
nonce_str = generate_nonce_str()
# 生成签名
signature = generate_signature(agent_config_ticket, nonce_str, timestamp, request.url)
# 返回签名信息
return success_response(
data={
"timestamp": timestamp,
"nonceStr": nonce_str,
"signature": signature
},
message="生成AgentConfig签名成功",
code=200
)
except Exception as e:
return error_response(
message=f"生成AgentConfig签名失败: {str(e)}",
code=500
)

View File

@@ -1,7 +1,10 @@
import os import os
import requests import requests
import time import time
import hashlib
import random
from typing import Dict, Optional, Any from typing import Dict, Optional, Any
from datetime import datetime, timedelta
# 企业微信 API 接口地址 # 企业微信 API 接口地址
# 获取 access_token 接口 # 获取 access_token 接口
ACCESS_URL = "http://146.56.202.222:12345/proxy/https://qyapi.weixin.qq.com/cgi-bin/gettoken" ACCESS_URL = "http://146.56.202.222:12345/proxy/https://qyapi.weixin.qq.com/cgi-bin/gettoken"
@@ -117,6 +120,74 @@ def get_userid_by_mobile(mobile: str) -> str:
return data["userid"] return data["userid"]
# JSAPI Ticket缓存
access_token_cache = {
'jsapi_ticket': None,
'jsapi_expires_at': None,
'agent_config_ticket': None,
'agent_config_expires_at': None
}
def get_corp_access_token():
"""获取企业的access_token"""
return get_wechat_access_token()
def get_agent_access_token():
"""获取应用的access_token如果需要的话"""
# 这里可以根据实际需求实现获取应用access_token的逻辑
# 如果应用和企业使用相同的access_token则可以直接返回企业access_token
return get_wechat_access_token()
def generate_nonce_str():
"""生成随机字符串"""
return ''.join(random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(16))
def get_jsapi_ticket(access_token, ticket_type='jsapi'):
"""获取jsapi_ticket带缓存"""
now = datetime.now()
# 检查缓存
if ticket_type == 'agent_config':
cache_key, expires_key = 'agent_config_ticket', 'agent_config_expires_at'
url = f'http://146.56.202.222:12345/proxy/https://qyapi.weixin.qq.com/cgi-bin/ticket/get?access_token={access_token}&type=agent_config'
else:
cache_key, expires_key = 'jsapi_ticket', 'jsapi_expires_at'
url = f'http://146.56.202.222:12345/proxy/https://qyapi.weixin.qq.com/cgi-bin/get_jsapi_ticket?access_token={access_token}'
# 检查缓存是否有效
if (access_token_cache[cache_key] and
access_token_cache[expires_key] and
now < access_token_cache[expires_key]):
print(f"使用缓存的{ticket_type}票据")
return access_token_cache[cache_key]
# 获取新的ticket
with requests.Session() as session:
response = session.get(url, timeout=10)
data = response.json()
print(f"获取{ticket_type}票据响应:", data)
if data.get('errcode') == 0:
# 缓存ticket设置过期时间为7200秒减去300秒提前5分钟过期
access_token_cache[cache_key] = data['ticket']
access_token_cache[expires_key] = now + timedelta(seconds=7200 - 300)
return data['ticket']
else:
raise Exception(f"获取{ticket_type}票据失败: {data}")
def generate_signature(ticket, nonce_str, timestamp, url):
"""生成签名"""
# 按字典序排序
params = {'jsapi_ticket': ticket, 'noncestr': nonce_str, 'timestamp': timestamp, 'url': url}
# 拼接字符串
string1 = '&'.join([f'{k}={v}' for k, v in sorted(params.items())])
# SHA1加密
return hashlib.sha1(string1.encode('utf-8')).hexdigest()
def send_textcard_message( def send_textcard_message(
touser: str, touser: str,
@@ -360,3 +431,5 @@ def get_external_contact_detail(external_userid: str, cursor: str = "") -> Dict:
raise RuntimeError(f"获取外部联系人详情失败: {data.get('errmsg')}") raise RuntimeError(f"获取外部联系人详情失败: {data.get('errmsg')}")
return data return data