diff --git a/.env.example b/.env.example index ce9074f..a780792 100644 --- a/.env.example +++ b/.env.example @@ -8,4 +8,13 @@ API_PORT=8000 # Security SECRET_KEY=your-secret-key-here ALGORITHM=HS256 -ACCESS_TOKEN_EXPIRE_MINUTES=30 \ No newline at end of file +ACCESS_TOKEN_EXPIRE_MINUTES=30 + +# Redis configuration +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_DB=0 +REDIS_PASSWORD= + +# Redis cache configuration +REDIS_CACHE_DURATION_MINUTES=10 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 30a3999..5e65f03 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,9 +4,10 @@ sqlalchemy==2.0.23 python-dotenv==1.0.0 pydantic==2.5.0 pydantic-settings==2.1.0 -requests==2.31.0 +requests==2.3.1.0 PyJWT==2.8.0 starlette==0.27.0 anyio==3.7.1 pymongo==4.15.5 -motor==3.7.1 \ No newline at end of file +motor==3.7.1 +redis==7.1.0 \ No newline at end of file diff --git a/src/routers/light.py b/src/routers/light.py index d0ef893..8579c25 100644 --- a/src/routers/light.py +++ b/src/routers/light.py @@ -1,6 +1,10 @@ -from fastapi import APIRouter, HTTPException, Query -from typing import List, Optional -from src.services.light import fetch_all_lightpanel_scripts, get_lightpanel_script_by_code, get_answers_by_user_id +import asyncio +import json +from fastapi import APIRouter, HTTPException, Query, Request +from fastapi.responses import StreamingResponse +from fastapi.encoders import jsonable_encoder +from typing import List, Optional, Dict, Any +from src.services.light import fetch_all_lightpanel_scripts, get_lightpanel_script_by_code, get_answers_by_user_id, get_user_scripts_with_answers as service_get_user_scripts_with_answers from src.models.response import success_response, error_response, ResponseModel from typing import Dict, Any @@ -24,22 +28,22 @@ async def get_all_lightpanel_scripts(): @light_router.get("/script/{code}", response_model=ResponseModel[Dict[str, Any]]) async def get_lightpanel_script(code: str): """ - 根据code获取特定的LightPanel脚本 + 根据code获取特定的LightPanel灯牌 Args: code (str): 信息类型 Returns: - ResponseModel: 包含特定脚本数据的统一响应格式 + ResponseModel: 包含特定灯牌数据的统一响应格式 """ try: script = get_lightpanel_script_by_code(code) if script: - return success_response(data=script, message=f"成功获取code为{code}的LightPanel脚本") + return success_response(data=script, message=f"成功获取code为{code}的LightPanel灯牌") else: - return error_response(message=f"未找到code为{code}的LightPanel脚本", code=404) + return error_response(message=f"未找到code为{code}的LightPanel灯牌", code=404) except Exception as e: - return error_response(message=f"获取LightPanel脚本失败: {str(e)}", code=500) + return error_response(message=f"获取LightPanel灯牌失败: {str(e)}", code=500) @light_router.get("/answers/{user_id}", response_model=ResponseModel[Dict[str, Any]]) async def get_answers_by_user(user_id: str): @@ -66,7 +70,7 @@ async def get_answers_by_user(user_id: str): @light_router.get("/user-scripts/{user_id}", response_model=ResponseModel[List[Dict[str, Any]]]) async def get_user_scripts_with_answers(user_id: str): """ - 获取所有LightPanel脚本数据,并根据用户的回答数据填充每个指标的值 + 获取所有LightPanel灯牌数据,并根据用户的回答数据填充每个指标的值 Args: user_id (str): 用户的customer_wechat_id @@ -75,39 +79,48 @@ async def get_user_scripts_with_answers(user_id: str): ResponseModel: 包含所有脚本数据及用户回答的统一响应格式 """ try: - # 获取所有脚本数据 - scripts = fetch_all_lightpanel_scripts() - - # 获取用户回答数据 - user_answers = get_answers_by_user_id(user_id) - - # 如果没有用户回答数据,初始化为空字典 - if user_answers is None: - user_answers = {} - - # 构建结果数据 - result = [] - for script in scripts: - # 复制脚本数据,但排除_id字段 - script_data = {key: value for key, value in script.items() if key != "_id"} - - # 获取该脚本的code - code = script_data.get("code") - - # 如果用户有该脚本的回答数据,则填充;否则设置为空字符串 - if code in user_answers and user_answers[code]: - answer_data = user_answers[code] - script_data["value"] = answer_data.get("value", "") - script_data["collected_at"] = answer_data.get("collected_at", "") - script_data["source"] = answer_data.get("source", "") - else: - # 如果没有对应的回答数据,设置为空字符串 - script_data["value"] = "" - script_data["collected_at"] = "" - script_data["source"] = "" - - result.append(script_data) - - return success_response(data=result, message=f"成功获取用户{user_id}的脚本数据") + # 使用服务层的公共方法获取用户脚本和答案数据 + result = service_get_user_scripts_with_answers(user_id) + return success_response(data=result, message=f"成功获取用户{user_id}的灯牌数据") except Exception as e: - return error_response(message=f"获取用户脚本数据失败: {str(e)}", code=500) + return error_response(message=f"获取用户灯牌数据失败: {str(e)}", code=500) + + +@light_router.get("/user-scripts/stream/{user_id}") +async def stream_user_scripts(user_id: str, request: Request): + """ + SSE接口:每15秒推送一次最新的用户脚本数据。 + 数据结构与 /user-scripts/{user_id} 完全一致。 + """ + async def event_generator(): + while True: + # 1. 检查客户端是否断开连接,如果断开则停止循环 + if await request.is_disconnected(): + print(f"用户 {user_id} 已断开连接,停止数据推送") + break + + try: + # 2. 调用公共逻辑获取最新数据 + result_data = service_get_user_scripts_with_answers(user_id) + + # 3. 封装成统一的响应格式 (ResponseModel) + # 注意:SSE传输的是字符串,所以我们需要手动构建响应对象并转为JSON + response_obj = success_response(data=result_data, message=f"实时更新: 用户{user_id}的脚本数据") + + # 4. 序列化为JSON字符串 + # jsonable_encoder 可以处理 Pydantic 模型、datetime 等特殊类型 + json_data = json.dumps(jsonable_encoder(response_obj), ensure_ascii=False) + + # 5. 发送 SSE 数据格式 (格式必须是 "data: <内容>\n\n") + yield f"data: {json_data}\n\n" + + except Exception as e: + # 发生错误时,也可以推送错误信息给前端,或者选择跳过本次推送 + error_obj = error_response(message=f"获取数据流失败: {str(e)}", code=500) + json_error = json.dumps(jsonable_encoder(error_obj), ensure_ascii=False) + yield f"data: {json_error}\n\n" + + # 6. 等待 15 秒 + await asyncio.sleep(15) + + return StreamingResponse(event_generator(), media_type="text/event-stream") diff --git a/src/services/light.py b/src/services/light.py index 99d69c6..f06a211 100644 --- a/src/services/light.py +++ b/src/services/light.py @@ -1,5 +1,8 @@ import os import logging +import json +import redis +import threading from typing import List, Dict, Any, Optional from datetime import datetime, timedelta from dotenv import load_dotenv @@ -10,11 +13,47 @@ load_dotenv() # 配置日志 logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) -# 全局变量用于缓存数据和过期时间 -_cached_lightpanel_scripts: Optional[List[Dict[str, Any]]] = None -_cache_expiration_time: Optional[datetime] = None -_CACHE_DURATION_MINUTES = 10 # 缓存10分钟 +if not logger.handlers: + handler = logging.StreamHandler() + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.propagate = False # 防止重复记录 + +# Redis配置 +REDIS_HOST = os.getenv("REDIS_HOST", "localhost") +REDIS_PORT = int(os.getenv("REDIS_PORT", 6379)) +REDIS_DB = int(os.getenv("REDIS_DB", 0)) +REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None) + +# Redis缓存配置 +_CACHE_DURATION_MINUTES = int(os.getenv("REDIS_CACHE_DURATION_MINUTES", 720)) # 默认缓存10分钟 + +# 创建Redis连接 +try: + redis_client = redis.Redis( + host=REDIS_HOST, + port=REDIS_PORT, + db=REDIS_DB, + password=REDIS_PASSWORD, + decode_responses=True, + socket_connect_timeout=5, + socket_timeout=5, + retry_on_timeout=True + ) + # 测试连接 + redis_client.ping() + logger.info("成功连接到Redis服务器") +except Exception as e: + logger.error(f"无法连接到Redis服务器: {str(e)}") + redis_client = None + +# 缓存配置 +CACHE_KEY = "lightpanel_scripts" +_CACHE_LOCK = threading.Lock() # 用于防止缓存击穿的锁 +_REFRESHING = False # 标记是否正在刷新缓存 def get_lightpanel_scripts_collection(): """ @@ -30,32 +69,102 @@ def get_lightpanel_scripts_collection(): def is_cache_valid() -> bool: """ - 检查缓存是否有效 + 检查Redis缓存是否有效 Returns: bool: 缓存有效返回True,否则返回False """ - global _cached_lightpanel_scripts, _cache_expiration_time + global redis_client, CACHE_KEY - if _cached_lightpanel_scripts is None or _cache_expiration_time is None: + if redis_client is None: return False - return datetime.now() < _cache_expiration_time + try: + # 检查缓存键是否存在 + return redis_client.exists(CACHE_KEY) > 0 + except Exception as e: + logger.error(f"检查Redis缓存有效性时出错: {str(e)}") + return False def fetch_all_lightpanel_scripts() -> List[Dict[str, Any]]: """ - 从MongoDB获取所有lightpanel_scripts数据,带缓存机制 + 从MongoDB获取所有lightpanel_scripts数据,带Redis缓存机制 Returns: List[Dict[str, Any]]: lightpanel_scripts集合中的所有数据 """ - global _cached_lightpanel_scripts, _cache_expiration_time + global redis_client, CACHE_KEY, _CACHE_DURATION_MINUTES, _CACHE_LOCK, _REFRESHING + + # 如果Redis不可用,直接从数据库获取 + if redis_client is None: + return _fetch_from_database() # 检查缓存是否有效 if is_cache_valid(): - logger.info("从缓存中获取lightpanel_scripts数据") - return _cached_lightpanel_scripts + try: + # 从Redis缓存中获取数据 + cached_data = redis_client.get(CACHE_KEY) + if cached_data: + logger.info("从Redis缓存中获取lightpanel_scripts数据") + return json.loads(cached_data) + except Exception as e: + logger.error(f"从Redis缓存中获取数据时出错: {str(e)}") + # 使用锁防止缓存击穿 + with _CACHE_LOCK: + # 双重检查缓存,可能在等待锁期间已经被其他线程更新 + if is_cache_valid(): + try: + cached_data = redis_client.get(CACHE_KEY) + if cached_data: + logger.info("从Redis缓存中获取lightpanel_scripts数据(双重检查)") + return json.loads(cached_data) + except Exception as e: + logger.error(f"从Redis缓存中获取数据时出错: {str(e)}") + + # 防止多个线程同时刷新缓存 + if _REFRESHING: + # 如果正在刷新,短暂等待后再次尝试从缓存获取 + import time + time.sleep(0.1) + if is_cache_valid(): + try: + cached_data = redis_client.get(CACHE_KEY) + if cached_data: + logger.info("从Redis缓存中获取lightpanel_scripts数据(等待后)") + return json.loads(cached_data) + except Exception as e: + logger.error(f"从Redis缓存中获取数据时出错: {str(e)}") + + # 设置刷新标记 + _REFRESHING = True + try: + # 从数据库获取数据 + scripts = _fetch_from_database() + + # 更新Redis缓存,设置过期时间 + try: + redis_client.setex( + CACHE_KEY, + timedelta(minutes=_CACHE_DURATION_MINUTES), + json.dumps(scripts, default=str) + ) + logger.info(f"更新Redis缓存成功,共{len(scripts)}条数据") + except Exception as e: + logger.error(f"更新Redis缓存时出错: {str(e)}") + + return scripts + finally: + # 清除刷新标记 + _REFRESHING = False + +def _fetch_from_database() -> List[Dict[str, Any]]: + """ + 从MongoDB获取所有lightpanel_scripts数据(私有函数) + + Returns: + List[Dict[str, Any]]: lightpanel_scripts集合中的所有数据 + """ try: # 获取集合 collection = get_lightpanel_scripts_collection() @@ -68,46 +177,38 @@ def fetch_all_lightpanel_scripts() -> List[Dict[str, Any]]: doc["_id"] = str(doc["_id"]) scripts.append(doc) - # 更新缓存 - _cached_lightpanel_scripts = scripts - _cache_expiration_time = datetime.now() + timedelta(minutes=_CACHE_DURATION_MINUTES) - - logger.info(f"从数据库获取到{len(scripts)}条lightpanel_scripts数据,并更新缓存") + logger.info(f"从数据库获取到{len(scripts)}条lightpanel_scripts数据") return scripts except Exception as e: - logger.error(f"获取lightpanel_scripts数据时出错: {str(e)}") - # 如果获取数据失败但有缓存,返回缓存数据 - if _cached_lightpanel_scripts is not None: - logger.warning("返回旧的缓存数据") - return _cached_lightpanel_scripts - else: - # 没有缓存数据,抛出异常 - raise e + logger.error(f"从数据库获取lightpanel_scripts数据时出错: {str(e)}") + raise e + def refresh_lightpanel_scripts_cache() -> None: """ 手动刷新lightpanel_scripts缓存 """ - global _cached_lightpanel_scripts, _cache_expiration_time + global redis_client, CACHE_KEY, _CACHE_DURATION_MINUTES try: - # 获取集合 - collection = get_lightpanel_scripts_collection() + # 从数据库获取数据 + scripts = _fetch_from_database() - # 从数据库获取所有数据 - cursor = collection.find({}) - scripts = [] - for doc in cursor: - # 转换ObjectId为字符串 - doc["_id"] = str(doc["_id"]) - scripts.append(doc) - - # 更新缓存 - _cached_lightpanel_scripts = scripts - _cache_expiration_time = datetime.now() + timedelta(minutes=_CACHE_DURATION_MINUTES) - - logger.info(f"手动刷新缓存成功,共{len(scripts)}条数据") + # 如果Redis可用,更新缓存 + if redis_client is not None: + try: + redis_client.setex( + CACHE_KEY, + timedelta(minutes=_CACHE_DURATION_MINUTES), + json.dumps(scripts, default=str) + ) + logger.info(f"手动刷新Redis缓存成功,共{len(scripts)}条数据") + except Exception as e: + logger.error(f"手动刷新Redis缓存时出错: {str(e)}") + raise e + else: + logger.warning("Redis不可用,仅从数据库获取数据") except Exception as e: logger.error(f"手动刷新lightpanel_scripts缓存时出错: {str(e)}") @@ -163,4 +264,50 @@ def get_answers_by_user_id(user_id: str) -> Optional[Dict[str, Any]]: return {} except Exception as e: logger.error(f"根据user_id({user_id})查询answers数据时出错: {str(e)}") - raise e \ No newline at end of file + raise e + + +def get_user_scripts_with_answers(user_id: str) -> List[Dict[str, Any]]: + """ + 获取所有LightPanel灯牌数据,并根据用户的回答数据填充每个指标的值 + + Args: + user_id (str): 用户的customer_wechat_id + + Returns: + List[Dict[str, Any]]: 包含所有脚本数据及用户回答的列表 + """ + # 获取所有灯牌数据 + scripts = fetch_all_lightpanel_scripts() + + # 获取用户回答数据 + user_answers = get_answers_by_user_id(user_id) + + # 如果没有用户回答数据,初始化为空字典 + if user_answers is None: + user_answers = {} + + # 构建结果数据 + result = [] + for script in scripts: + # 复制脚本数据,但排除_id字段 + script_data = {key: value for key, value in script.items() if key != "_id"} + + # 获取该灯牌的code + code = script_data.get("code") + + # 如果用户有该灯牌的回答数据,则填充;否则设置为空字符串 + if code in user_answers and user_answers[code]: + answer_data = user_answers[code] + script_data["value"] = answer_data.get("value", "") + script_data["collected_at"] = answer_data.get("collected_at", "") + script_data["source"] = answer_data.get("source", "") + else: + # 如果没有对应的回答数据,设置为空字符串 + script_data["value"] = "" + script_data["collected_at"] = "" + script_data["source"] = "" + + result.append(script_data) + + return result \ No newline at end of file