feat(light): 添加信息灯牌功能及相关组件

- 新增light路由模块,提供灯牌脚本和答案的API接口
- 添加MongoDB连接工具类,支持异步和同步操作
- 实现灯牌服务层,包含数据缓存逻辑
- 移除数据库日志功能,简化日志模块
- 更新依赖项,添加starlette、anyio等MongoDB相关包
This commit is contained in:
2025-12-16 13:31:04 +08:00
parent 94b6cc8f8d
commit 6e5b59bde6
8 changed files with 342 additions and 133 deletions

View File

@@ -6,3 +6,7 @@ pydantic==2.5.0
pydantic-settings==2.1.0 pydantic-settings==2.1.0
requests==2.31.0 requests==2.31.0
PyJWT==2.8.0 PyJWT==2.8.0
starlette==0.27.0
anyio==3.7.1
pymongo==4.15.5
motor==3.7.1

View File

@@ -4,48 +4,3 @@ from datetime import datetime
# 创建基类,所有模型都需要继承此类 # 创建基类,所有模型都需要继承此类
Base = declarative_base() Base = declarative_base()
class Log(Base):
"""
系统日志数据库模型
该模型定义了日志表的结构,用于存储系统的各种日志信息,
包括普通日志和异常日志。
"""
__tablename__ = "logs" # 数据库表名
# 日志ID主键自动递增
id = Column(BIGINT, primary_key=True, autoincrement=True, comment="日志ID主键")
# 日志时间戳默认为当前UTC时间
timestamp = Column(DateTime, default=datetime.utcnow, nullable=False, comment="日志时间戳")
# 日志级别 (INFO, WARNING, ERROR, DEBUG, CRITICAL)
level = Column(String(20), nullable=False, comment="日志级别 (INFO, WARNING, ERROR, DEBUG, CRITICAL)")
# 产生日志的模块名
module = Column(String(100), nullable=False, comment="产生日志的模块")
# 产生日志的函数名(可选)
function = Column(String(100), nullable=True, comment="产生日志的函数")
# 日志消息内容
message = Column(Text, nullable=False, comment="日志消息")
# 错误堆栈信息(可选,主要用于异常日志)
traceback = Column(Text, nullable=True, comment="错误堆栈信息")
# 请求URL可选用于记录HTTP请求相关信息
request_url = Column(String(500), nullable=True, comment="请求URL")
# 请求方法(可选,如 GET, POST 等)
request_method = Column(String(10), nullable=True, comment="请求方法 (GET, POST等)")
# 用户代理信息(可选)
user_agent = Column(String(500), nullable=True, comment="用户代理")
# IP地址可选
ip_address = Column(String(45), nullable=True, comment="IP地址")
# 关联的用户ID可选
user_id = Column(BIGINT, nullable=True, comment="关联的用户ID")

View File

@@ -6,6 +6,7 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
APP_NAME = os.getenv("APP_NAME", "information-sign") APP_NAME = os.getenv("APP_NAME", "information-sign")
DEBUG = os.getenv("DEBUG", "True").lower() == "true" DEBUG = os.getenv("DEBUG", "True").lower() == "true"
API_PREFIX = os.getenv("API_PREFIX", "/api/v1") API_PREFIX = os.getenv("API_PREFIX", "/api/v1")
@@ -28,8 +29,9 @@ app.add_middleware(
) )
# 注册路由模块 # 注册路由模块
from src.routers import wechat_router from src.routers import wechat_router, light_router
app.include_router(wechat_router, prefix=API_PREFIX+"/wechat", tags=["企业微信"]) app.include_router(wechat_router, prefix=API_PREFIX+"/wechat", tags=["企业微信"])
app.include_router(light_router, prefix=API_PREFIX+"/light", tags=["信息灯牌"])
# 根路径端点,返回欢迎信息 # 根路径端点,返回欢迎信息
@app.get(API_PREFIX+"/") @app.get(API_PREFIX+"/")

View File

@@ -1,5 +1,6 @@
"""路由模块包""" """路由模块包"""
from src.routers.wechat import wechat_router from src.routers.wechat import wechat_router
from src.routers.light import light_router
__all__ = ["chat_router", "customer_allot_router", "wechat_router"] __all__ = ["wechat_router", "light_router"]

63
src/routers/light.py Normal file
View File

@@ -0,0 +1,63 @@
from fastapi import APIRouter, HTTPException, Query
from typing import List, Optional
from src.services.light import fetch_all_lightpanel_scripts, get_lightpanel_script_by_code, get_answers_by_user_id
from src.models.response import success_response, error_response, ResponseModel
from typing import Dict, Any
# 创建路由器实例
light_router = APIRouter()
@light_router.get("/scripts", response_model=ResponseModel[List[Dict[str, Any]]])
async def get_all_lightpanel_scripts():
"""
获取所有LightPanel脚本数据
Returns:
ResponseModel: 包含所有脚本数据的统一响应格式
"""
try:
scripts = fetch_all_lightpanel_scripts()
return success_response(data=scripts, message="成功获取所有LightPanel脚本")
except Exception as e:
return error_response(message=f"获取LightPanel脚本失败: {str(e)}", code=500)
@light_router.get("/script/{code}", response_model=ResponseModel[Dict[str, Any]])
async def get_lightpanel_script(code: str):
"""
根据code获取特定的LightPanel脚本
Args:
code (str): 信息类型
Returns:
ResponseModel: 包含特定脚本数据的统一响应格式
"""
try:
script = get_lightpanel_script_by_code(code)
if script:
return success_response(data=script, message=f"成功获取code为{code}的LightPanel脚本")
else:
return error_response(message=f"未找到code为{code}的LightPanel脚本", code=404)
except Exception as e:
return error_response(message=f"获取LightPanel脚本失败: {str(e)}", code=500)
@light_router.get("/answers/{user_id}", response_model=ResponseModel[Dict[str, Any]])
async def get_answers_by_user(user_id: str):
"""
根据用户ID获取用户的灯牌数据
Args:
user_id (str): 用户的customer_wechat_id
Returns:
ResponseModel: 包含用户灯牌数据的统一响应格式
"""
try:
answers = get_answers_by_user_id(user_id)
if answers is not None:
return success_response(data=answers, message=f"成功获取用户{user_id}的灯牌数据")
else:
# 未找到数据时返回空字典而不是空字符串
return success_response(data={}, message=f"未找到用户{user_id}的灯牌数据")
except Exception as e:
return error_response(message=f"获取用户灯牌数据失败: {str(e)}", code=500)

166
src/services/light.py Normal file
View File

@@ -0,0 +1,166 @@
import os
import logging
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
from dotenv import load_dotenv
from src.utils.mongodb import get_mongodb_sync_db
# 加载环境变量
load_dotenv()
# 配置日志
logger = logging.getLogger(__name__)
# 全局变量用于缓存数据和过期时间
_cached_lightpanel_scripts: Optional[List[Dict[str, Any]]] = None
_cache_expiration_time: Optional[datetime] = None
_CACHE_DURATION_MINUTES = 10 # 缓存10分钟
def get_lightpanel_scripts_collection():
"""
获取lightpanel_scripts集合
Returns:
Collection: MongoDB集合对象
"""
db = get_mongodb_sync_db()
db_name = os.getenv("MONGODB_DATABASE", "TriCore")
collection = db["lightpanle_scripts"] # 注意根据用户提供的信息集合名为lightpanle_scripts可能是个拼写错误
return collection
def is_cache_valid() -> bool:
"""
检查缓存是否有效
Returns:
bool: 缓存有效返回True否则返回False
"""
global _cached_lightpanel_scripts, _cache_expiration_time
if _cached_lightpanel_scripts is None or _cache_expiration_time is None:
return False
return datetime.now() < _cache_expiration_time
def fetch_all_lightpanel_scripts() -> List[Dict[str, Any]]:
"""
从MongoDB获取所有lightpanel_scripts数据带缓存机制
Returns:
List[Dict[str, Any]]: lightpanel_scripts集合中的所有数据
"""
global _cached_lightpanel_scripts, _cache_expiration_time
# 检查缓存是否有效
if is_cache_valid():
logger.info("从缓存中获取lightpanel_scripts数据")
return _cached_lightpanel_scripts
try:
# 获取集合
collection = get_lightpanel_scripts_collection()
# 从数据库获取所有数据
cursor = collection.find({})
scripts = []
for doc in cursor:
# 转换ObjectId为字符串
doc["_id"] = str(doc["_id"])
scripts.append(doc)
# 更新缓存
_cached_lightpanel_scripts = scripts
_cache_expiration_time = datetime.now() + timedelta(minutes=_CACHE_DURATION_MINUTES)
logger.info(f"从数据库获取到{len(scripts)}条lightpanel_scripts数据并更新缓存")
return scripts
except Exception as e:
logger.error(f"获取lightpanel_scripts数据时出错: {str(e)}")
# 如果获取数据失败但有缓存,返回缓存数据
if _cached_lightpanel_scripts is not None:
logger.warning("返回旧的缓存数据")
return _cached_lightpanel_scripts
else:
# 没有缓存数据,抛出异常
raise e
def refresh_lightpanel_scripts_cache() -> None:
"""
手动刷新lightpanel_scripts缓存
"""
global _cached_lightpanel_scripts, _cache_expiration_time
try:
# 获取集合
collection = get_lightpanel_scripts_collection()
# 从数据库获取所有数据
cursor = collection.find({})
scripts = []
for doc in cursor:
# 转换ObjectId为字符串
doc["_id"] = str(doc["_id"])
scripts.append(doc)
# 更新缓存
_cached_lightpanel_scripts = scripts
_cache_expiration_time = datetime.now() + timedelta(minutes=_CACHE_DURATION_MINUTES)
logger.info(f"手动刷新缓存成功,共{len(scripts)}条数据")
except Exception as e:
logger.error(f"手动刷新lightpanel_scripts缓存时出错: {str(e)}")
raise e
def get_lightpanel_script_by_code(code: str) -> Optional[Dict[str, Any]]:
"""
根据code获取特定的lightpanel_script
Args:
code (str): 信息类型
Returns:
Optional[Dict[str, Any]]: 匹配的脚本数据如果没有找到返回None
"""
scripts = fetch_all_lightpanel_scripts()
for script in scripts:
if script.get("code") == code:
return script
return None
def get_lightpanle_answers_collection():
"""
获取lightpanle_answers集合
Returns:
Collection: MongoDB集合对象
"""
db = get_mongodb_sync_db()
db_name = os.getenv("MONGODB_DATABASE", "TriCore")
collection = db["lightpanle_answers"]
return collection
def get_answers_by_user_id(user_id: str) -> Optional[Dict[str, Any]]:
"""
根据user_id即customer_wechat_id查询lightpanle_answers集合返回对应的answers数据
Args:
user_id (str): 用户的customer_wechat_id
Returns:
Optional[Dict[str, Any]]: 包含answers字段的字典若未找到则返回空字典
"""
try:
collection = get_lightpanle_answers_collection()
doc = collection.find_one({"customer_wechat_id": user_id})
if doc:
# 转换ObjectId为字符串
doc["_id"] = str(doc["_id"])
return doc.get("answers")
# 未找到数据时返回空字典而不是None
return {}
except Exception as e:
logger.error(f"根据user_id({user_id})查询answers数据时出错: {str(e)}")
raise e

View File

@@ -3,14 +3,10 @@ import os
import logging import logging
import traceback import traceback
from typing import Optional from typing import Optional
from datetime import datetime
# 将项目根目录添加到 Python 路径中,确保可以正确导入项目模块 # 将项目根目录添加到 Python 路径中,确保可以正确导入项目模块
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from src.db.database import get_db
from src.db.models import Log
# 配置基础日志设置 # 配置基础日志设置
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@@ -20,61 +16,6 @@ logging.basicConfig(
# 创建模块级日志记录器 # 创建模块级日志记录器
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def log_to_database(
level: str,
message: str,
module: str,
function: Optional[str] = None,
traceback_info: Optional[str] = None,
request_url: Optional[str] = None,
request_method: Optional[str] = None,
user_agent: Optional[str] = None,
ip_address: Optional[str] = None,
user_id: Optional[int] = None
):
"""
将日志信息保存到数据库
参数:
- level: 日志级别 (INFO, WARNING, ERROR, DEBUG)
- message: 日志消息内容
- module: 产生日志的模块名
- function: 产生日志的函数名(可选)
- traceback_info: 异常堆栈信息(可选)
- request_url: 请求URL可选
- request_method: 请求方法(可选)
- user_agent: 用户代理信息(可选)
- ip_address: IP地址可选
- user_id: 用户ID可选
"""
try:
# 获取数据库会话
db_generator = get_db()
db = next(db_generator)
# 创建日志条目对象
log_entry = Log(
level=level,
message=message,
module=module,
function=function,
traceback=traceback_info,
request_url=request_url,
request_method=request_method,
user_agent=user_agent,
ip_address=ip_address,
user_id=user_id
)
# 保存到数据库
db.add(log_entry)
db.commit()
db.refresh(log_entry)
db.close()
except Exception as e:
# 如果数据库记录失败,至少打印到控制台
logger.error(f"Failed to log to database: {str(e)}")
def capture_exception( def capture_exception(
exception: Exception, exception: Exception,
module: str, module: str,
@@ -86,7 +27,7 @@ def capture_exception(
user_id: Optional[int] = None user_id: Optional[int] = None
): ):
""" """
捕获并记录异常信息 捕获并记录异常信息到控制台
参数: 参数:
- exception: 捕获到的异常对象 - exception: 捕获到的异常对象
@@ -101,26 +42,12 @@ def capture_exception(
# 获取异常堆栈信息 # 获取异常堆栈信息
tb_str = ''.join(traceback.format_exception(type(exception), exception, exception.__traceback__)) tb_str = ''.join(traceback.format_exception(type(exception), exception, exception.__traceback__))
# 记录错误日志到数据库和控制台 # 打印到控制台
log_to_database(
level="ERROR",
message=str(exception),
module=module,
function=function,
traceback_info=tb_str,
request_url=request_url,
request_method=request_method,
user_agent=user_agent,
ip_address=ip_address,
user_id=user_id
)
# 同时打印到控制台
logger.error(f"[{module}] {str(exception)}", exc_info=True) logger.error(f"[{module}] {str(exception)}", exc_info=True)
def info(message: str, module: str, function: Optional[str] = None): def info(message: str, module: str, function: Optional[str] = None):
""" """
记录INFO级别日志 记录INFO级别日志到控制台
参数: 参数:
- message: 日志消息内容 - message: 日志消息内容
@@ -128,11 +55,10 @@ def info(message: str, module: str, function: Optional[str] = None):
- function: 产生日志的函数名(可选) - function: 产生日志的函数名(可选)
""" """
logger.info(f"[{module}] {message}") logger.info(f"[{module}] {message}")
log_to_database("INFO", message, module, function)
def warning(message: str, module: str, function: Optional[str] = None): def warning(message: str, module: str, function: Optional[str] = None):
""" """
记录WARNING级别日志 记录WARNING级别日志到控制台
参数: 参数:
- message: 日志消息内容 - message: 日志消息内容
@@ -140,11 +66,10 @@ def warning(message: str, module: str, function: Optional[str] = None):
- function: 产生日志的函数名(可选) - function: 产生日志的函数名(可选)
""" """
logger.warning(f"[{module}] {message}") logger.warning(f"[{module}] {message}")
log_to_database("WARNING", message, module, function)
def error(message: str, module: str, function: Optional[str] = None): def error(message: str, module: str, function: Optional[str] = None):
""" """
记录ERROR级别日志 记录ERROR级别日志到控制台
参数: 参数:
- message: 日志消息内容 - message: 日志消息内容
@@ -152,11 +77,10 @@ def error(message: str, module: str, function: Optional[str] = None):
- function: 产生日志的函数名(可选) - function: 产生日志的函数名(可选)
""" """
logger.error(f"[{module}] {message}") logger.error(f"[{module}] {message}")
log_to_database("ERROR", message, module, function)
def debug(message: str, module: str, function: Optional[str] = None): def debug(message: str, module: str, function: Optional[str] = None):
""" """
记录DEBUG级别日志 记录DEBUG级别日志到控制台
参数: 参数:
- message: 日志消息内容 - message: 日志消息内容
@@ -164,4 +88,3 @@ def debug(message: str, module: str, function: Optional[str] = None):
- function: 产生日志的函数名(可选) - function: 产生日志的函数名(可选)
""" """
logger.debug(f"[{module}] {message}") logger.debug(f"[{module}] {message}")
log_to_database("DEBUG", message, module, function)

95
src/utils/mongodb.py Normal file
View File

@@ -0,0 +1,95 @@
import os
import asyncio
import logging
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from pymongo import MongoClient
# 配置日志
logger = logging.getLogger(__name__)
# 从环境变量获取MongoDB配置
MONGODB_URI = os.getenv("MONGODB_URI", "mongodb://localhost:27017")
MONGODB_DATABASE = os.getenv("MONGODB_DATABASE", "default_db")
class MongoDB:
"""MongoDB连接管理类"""
_client: AsyncIOMotorClient = None
_sync_client: MongoClient = None
_db: AsyncIOMotorDatabase = None
_initialized: bool = False
_sync_initialized: bool = False
@classmethod
async def initialize(cls):
"""初始化MongoDB连接异步"""
if cls._initialized:
return
try:
cls._client = AsyncIOMotorClient(MONGODB_URI)
cls._db = cls._client[MONGODB_DATABASE]
# 验证连接
await cls._client.server_info()
logger.info("MongoDB异步连接成功")
cls._initialized = True
except Exception as e:
logger.error(f"MongoDB异步连接失败: {str(e)}")
raise
@classmethod
def initialize_sync(cls):
"""初始化MongoDB连接同步"""
if cls._sync_initialized:
return
try:
cls._sync_client = MongoClient(MONGODB_URI)
# 验证连接
cls._sync_client.server_info()
logger.info("MongoDB同步连接成功")
cls._sync_initialized = True
except Exception as e:
logger.error(f"MongoDB同步连接失败: {str(e)}")
raise
@classmethod
def get_db(cls) -> AsyncIOMotorDatabase:
"""获取异步数据库实例"""
if not cls._initialized:
raise RuntimeError("MongoDB尚未初始化请先调用initialize()")
return cls._db
@classmethod
def get_sync_db(cls):
"""获取同步数据库实例"""
if not cls._sync_initialized:
cls.initialize_sync()
return cls._sync_client[MONGODB_DATABASE]
@classmethod
async def close(cls):
"""关闭MongoDB连接"""
if cls._client:
cls._client.close()
await asyncio.sleep(0.1) # 等待连接关闭
cls._initialized = False
logger.info("MongoDB异步连接已关闭")
if cls._sync_client:
cls._sync_client.close()
cls._sync_initialized = False
logger.info("MongoDB同步连接已关闭")
# 创建全局MongoDB实例
mongodb = MongoDB()
async def get_mongodb_db() -> AsyncIOMotorDatabase:
"""获取MongoDB数据库实例用于依赖注入"""
if not MongoDB._initialized:
await MongoDB.initialize()
return MongoDB.get_db()
def get_mongodb_sync_db():
"""获取MongoDB同步数据库实例用于依赖注入"""
return MongoDB.get_sync_db()