From 6e5b59bde6bced2dbd0ed00914812c7e392c064f Mon Sep 17 00:00:00 2001 From: chenpanliang <3245129380@qq.com> Date: Tue, 16 Dec 2025 13:31:04 +0800 Subject: [PATCH] =?UTF-8?q?feat(light):=20=E6=B7=BB=E5=8A=A0=E4=BF=A1?= =?UTF-8?q?=E6=81=AF=E7=81=AF=E7=89=8C=E5=8A=9F=E8=83=BD=E5=8F=8A=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E7=BB=84=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增light路由模块,提供灯牌脚本和答案的API接口 - 添加MongoDB连接工具类,支持异步和同步操作 - 实现灯牌服务层,包含数据缓存逻辑 - 移除数据库日志功能,简化日志模块 - 更新依赖项,添加starlette、anyio等MongoDB相关包 --- requirements.txt | 6 +- src/db/models.py | 47 +----------- src/main.py | 4 +- src/routers/__init__.py | 3 +- src/routers/light.py | 63 +++++++++++++++ src/services/light.py | 166 ++++++++++++++++++++++++++++++++++++++++ src/utils/logger.py | 91 ++-------------------- src/utils/mongodb.py | 95 +++++++++++++++++++++++ 8 files changed, 342 insertions(+), 133 deletions(-) create mode 100644 src/routers/light.py create mode 100644 src/services/light.py create mode 100644 src/utils/mongodb.py diff --git a/requirements.txt b/requirements.txt index 3affb4b..30a3999 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,8 @@ python-dotenv==1.0.0 pydantic==2.5.0 pydantic-settings==2.1.0 requests==2.31.0 -PyJWT==2.8.0 \ No newline at end of file +PyJWT==2.8.0 +starlette==0.27.0 +anyio==3.7.1 +pymongo==4.15.5 +motor==3.7.1 \ No newline at end of file diff --git a/src/db/models.py b/src/db/models.py index 5e6f592..39cb24c 100644 --- a/src/db/models.py +++ b/src/db/models.py @@ -3,49 +3,4 @@ from sqlalchemy.ext.declarative import declarative_base from datetime import datetime # 创建基类,所有模型都需要继承此类 -Base = declarative_base() - -class Log(Base): - """ - 系统日志数据库模型 - - 该模型定义了日志表的结构,用于存储系统的各种日志信息, - 包括普通日志和异常日志。 - """ - __tablename__ = "logs" # 数据库表名 - - # 日志ID,主键,自动递增 - id = Column(BIGINT, primary_key=True, autoincrement=True, comment="日志ID,主键") - - # 日志时间戳,默认为当前UTC时间 - timestamp = Column(DateTime, default=datetime.utcnow, nullable=False, comment="日志时间戳") - - # 日志级别 (INFO, WARNING, ERROR, DEBUG, CRITICAL) - level = Column(String(20), nullable=False, comment="日志级别 (INFO, WARNING, ERROR, DEBUG, CRITICAL)") - - # 产生日志的模块名 - module = Column(String(100), nullable=False, comment="产生日志的模块") - - # 产生日志的函数名(可选) - function = Column(String(100), nullable=True, comment="产生日志的函数") - - # 日志消息内容 - message = Column(Text, nullable=False, comment="日志消息") - - # 错误堆栈信息(可选,主要用于异常日志) - traceback = Column(Text, nullable=True, comment="错误堆栈信息") - - # 请求URL(可选,用于记录HTTP请求相关信息) - request_url = Column(String(500), nullable=True, comment="请求URL") - - # 请求方法(可选,如 GET, POST 等) - request_method = Column(String(10), nullable=True, comment="请求方法 (GET, POST等)") - - # 用户代理信息(可选) - user_agent = Column(String(500), nullable=True, comment="用户代理") - - # IP地址(可选) - ip_address = Column(String(45), nullable=True, comment="IP地址") - - # 关联的用户ID(可选) - user_id = Column(BIGINT, nullable=True, comment="关联的用户ID") \ No newline at end of file +Base = declarative_base() \ No newline at end of file diff --git a/src/main.py b/src/main.py index 3d2df47..0f25152 100644 --- a/src/main.py +++ b/src/main.py @@ -6,6 +6,7 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware + APP_NAME = os.getenv("APP_NAME", "information-sign") DEBUG = os.getenv("DEBUG", "True").lower() == "true" API_PREFIX = os.getenv("API_PREFIX", "/api/v1") @@ -28,8 +29,9 @@ app.add_middleware( ) # 注册路由模块 -from src.routers import wechat_router +from src.routers import wechat_router, light_router app.include_router(wechat_router, prefix=API_PREFIX+"/wechat", tags=["企业微信"]) +app.include_router(light_router, prefix=API_PREFIX+"/light", tags=["信息灯牌"]) # 根路径端点,返回欢迎信息 @app.get(API_PREFIX+"/") diff --git a/src/routers/__init__.py b/src/routers/__init__.py index 70a2d22..4495dbc 100644 --- a/src/routers/__init__.py +++ b/src/routers/__init__.py @@ -1,5 +1,6 @@ """路由模块包""" from src.routers.wechat import wechat_router +from src.routers.light import light_router -__all__ = ["chat_router", "customer_allot_router", "wechat_router"] \ No newline at end of file +__all__ = ["wechat_router", "light_router"] diff --git a/src/routers/light.py b/src/routers/light.py new file mode 100644 index 0000000..81b760b --- /dev/null +++ b/src/routers/light.py @@ -0,0 +1,63 @@ +from fastapi import APIRouter, HTTPException, Query +from typing import List, Optional +from src.services.light import fetch_all_lightpanel_scripts, get_lightpanel_script_by_code, get_answers_by_user_id +from src.models.response import success_response, error_response, ResponseModel +from typing import Dict, Any + +# 创建路由器实例 +light_router = APIRouter() + +@light_router.get("/scripts", response_model=ResponseModel[List[Dict[str, Any]]]) +async def get_all_lightpanel_scripts(): + """ + 获取所有LightPanel脚本数据 + + Returns: + ResponseModel: 包含所有脚本数据的统一响应格式 + """ + try: + scripts = fetch_all_lightpanel_scripts() + return success_response(data=scripts, message="成功获取所有LightPanel脚本") + except Exception as e: + return error_response(message=f"获取LightPanel脚本失败: {str(e)}", code=500) + +@light_router.get("/script/{code}", response_model=ResponseModel[Dict[str, Any]]) +async def get_lightpanel_script(code: str): + """ + 根据code获取特定的LightPanel脚本 + + Args: + code (str): 信息类型 + + Returns: + ResponseModel: 包含特定脚本数据的统一响应格式 + """ + try: + script = get_lightpanel_script_by_code(code) + if script: + return success_response(data=script, message=f"成功获取code为{code}的LightPanel脚本") + else: + return error_response(message=f"未找到code为{code}的LightPanel脚本", code=404) + except Exception as e: + return error_response(message=f"获取LightPanel脚本失败: {str(e)}", code=500) + +@light_router.get("/answers/{user_id}", response_model=ResponseModel[Dict[str, Any]]) +async def get_answers_by_user(user_id: str): + """ + 根据用户ID获取用户的灯牌数据 + + Args: + user_id (str): 用户的customer_wechat_id + + Returns: + ResponseModel: 包含用户灯牌数据的统一响应格式 + """ + try: + answers = get_answers_by_user_id(user_id) + if answers is not None: + return success_response(data=answers, message=f"成功获取用户{user_id}的灯牌数据") + else: + # 未找到数据时返回空字典而不是空字符串 + return success_response(data={}, message=f"未找到用户{user_id}的灯牌数据") + except Exception as e: + return error_response(message=f"获取用户灯牌数据失败: {str(e)}", code=500) diff --git a/src/services/light.py b/src/services/light.py new file mode 100644 index 0000000..99d69c6 --- /dev/null +++ b/src/services/light.py @@ -0,0 +1,166 @@ +import os +import logging +from typing import List, Dict, Any, Optional +from datetime import datetime, timedelta +from dotenv import load_dotenv +from src.utils.mongodb import get_mongodb_sync_db + +# 加载环境变量 +load_dotenv() + +# 配置日志 +logger = logging.getLogger(__name__) + +# 全局变量用于缓存数据和过期时间 +_cached_lightpanel_scripts: Optional[List[Dict[str, Any]]] = None +_cache_expiration_time: Optional[datetime] = None +_CACHE_DURATION_MINUTES = 10 # 缓存10分钟 + +def get_lightpanel_scripts_collection(): + """ + 获取lightpanel_scripts集合 + + Returns: + Collection: MongoDB集合对象 + """ + db = get_mongodb_sync_db() + db_name = os.getenv("MONGODB_DATABASE", "TriCore") + collection = db["lightpanle_scripts"] # 注意:根据用户提供的信息,集合名为lightpanle_scripts(可能是个拼写错误) + return collection + +def is_cache_valid() -> bool: + """ + 检查缓存是否有效 + + Returns: + bool: 缓存有效返回True,否则返回False + """ + global _cached_lightpanel_scripts, _cache_expiration_time + + if _cached_lightpanel_scripts is None or _cache_expiration_time is None: + return False + + return datetime.now() < _cache_expiration_time + +def fetch_all_lightpanel_scripts() -> List[Dict[str, Any]]: + """ + 从MongoDB获取所有lightpanel_scripts数据,带缓存机制 + + Returns: + List[Dict[str, Any]]: lightpanel_scripts集合中的所有数据 + """ + global _cached_lightpanel_scripts, _cache_expiration_time + + # 检查缓存是否有效 + if is_cache_valid(): + logger.info("从缓存中获取lightpanel_scripts数据") + return _cached_lightpanel_scripts + + try: + # 获取集合 + collection = get_lightpanel_scripts_collection() + + # 从数据库获取所有数据 + cursor = collection.find({}) + scripts = [] + for doc in cursor: + # 转换ObjectId为字符串 + doc["_id"] = str(doc["_id"]) + scripts.append(doc) + + # 更新缓存 + _cached_lightpanel_scripts = scripts + _cache_expiration_time = datetime.now() + timedelta(minutes=_CACHE_DURATION_MINUTES) + + logger.info(f"从数据库获取到{len(scripts)}条lightpanel_scripts数据,并更新缓存") + return scripts + + except Exception as e: + logger.error(f"获取lightpanel_scripts数据时出错: {str(e)}") + # 如果获取数据失败但有缓存,返回缓存数据 + if _cached_lightpanel_scripts is not None: + logger.warning("返回旧的缓存数据") + return _cached_lightpanel_scripts + else: + # 没有缓存数据,抛出异常 + raise e + +def refresh_lightpanel_scripts_cache() -> None: + """ + 手动刷新lightpanel_scripts缓存 + """ + global _cached_lightpanel_scripts, _cache_expiration_time + + try: + # 获取集合 + collection = get_lightpanel_scripts_collection() + + # 从数据库获取所有数据 + cursor = collection.find({}) + scripts = [] + for doc in cursor: + # 转换ObjectId为字符串 + doc["_id"] = str(doc["_id"]) + scripts.append(doc) + + # 更新缓存 + _cached_lightpanel_scripts = scripts + _cache_expiration_time = datetime.now() + timedelta(minutes=_CACHE_DURATION_MINUTES) + + logger.info(f"手动刷新缓存成功,共{len(scripts)}条数据") + + except Exception as e: + logger.error(f"手动刷新lightpanel_scripts缓存时出错: {str(e)}") + raise e + +def get_lightpanel_script_by_code(code: str) -> Optional[Dict[str, Any]]: + """ + 根据code获取特定的lightpanel_script + + Args: + code (str): 信息类型 + + Returns: + Optional[Dict[str, Any]]: 匹配的脚本数据,如果没有找到返回None + """ + scripts = fetch_all_lightpanel_scripts() + for script in scripts: + if script.get("code") == code: + return script + return None + + +def get_lightpanle_answers_collection(): + """ + 获取lightpanle_answers集合 + + Returns: + Collection: MongoDB集合对象 + """ + db = get_mongodb_sync_db() + db_name = os.getenv("MONGODB_DATABASE", "TriCore") + collection = db["lightpanle_answers"] + return collection + +def get_answers_by_user_id(user_id: str) -> Optional[Dict[str, Any]]: + """ + 根据user_id(即customer_wechat_id)查询lightpanle_answers集合,返回对应的answers数据 + + Args: + user_id (str): 用户的customer_wechat_id + + Returns: + Optional[Dict[str, Any]]: 包含answers字段的字典,若未找到则返回空字典 + """ + try: + collection = get_lightpanle_answers_collection() + doc = collection.find_one({"customer_wechat_id": user_id}) + if doc: + # 转换ObjectId为字符串 + doc["_id"] = str(doc["_id"]) + return doc.get("answers") + # 未找到数据时返回空字典而不是None + return {} + except Exception as e: + logger.error(f"根据user_id({user_id})查询answers数据时出错: {str(e)}") + raise e \ No newline at end of file diff --git a/src/utils/logger.py b/src/utils/logger.py index d1be55a..48e440f 100644 --- a/src/utils/logger.py +++ b/src/utils/logger.py @@ -3,14 +3,10 @@ import os import logging import traceback from typing import Optional -from datetime import datetime # 将项目根目录添加到 Python 路径中,确保可以正确导入项目模块 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) -from src.db.database import get_db -from src.db.models import Log - # 配置基础日志设置 logging.basicConfig( level=logging.INFO, @@ -20,61 +16,6 @@ logging.basicConfig( # 创建模块级日志记录器 logger = logging.getLogger(__name__) -def log_to_database( - level: str, - message: str, - module: str, - function: Optional[str] = None, - traceback_info: Optional[str] = None, - request_url: Optional[str] = None, - request_method: Optional[str] = None, - user_agent: Optional[str] = None, - ip_address: Optional[str] = None, - user_id: Optional[int] = None -): - """ - 将日志信息保存到数据库 - - 参数: - - level: 日志级别 (INFO, WARNING, ERROR, DEBUG) - - message: 日志消息内容 - - module: 产生日志的模块名 - - function: 产生日志的函数名(可选) - - traceback_info: 异常堆栈信息(可选) - - request_url: 请求URL(可选) - - request_method: 请求方法(可选) - - user_agent: 用户代理信息(可选) - - ip_address: IP地址(可选) - - user_id: 用户ID(可选) - """ - try: - # 获取数据库会话 - db_generator = get_db() - db = next(db_generator) - - # 创建日志条目对象 - log_entry = Log( - level=level, - message=message, - module=module, - function=function, - traceback=traceback_info, - request_url=request_url, - request_method=request_method, - user_agent=user_agent, - ip_address=ip_address, - user_id=user_id - ) - - # 保存到数据库 - db.add(log_entry) - db.commit() - db.refresh(log_entry) - db.close() - except Exception as e: - # 如果数据库记录失败,至少打印到控制台 - logger.error(f"Failed to log to database: {str(e)}") - def capture_exception( exception: Exception, module: str, @@ -86,7 +27,7 @@ def capture_exception( user_id: Optional[int] = None ): """ - 捕获并记录异常信息 + 捕获并记录异常信息到控制台 参数: - exception: 捕获到的异常对象 @@ -101,26 +42,12 @@ def capture_exception( # 获取异常堆栈信息 tb_str = ''.join(traceback.format_exception(type(exception), exception, exception.__traceback__)) - # 记录错误日志到数据库和控制台 - log_to_database( - level="ERROR", - message=str(exception), - module=module, - function=function, - traceback_info=tb_str, - request_url=request_url, - request_method=request_method, - user_agent=user_agent, - ip_address=ip_address, - user_id=user_id - ) - - # 同时打印到控制台 + # 打印到控制台 logger.error(f"[{module}] {str(exception)}", exc_info=True) def info(message: str, module: str, function: Optional[str] = None): """ - 记录INFO级别日志 + 记录INFO级别日志到控制台 参数: - message: 日志消息内容 @@ -128,11 +55,10 @@ def info(message: str, module: str, function: Optional[str] = None): - function: 产生日志的函数名(可选) """ logger.info(f"[{module}] {message}") - log_to_database("INFO", message, module, function) def warning(message: str, module: str, function: Optional[str] = None): """ - 记录WARNING级别日志 + 记录WARNING级别日志到控制台 参数: - message: 日志消息内容 @@ -140,11 +66,10 @@ def warning(message: str, module: str, function: Optional[str] = None): - function: 产生日志的函数名(可选) """ logger.warning(f"[{module}] {message}") - log_to_database("WARNING", message, module, function) def error(message: str, module: str, function: Optional[str] = None): """ - 记录ERROR级别日志 + 记录ERROR级别日志到控制台 参数: - message: 日志消息内容 @@ -152,16 +77,14 @@ def error(message: str, module: str, function: Optional[str] = None): - function: 产生日志的函数名(可选) """ logger.error(f"[{module}] {message}") - log_to_database("ERROR", message, module, function) def debug(message: str, module: str, function: Optional[str] = None): """ - 记录DEBUG级别日志 + 记录DEBUG级别日志到控制台 参数: - message: 日志消息内容 - module: 产生日志的模块名 - function: 产生日志的函数名(可选) """ - logger.debug(f"[{module}] {message}") - log_to_database("DEBUG", message, module, function) \ No newline at end of file + logger.debug(f"[{module}] {message}") \ No newline at end of file diff --git a/src/utils/mongodb.py b/src/utils/mongodb.py new file mode 100644 index 0000000..24f8ce5 --- /dev/null +++ b/src/utils/mongodb.py @@ -0,0 +1,95 @@ +import os +import asyncio +import logging +from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase +from pymongo import MongoClient + +# 配置日志 +logger = logging.getLogger(__name__) + +# 从环境变量获取MongoDB配置 +MONGODB_URI = os.getenv("MONGODB_URI", "mongodb://localhost:27017") +MONGODB_DATABASE = os.getenv("MONGODB_DATABASE", "default_db") + +class MongoDB: + """MongoDB连接管理类""" + _client: AsyncIOMotorClient = None + _sync_client: MongoClient = None + _db: AsyncIOMotorDatabase = None + _initialized: bool = False + _sync_initialized: bool = False + + @classmethod + async def initialize(cls): + """初始化MongoDB连接(异步)""" + if cls._initialized: + return + + try: + cls._client = AsyncIOMotorClient(MONGODB_URI) + cls._db = cls._client[MONGODB_DATABASE] + + # 验证连接 + await cls._client.server_info() + logger.info("MongoDB异步连接成功") + cls._initialized = True + except Exception as e: + logger.error(f"MongoDB异步连接失败: {str(e)}") + raise + + @classmethod + def initialize_sync(cls): + """初始化MongoDB连接(同步)""" + if cls._sync_initialized: + return + + try: + cls._sync_client = MongoClient(MONGODB_URI) + # 验证连接 + cls._sync_client.server_info() + logger.info("MongoDB同步连接成功") + cls._sync_initialized = True + except Exception as e: + logger.error(f"MongoDB同步连接失败: {str(e)}") + raise + + @classmethod + def get_db(cls) -> AsyncIOMotorDatabase: + """获取异步数据库实例""" + if not cls._initialized: + raise RuntimeError("MongoDB尚未初始化,请先调用initialize()") + return cls._db + + @classmethod + def get_sync_db(cls): + """获取同步数据库实例""" + if not cls._sync_initialized: + cls.initialize_sync() + return cls._sync_client[MONGODB_DATABASE] + + @classmethod + async def close(cls): + """关闭MongoDB连接""" + if cls._client: + cls._client.close() + await asyncio.sleep(0.1) # 等待连接关闭 + cls._initialized = False + logger.info("MongoDB异步连接已关闭") + + if cls._sync_client: + cls._sync_client.close() + cls._sync_initialized = False + logger.info("MongoDB同步连接已关闭") + +# 创建全局MongoDB实例 +mongodb = MongoDB() + +async def get_mongodb_db() -> AsyncIOMotorDatabase: + """获取MongoDB数据库实例(用于依赖注入)""" + if not MongoDB._initialized: + await MongoDB.initialize() + return MongoDB.get_db() + +def get_mongodb_sync_db(): + """获取MongoDB同步数据库实例(用于依赖注入)""" + return MongoDB.get_sync_db() \ No newline at end of file