feat(light): 添加Redis缓存支持和SSE实时数据推送功能

- 在requirements.txt中添加redis依赖并更新requests版本
- 在.env.example中添加Redis相关配置
- 重构light服务层,使用Redis替代内存缓存,增加线程安全机制
- 新增SSE接口,每15秒推送一次用户灯牌数据更新
- 优化日志配置和错误处理机制
This commit is contained in:
2025-12-16 15:37:40 +08:00
parent 3b37e13833
commit 953da74b29
4 changed files with 259 additions and 89 deletions

View File

@@ -8,4 +8,13 @@ API_PORT=8000
# Security # Security
SECRET_KEY=your-secret-key-here SECRET_KEY=your-secret-key-here
ALGORITHM=HS256 ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30 ACCESS_TOKEN_EXPIRE_MINUTES=30
# Redis configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=
# Redis cache configuration
REDIS_CACHE_DURATION_MINUTES=10

View File

@@ -4,9 +4,10 @@ sqlalchemy==2.0.23
python-dotenv==1.0.0 python-dotenv==1.0.0
pydantic==2.5.0 pydantic==2.5.0
pydantic-settings==2.1.0 pydantic-settings==2.1.0
requests==2.31.0 requests==2.3.1.0
PyJWT==2.8.0 PyJWT==2.8.0
starlette==0.27.0 starlette==0.27.0
anyio==3.7.1 anyio==3.7.1
pymongo==4.15.5 pymongo==4.15.5
motor==3.7.1 motor==3.7.1
redis==7.1.0

View File

@@ -1,6 +1,10 @@
from fastapi import APIRouter, HTTPException, Query import asyncio
from typing import List, Optional import json
from src.services.light import fetch_all_lightpanel_scripts, get_lightpanel_script_by_code, get_answers_by_user_id from fastapi import APIRouter, HTTPException, Query, Request
from fastapi.responses import StreamingResponse
from fastapi.encoders import jsonable_encoder
from typing import List, Optional, Dict, Any
from src.services.light import fetch_all_lightpanel_scripts, get_lightpanel_script_by_code, get_answers_by_user_id, get_user_scripts_with_answers as service_get_user_scripts_with_answers
from src.models.response import success_response, error_response, ResponseModel from src.models.response import success_response, error_response, ResponseModel
from typing import Dict, Any from typing import Dict, Any
@@ -24,22 +28,22 @@ async def get_all_lightpanel_scripts():
@light_router.get("/script/{code}", response_model=ResponseModel[Dict[str, Any]]) @light_router.get("/script/{code}", response_model=ResponseModel[Dict[str, Any]])
async def get_lightpanel_script(code: str): async def get_lightpanel_script(code: str):
""" """
根据code获取特定的LightPanel脚本 根据code获取特定的LightPanel灯牌
Args: Args:
code (str): 信息类型 code (str): 信息类型
Returns: Returns:
ResponseModel: 包含特定脚本数据的统一响应格式 ResponseModel: 包含特定灯牌数据的统一响应格式
""" """
try: try:
script = get_lightpanel_script_by_code(code) script = get_lightpanel_script_by_code(code)
if script: if script:
return success_response(data=script, message=f"成功获取code为{code}的LightPanel脚本") return success_response(data=script, message=f"成功获取code为{code}的LightPanel灯牌")
else: else:
return error_response(message=f"未找到code为{code}的LightPanel脚本", code=404) return error_response(message=f"未找到code为{code}的LightPanel灯牌", code=404)
except Exception as e: except Exception as e:
return error_response(message=f"获取LightPanel脚本失败: {str(e)}", code=500) return error_response(message=f"获取LightPanel灯牌失败: {str(e)}", code=500)
@light_router.get("/answers/{user_id}", response_model=ResponseModel[Dict[str, Any]]) @light_router.get("/answers/{user_id}", response_model=ResponseModel[Dict[str, Any]])
async def get_answers_by_user(user_id: str): async def get_answers_by_user(user_id: str):
@@ -66,7 +70,7 @@ async def get_answers_by_user(user_id: str):
@light_router.get("/user-scripts/{user_id}", response_model=ResponseModel[List[Dict[str, Any]]]) @light_router.get("/user-scripts/{user_id}", response_model=ResponseModel[List[Dict[str, Any]]])
async def get_user_scripts_with_answers(user_id: str): async def get_user_scripts_with_answers(user_id: str):
""" """
获取所有LightPanel脚本数据,并根据用户的回答数据填充每个指标的值 获取所有LightPanel灯牌数据,并根据用户的回答数据填充每个指标的值
Args: Args:
user_id (str): 用户的customer_wechat_id user_id (str): 用户的customer_wechat_id
@@ -75,39 +79,48 @@ async def get_user_scripts_with_answers(user_id: str):
ResponseModel: 包含所有脚本数据及用户回答的统一响应格式 ResponseModel: 包含所有脚本数据及用户回答的统一响应格式
""" """
try: try:
# 获取所有脚本数据 # 使用服务层的公共方法获取用户脚本和答案数据
scripts = fetch_all_lightpanel_scripts() result = service_get_user_scripts_with_answers(user_id)
return success_response(data=result, message=f"成功获取用户{user_id}的灯牌数据")
# 获取用户回答数据
user_answers = get_answers_by_user_id(user_id)
# 如果没有用户回答数据,初始化为空字典
if user_answers is None:
user_answers = {}
# 构建结果数据
result = []
for script in scripts:
# 复制脚本数据但排除_id字段
script_data = {key: value for key, value in script.items() if key != "_id"}
# 获取该脚本的code
code = script_data.get("code")
# 如果用户有该脚本的回答数据,则填充;否则设置为空字符串
if code in user_answers and user_answers[code]:
answer_data = user_answers[code]
script_data["value"] = answer_data.get("value", "")
script_data["collected_at"] = answer_data.get("collected_at", "")
script_data["source"] = answer_data.get("source", "")
else:
# 如果没有对应的回答数据,设置为空字符串
script_data["value"] = ""
script_data["collected_at"] = ""
script_data["source"] = ""
result.append(script_data)
return success_response(data=result, message=f"成功获取用户{user_id}的脚本数据")
except Exception as e: except Exception as e:
return error_response(message=f"获取用户脚本数据失败: {str(e)}", code=500) return error_response(message=f"获取用户灯牌数据失败: {str(e)}", code=500)
@light_router.get("/user-scripts/stream/{user_id}")
async def stream_user_scripts(user_id: str, request: Request):
"""
SSE接口每15秒推送一次最新的用户脚本数据。
数据结构与 /user-scripts/{user_id} 完全一致。
"""
async def event_generator():
while True:
# 1. 检查客户端是否断开连接,如果断开则停止循环
if await request.is_disconnected():
print(f"用户 {user_id} 已断开连接,停止数据推送")
break
try:
# 2. 调用公共逻辑获取最新数据
result_data = service_get_user_scripts_with_answers(user_id)
# 3. 封装成统一的响应格式 (ResponseModel)
# 注意SSE传输的是字符串所以我们需要手动构建响应对象并转为JSON
response_obj = success_response(data=result_data, message=f"实时更新: 用户{user_id}的脚本数据")
# 4. 序列化为JSON字符串
# jsonable_encoder 可以处理 Pydantic 模型、datetime 等特殊类型
json_data = json.dumps(jsonable_encoder(response_obj), ensure_ascii=False)
# 5. 发送 SSE 数据格式 (格式必须是 "data: <内容>\n\n")
yield f"data: {json_data}\n\n"
except Exception as e:
# 发生错误时,也可以推送错误信息给前端,或者选择跳过本次推送
error_obj = error_response(message=f"获取数据流失败: {str(e)}", code=500)
json_error = json.dumps(jsonable_encoder(error_obj), ensure_ascii=False)
yield f"data: {json_error}\n\n"
# 6. 等待 15 秒
await asyncio.sleep(15)
return StreamingResponse(event_generator(), media_type="text/event-stream")

View File

@@ -1,5 +1,8 @@
import os import os
import logging import logging
import json
import redis
import threading
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta from datetime import datetime, timedelta
from dotenv import load_dotenv from dotenv import load_dotenv
@@ -10,11 +13,47 @@ load_dotenv()
# 配置日志 # 配置日志
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# 全局变量用于缓存数据和过期时间 if not logger.handlers:
_cached_lightpanel_scripts: Optional[List[Dict[str, Any]]] = None handler = logging.StreamHandler()
_cache_expiration_time: Optional[datetime] = None formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
_CACHE_DURATION_MINUTES = 10 # 缓存10分钟 handler.setFormatter(formatter)
logger.addHandler(handler)
logger.propagate = False # 防止重复记录
# Redis配置
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_DB = int(os.getenv("REDIS_DB", 0))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None)
# Redis缓存配置
_CACHE_DURATION_MINUTES = int(os.getenv("REDIS_CACHE_DURATION_MINUTES", 720)) # 默认缓存10分钟
# 创建Redis连接
try:
redis_client = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
db=REDIS_DB,
password=REDIS_PASSWORD,
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True
)
# 测试连接
redis_client.ping()
logger.info("成功连接到Redis服务器")
except Exception as e:
logger.error(f"无法连接到Redis服务器: {str(e)}")
redis_client = None
# 缓存配置
CACHE_KEY = "lightpanel_scripts"
_CACHE_LOCK = threading.Lock() # 用于防止缓存击穿的锁
_REFRESHING = False # 标记是否正在刷新缓存
def get_lightpanel_scripts_collection(): def get_lightpanel_scripts_collection():
""" """
@@ -30,32 +69,102 @@ def get_lightpanel_scripts_collection():
def is_cache_valid() -> bool: def is_cache_valid() -> bool:
""" """
检查缓存是否有效 检查Redis缓存是否有效
Returns: Returns:
bool: 缓存有效返回True否则返回False bool: 缓存有效返回True否则返回False
""" """
global _cached_lightpanel_scripts, _cache_expiration_time global redis_client, CACHE_KEY
if _cached_lightpanel_scripts is None or _cache_expiration_time is None: if redis_client is None:
return False return False
return datetime.now() < _cache_expiration_time try:
# 检查缓存键是否存在
return redis_client.exists(CACHE_KEY) > 0
except Exception as e:
logger.error(f"检查Redis缓存有效性时出错: {str(e)}")
return False
def fetch_all_lightpanel_scripts() -> List[Dict[str, Any]]: def fetch_all_lightpanel_scripts() -> List[Dict[str, Any]]:
""" """
从MongoDB获取所有lightpanel_scripts数据带缓存机制 从MongoDB获取所有lightpanel_scripts数据Redis缓存机制
Returns: Returns:
List[Dict[str, Any]]: lightpanel_scripts集合中的所有数据 List[Dict[str, Any]]: lightpanel_scripts集合中的所有数据
""" """
global _cached_lightpanel_scripts, _cache_expiration_time global redis_client, CACHE_KEY, _CACHE_DURATION_MINUTES, _CACHE_LOCK, _REFRESHING
# 如果Redis不可用直接从数据库获取
if redis_client is None:
return _fetch_from_database()
# 检查缓存是否有效 # 检查缓存是否有效
if is_cache_valid(): if is_cache_valid():
logger.info("从缓存中获取lightpanel_scripts数据") try:
return _cached_lightpanel_scripts # 从Redis缓存中获取数据
cached_data = redis_client.get(CACHE_KEY)
if cached_data:
logger.info("从Redis缓存中获取lightpanel_scripts数据")
return json.loads(cached_data)
except Exception as e:
logger.error(f"从Redis缓存中获取数据时出错: {str(e)}")
# 使用锁防止缓存击穿
with _CACHE_LOCK:
# 双重检查缓存,可能在等待锁期间已经被其他线程更新
if is_cache_valid():
try:
cached_data = redis_client.get(CACHE_KEY)
if cached_data:
logger.info("从Redis缓存中获取lightpanel_scripts数据双重检查")
return json.loads(cached_data)
except Exception as e:
logger.error(f"从Redis缓存中获取数据时出错: {str(e)}")
# 防止多个线程同时刷新缓存
if _REFRESHING:
# 如果正在刷新,短暂等待后再次尝试从缓存获取
import time
time.sleep(0.1)
if is_cache_valid():
try:
cached_data = redis_client.get(CACHE_KEY)
if cached_data:
logger.info("从Redis缓存中获取lightpanel_scripts数据等待后")
return json.loads(cached_data)
except Exception as e:
logger.error(f"从Redis缓存中获取数据时出错: {str(e)}")
# 设置刷新标记
_REFRESHING = True
try:
# 从数据库获取数据
scripts = _fetch_from_database()
# 更新Redis缓存设置过期时间
try:
redis_client.setex(
CACHE_KEY,
timedelta(minutes=_CACHE_DURATION_MINUTES),
json.dumps(scripts, default=str)
)
logger.info(f"更新Redis缓存成功{len(scripts)}条数据")
except Exception as e:
logger.error(f"更新Redis缓存时出错: {str(e)}")
return scripts
finally:
# 清除刷新标记
_REFRESHING = False
def _fetch_from_database() -> List[Dict[str, Any]]:
"""
从MongoDB获取所有lightpanel_scripts数据私有函数
Returns:
List[Dict[str, Any]]: lightpanel_scripts集合中的所有数据
"""
try: try:
# 获取集合 # 获取集合
collection = get_lightpanel_scripts_collection() collection = get_lightpanel_scripts_collection()
@@ -68,46 +177,38 @@ def fetch_all_lightpanel_scripts() -> List[Dict[str, Any]]:
doc["_id"] = str(doc["_id"]) doc["_id"] = str(doc["_id"])
scripts.append(doc) scripts.append(doc)
# 更新缓存 logger.info(f"从数据库获取到{len(scripts)}条lightpanel_scripts数据")
_cached_lightpanel_scripts = scripts
_cache_expiration_time = datetime.now() + timedelta(minutes=_CACHE_DURATION_MINUTES)
logger.info(f"从数据库获取到{len(scripts)}条lightpanel_scripts数据并更新缓存")
return scripts return scripts
except Exception as e: except Exception as e:
logger.error(f"获取lightpanel_scripts数据时出错: {str(e)}") logger.error(f"从数据库获取lightpanel_scripts数据时出错: {str(e)}")
# 如果获取数据失败但有缓存,返回缓存数据 raise e
if _cached_lightpanel_scripts is not None:
logger.warning("返回旧的缓存数据")
return _cached_lightpanel_scripts
else:
# 没有缓存数据,抛出异常
raise e
def refresh_lightpanel_scripts_cache() -> None: def refresh_lightpanel_scripts_cache() -> None:
""" """
手动刷新lightpanel_scripts缓存 手动刷新lightpanel_scripts缓存
""" """
global _cached_lightpanel_scripts, _cache_expiration_time global redis_client, CACHE_KEY, _CACHE_DURATION_MINUTES
try: try:
# 获取集合 # 从数据库获取数据
collection = get_lightpanel_scripts_collection() scripts = _fetch_from_database()
# 从数据库获取所有数据 # 如果Redis可用更新缓存
cursor = collection.find({}) if redis_client is not None:
scripts = [] try:
for doc in cursor: redis_client.setex(
# 转换ObjectId为字符串 CACHE_KEY,
doc["_id"] = str(doc["_id"]) timedelta(minutes=_CACHE_DURATION_MINUTES),
scripts.append(doc) json.dumps(scripts, default=str)
)
# 更新缓存 logger.info(f"手动刷新Redis缓存成功{len(scripts)}条数据")
_cached_lightpanel_scripts = scripts except Exception as e:
_cache_expiration_time = datetime.now() + timedelta(minutes=_CACHE_DURATION_MINUTES) logger.error(f"手动刷新Redis缓存时出错: {str(e)}")
raise e
logger.info(f"手动刷新缓存成功,共{len(scripts)}条数据") else:
logger.warning("Redis不可用仅从数据库获取数据")
except Exception as e: except Exception as e:
logger.error(f"手动刷新lightpanel_scripts缓存时出错: {str(e)}") logger.error(f"手动刷新lightpanel_scripts缓存时出错: {str(e)}")
@@ -163,4 +264,50 @@ def get_answers_by_user_id(user_id: str) -> Optional[Dict[str, Any]]:
return {} return {}
except Exception as e: except Exception as e:
logger.error(f"根据user_id({user_id})查询answers数据时出错: {str(e)}") logger.error(f"根据user_id({user_id})查询answers数据时出错: {str(e)}")
raise e raise e
def get_user_scripts_with_answers(user_id: str) -> List[Dict[str, Any]]:
"""
获取所有LightPanel灯牌数据并根据用户的回答数据填充每个指标的值
Args:
user_id (str): 用户的customer_wechat_id
Returns:
List[Dict[str, Any]]: 包含所有脚本数据及用户回答的列表
"""
# 获取所有灯牌数据
scripts = fetch_all_lightpanel_scripts()
# 获取用户回答数据
user_answers = get_answers_by_user_id(user_id)
# 如果没有用户回答数据,初始化为空字典
if user_answers is None:
user_answers = {}
# 构建结果数据
result = []
for script in scripts:
# 复制脚本数据但排除_id字段
script_data = {key: value for key, value in script.items() if key != "_id"}
# 获取该灯牌的code
code = script_data.get("code")
# 如果用户有该灯牌的回答数据,则填充;否则设置为空字符串
if code in user_answers and user_answers[code]:
answer_data = user_answers[code]
script_data["value"] = answer_data.get("value", "")
script_data["collected_at"] = answer_data.get("collected_at", "")
script_data["source"] = answer_data.get("source", "")
else:
# 如果没有对应的回答数据,设置为空字符串
script_data["value"] = ""
script_data["collected_at"] = ""
script_data["source"] = ""
result.append(script_data)
return result