refactor(api): 重构FastAPI主应用入口,拆分业务模块路由

- 移除集中式招聘者账号路由定义,改为模块化拆分到routes目录
- 按业务拆分路由;包括recruiter(招聘者账号)、scheduler(定时任务)、system(系统接口)三部分
- 新增routes/__init__.py管理各子路由统一导入与导出
- routes/recruiter.py实现招聘者账号的完整CRUD及自动注册功能
- routes/scheduler.py实现定时任务的查询、控制与配置功能
- routes/system.py实现健康检查、API根路径和状态接口
- schemas.py集中定义API请求和响应模型,支持各业务模块使用
- account_sync_job.py完善职位同步逻辑,支持删除过期职位并返回新增与删除数量
- overall 优化应用结构,提升代码可维护性与模块化程度
This commit is contained in:
2026-03-24 17:34:54 +08:00
parent 49cd8682d0
commit 91a4094881
9 changed files with 1240 additions and 266 deletions

View File

@@ -1,73 +1,29 @@
"""FastAPI routes for HR Agent"""
from typing import List, Optional
from datetime import datetime
from pydantic import BaseModel, Field
"""
FastAPI主应用入口
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
按业务模块拆分路由:
- routes/recruiter.py: 招聘者账号管理
- routes/scheduler.py: 定时任务管理
- routes/system.py: 系统接口
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from ..domain.candidate import CandidateSource
from ..domain.recruiter import Recruiter, RecruiterStatus
from ..service.recruiter_service import RecruiterService
from ..mapper.recruiter_mapper import RecruiterMapper
from ..config.settings import get_settings
from .routes import recruiter_router, scheduler_router
from .routes.system import router as system_router
# ============== Pydantic Schemas ==============
class RecruiterCreate(BaseModel):
"""创建招聘者账号请求"""
name: str = Field(..., description="账号名称/标识")
source: str = Field(default="boss", description="平台来源: boss, liepin, etc.")
wt_token: str = Field(..., description="WT Token")
class RecruiterUpdate(BaseModel):
"""更新招聘者账号请求"""
name: Optional[str] = Field(None, description="账号名称")
wt_token: Optional[str] = Field(None, description="WT Token")
status: Optional[str] = Field(None, description="状态: active, inactive, expired")
class RecruiterResponse(BaseModel):
"""招聘者账号响应"""
id: str
name: str
source: str
wt_token: str
status: str
last_used_at: Optional[datetime] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class RecruiterListResponse(BaseModel):
"""招聘者列表响应"""
total: int
items: List[RecruiterResponse]
class APIResponse(BaseModel):
"""通用API响应"""
success: bool
message: str
data: Optional[dict] = None
# ============== FastAPI App ==============
def create_app() -> FastAPI:
"""创建FastAPI应用"""
app = FastAPI(
title="简历智能体 API",
description="HR Agent - 多平台简历爬取、AI分析、多渠道通知",
version="0.1.0"
version="0.1.0",
docs_url="/docs",
redoc_url="/redoc"
)
# CORS
# CORS配置
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
@@ -76,183 +32,15 @@ def create_app() -> FastAPI:
allow_headers=["*"],
)
# 依赖注入获取RecruiterService
def get_recruiter_service():
settings = get_settings()
mapper = RecruiterMapper(db_url=settings.db_url)
return RecruiterService(mapper=mapper)
# 注册路由
# 系统路由(根路径、健康检查等)
app.include_router(system_router)
# ============== Recruiter Routes ==============
# 招聘者账号管理路由
app.include_router(recruiter_router)
@app.get("/")
async def root():
"""API根路径"""
return {
"name": "简历智能体 API",
"version": "0.1.0",
"docs": "/docs"
}
@app.get("/health")
async def health_check():
"""健康检查"""
return {"status": "healthy"}
@app.get("/api/recruiters", response_model=RecruiterListResponse)
async def list_recruiters(
source: Optional[str] = None,
service: RecruiterService = Depends(get_recruiter_service)
):
"""
获取招聘者账号列表
Args:
source: 按平台筛选 (boss, liepin, etc.)
"""
if source:
try:
candidate_source = CandidateSource(source.lower())
recruiters = service.list_recruiters(candidate_source)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid source: {source}")
else:
recruiters = service.list_recruiters()
items = [
RecruiterResponse(
id=r.id,
name=r.name,
source=r.source.value,
wt_token=r.wt_token,
status=r.status.value,
last_used_at=r.last_used_at,
created_at=r.created_at,
updated_at=r.updated_at
)
for r in recruiters
]
return RecruiterListResponse(total=len(items), items=items)
@app.get("/api/recruiters/{recruiter_id}", response_model=RecruiterResponse)
async def get_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""获取单个招聘者账号详情"""
recruiter = service.get_recruiter(recruiter_id)
if not recruiter:
raise HTTPException(status_code=404, detail="Recruiter not found")
return RecruiterResponse(
id=recruiter.id,
name=recruiter.name,
source=recruiter.source.value,
wt_token=recruiter.wt_token,
status=recruiter.status.value,
last_used_at=recruiter.last_used_at,
created_at=recruiter.created_at,
updated_at=recruiter.updated_at
)
@app.post("/api/recruiters", response_model=RecruiterResponse)
async def create_recruiter(
data: RecruiterCreate,
service: RecruiterService = Depends(get_recruiter_service)
):
"""创建招聘者账号"""
try:
source = CandidateSource(data.source.lower())
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid source: {data.source}")
recruiter = service.add_recruiter(
name=data.name,
source=source,
wt_token=data.wt_token
)
return RecruiterResponse(
id=recruiter.id,
name=recruiter.name,
source=recruiter.source.value,
wt_token=recruiter.wt_token,
status=recruiter.status.value,
last_used_at=recruiter.last_used_at,
created_at=recruiter.created_at,
updated_at=recruiter.updated_at
)
@app.put("/api/recruiters/{recruiter_id}", response_model=RecruiterResponse)
async def update_recruiter(
recruiter_id: str,
data: RecruiterUpdate,
service: RecruiterService = Depends(get_recruiter_service)
):
"""更新招聘者账号"""
recruiter = service.get_recruiter(recruiter_id)
if not recruiter:
raise HTTPException(status_code=404, detail="Recruiter not found")
# 更新字段
if data.name:
recruiter.name = data.name
if data.wt_token:
recruiter.wt_token = data.wt_token
if data.status:
try:
recruiter.status = RecruiterStatus(data.status.lower())
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid status: {data.status}")
updated = service.mapper.save(recruiter)
return RecruiterResponse(
id=updated.id,
name=updated.name,
source=updated.source.value,
wt_token=updated.wt_token,
status=updated.status.value,
last_used_at=updated.last_used_at,
created_at=updated.created_at,
updated_at=updated.updated_at
)
@app.delete("/api/recruiters/{recruiter_id}")
async def delete_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""删除招聘者账号"""
success = service.delete_recruiter(recruiter_id)
if not success:
raise HTTPException(status_code=404, detail="Recruiter not found")
return APIResponse(success=True, message="Recruiter deleted successfully")
@app.post("/api/recruiters/{recruiter_id}/activate")
async def activate_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""启用招聘者账号"""
success = service.activate_recruiter(recruiter_id)
if not success:
raise HTTPException(status_code=404, detail="Recruiter not found")
return APIResponse(success=True, message="Recruiter activated")
@app.post("/api/recruiters/{recruiter_id}/deactivate")
async def deactivate_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""停用招聘者账号"""
success = service.deactivate_recruiter(recruiter_id)
if not success:
raise HTTPException(status_code=404, detail="Recruiter not found")
return APIResponse(success=True, message="Recruiter deactivated")
# 定时任务管理路由
app.include_router(scheduler_router)
return app

View File

@@ -0,0 +1,29 @@
"""
API路由模块
按业务模块拆分的FastAPI路由
- recruiter: 招聘者账号管理
- scheduler: 定时任务管理
- system: 系统接口
"""
from .recruiter import router as recruiter_router
try:
from .scheduler import router as scheduler_router
except ImportError:
# 如果scheduler模块有问题提供一个空的router
from fastapi import APIRouter
scheduler_router = APIRouter()
try:
from .system import router as system_router
except ImportError:
from fastapi import APIRouter
system_router = APIRouter()
__all__ = [
"recruiter_router",
"scheduler_router",
"system_router"
]

View File

@@ -0,0 +1,349 @@
"""
招聘者账号管理路由
提供招聘者账号的CRUD操作和自动注册功能
"""
from typing import List, Optional
from datetime import datetime
from fastapi import APIRouter, HTTPException, Depends
from ..schemas import (
RecruiterCreate, RecruiterRegister, RecruiterUpdate,
RecruiterResponse, RecruiterListResponse, RecruiterRegisterResponse,
RecruiterPrivilegeInfo, RecruiterSyncInfo, RecruiterSourceInfo,
APIResponse
)
from ...domain.candidate import CandidateSource
from ...domain.recruiter import Recruiter, RecruiterStatus
from ...service.recruiter_service import RecruiterService
from ...service.crawler import BossCrawler
from ...mapper.recruiter_mapper import RecruiterMapper
from ...config.settings import get_settings
router = APIRouter(prefix="/api/recruiters", tags=["招聘者账号"])
def get_recruiter_service():
"""依赖注入获取RecruiterService"""
settings = get_settings()
mapper = RecruiterMapper(db_url=settings.db_url)
return RecruiterService(mapper=mapper)
def _build_recruiter_response(recruiter: Recruiter) -> RecruiterResponse:
"""构建招聘者账号响应"""
# 构建权益信息
privilege = None
if recruiter.privilege:
privilege = RecruiterPrivilegeInfo(
vip_level=recruiter.privilege.vip_level,
vip_status=recruiter.privilege.vip_status,
vip_expire_at=recruiter.privilege.vip_expire_at,
resume_view_count=recruiter.privilege.resume_view_count,
resume_view_total=recruiter.privilege.resume_view_total
)
# 构建同步信息
sync_info = None
if recruiter.sync_status:
sync_info = RecruiterSyncInfo(
last_sync_at=recruiter.last_sync_at,
sync_status=recruiter.sync_status.value,
sync_error=recruiter.sync_error
)
return RecruiterResponse(
id=recruiter.id,
name=recruiter.name,
source=recruiter.source.value,
wt_token=recruiter.wt_token,
status=recruiter.status.value,
last_used_at=recruiter.last_used_at,
privilege=privilege,
sync_info=sync_info,
created_at=recruiter.created_at,
updated_at=recruiter.updated_at
)
@router.get("/sources", response_model=List[RecruiterSourceInfo])
async def get_recruiter_sources():
"""获取支持的平台来源列表"""
return [
RecruiterSourceInfo(
value="boss",
label="Boss直聘",
description="Boss直聘招聘平台"
),
RecruiterSourceInfo(
value="liepin",
label="猎聘",
description="猎聘招聘平台(预留)"
),
RecruiterSourceInfo(
value="zhilian",
label="智联招聘",
description="智联招聘平台(预留)"
)
]
@router.get("", response_model=RecruiterListResponse)
async def list_recruiters(
source: Optional[str] = None,
service: RecruiterService = Depends(get_recruiter_service)
):
"""
获取招聘者账号列表
Args:
source: 按平台筛选 (boss, liepin, etc.)
"""
if source:
try:
candidate_source = CandidateSource(source.lower())
recruiters = service.list_recruiters(candidate_source)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid source: {source}")
else:
recruiters = service.list_recruiters()
items = [_build_recruiter_response(r) for r in recruiters]
return RecruiterListResponse(total=len(items), items=items)
@router.get("/{recruiter_id}", response_model=RecruiterResponse)
async def get_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""获取单个招聘者账号详情"""
recruiter = service.get_recruiter(recruiter_id)
if not recruiter:
raise HTTPException(status_code=404, detail="Recruiter not found")
return _build_recruiter_response(recruiter)
@router.post("", response_model=RecruiterResponse)
async def create_recruiter(
data: RecruiterCreate,
service: RecruiterService = Depends(get_recruiter_service)
):
"""手动创建招聘者账号"""
try:
source = CandidateSource(data.source.lower())
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid source: {data.source}")
recruiter = service.add_recruiter(
name=data.name,
source=source,
wt_token=data.wt_token
)
return _build_recruiter_response(recruiter)
@router.post("/register", response_model=RecruiterRegisterResponse)
async def register_recruiter(
data: RecruiterRegister,
service: RecruiterService = Depends(get_recruiter_service)
):
"""
自动注册招聘者账号
根据Token自动获取账号信息并注册
"""
# 验证平台来源
try:
source = CandidateSource(data.source.lower())
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid source: {data.source}")
# 目前仅支持Boss平台
if source != CandidateSource.BOSS:
raise HTTPException(
status_code=400,
detail="Auto register only supports 'boss' source currently"
)
try:
# 1. 使用Token创建临时爬虫获取账号信息
crawler = BossCrawler(wt_token=data.wt_token)
# 2. 检查登录状态
if hasattr(crawler.client, 'check_login_status'):
is_valid = crawler.client.check_login_status()
if not is_valid:
return RecruiterRegisterResponse(
success=False,
message="Token无效或账号已过期请检查Token是否正确"
)
# 3. 获取账号信息
account_info = None
if hasattr(crawler.client, 'get_account_info'):
account_info = crawler.client.get_account_info()
if not account_info:
return RecruiterRegisterResponse(
success=False,
message="无法获取账号信息请检查Token是否有效"
)
# 4. 提取账号名称
account_name = ""
if hasattr(account_info, 'name'):
account_name = account_info.name
elif hasattr(account_info, 'zpData') and hasattr(account_info.zpData, 'name'):
account_name = account_info.zpData.name
elif isinstance(account_info, dict):
account_name = account_info.get('name', '') or \
account_info.get('zpData', {}).get('name', '')
if not account_name:
account_name = f"Boss账号_{data.wt_token[:8]}..."
# 5. 检查是否已存在相同Token的账号
existing_recruiters = service.list_recruiters(source)
for existing in existing_recruiters:
if existing.wt_token == data.wt_token:
return RecruiterRegisterResponse(
success=False,
message=f"该Token已注册为账号: {existing.name}",
recruiter=_build_recruiter_response(existing)
)
# 6. 创建账号
recruiter = service.add_recruiter(
name=account_name,
source=source,
wt_token=data.wt_token
)
# 7. 尝试获取并保存权益信息
try:
if hasattr(crawler.client, 'get_account_detail'):
privilege_data = crawler.client.get_account_detail()
# 解析权益数据
from ...job.account_sync_job import AccountSyncJob
sync_job = AccountSyncJob(service)
sync_job._parse_privilege_data(recruiter, privilege_data)
# 保存更新后的权益信息
service.save_recruiter(recruiter)
except Exception as e:
# 权益信息获取失败不影响注册
print(f"[Register] 获取权益信息失败: {e}")
return RecruiterRegisterResponse(
success=True,
message=f"账号注册成功: {account_name}",
recruiter=_build_recruiter_response(recruiter)
)
except ImportError as e:
return RecruiterRegisterResponse(
success=False,
message=f"SDK未安装或导入失败: {str(e)}"
)
except Exception as e:
return RecruiterRegisterResponse(
success=False,
message=f"注册失败: {str(e)}"
)
@router.put("/{recruiter_id}", response_model=RecruiterResponse)
async def update_recruiter(
recruiter_id: str,
data: RecruiterUpdate,
service: RecruiterService = Depends(get_recruiter_service)
):
"""更新招聘者账号"""
recruiter = service.get_recruiter(recruiter_id)
if not recruiter:
raise HTTPException(status_code=404, detail="Recruiter not found")
# 更新字段
if data.name:
recruiter.name = data.name
if data.wt_token:
recruiter.wt_token = data.wt_token
if data.status:
try:
recruiter.status = RecruiterStatus(data.status.lower())
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid status: {data.status}"
)
updated = service.mapper.save(recruiter)
return _build_recruiter_response(updated)
@router.delete("/{recruiter_id}")
async def delete_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""删除招聘者账号"""
success = service.delete_recruiter(recruiter_id)
if not success:
raise HTTPException(status_code=404, detail="Recruiter not found")
return APIResponse(success=True, message="Recruiter deleted successfully")
@router.post("/{recruiter_id}/activate")
async def activate_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""启用招聘者账号"""
success = service.activate_recruiter(recruiter_id)
if not success:
raise HTTPException(status_code=404, detail="Recruiter not found")
return APIResponse(success=True, message="Recruiter activated")
@router.post("/{recruiter_id}/deactivate")
async def deactivate_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""停用招聘者账号"""
success = service.deactivate_recruiter(recruiter_id)
if not success:
raise HTTPException(status_code=404, detail="Recruiter not found")
return APIResponse(success=True, message="Recruiter deactivated")
@router.post("/{recruiter_id}/sync")
async def sync_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""
手动同步账号信息
触发账号状态、权益信息、职位列表的同步
"""
recruiter = service.get_recruiter(recruiter_id)
if not recruiter:
raise HTTPException(status_code=404, detail="Recruiter not found")
# TODO: 触发同步任务
# 可以调用 AccountSyncJob 来执行同步
return APIResponse(
success=True,
message=f"Sync triggered for recruiter: {recruiter.name}"
)

View File

@@ -0,0 +1,227 @@
"""
定时任务管理路由
提供定时任务的查询、控制和配置功能
"""
from typing import List, Optional
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
from ..schemas import (
JobInfo, JobStatusInfo, JobConfigUpdate,
APIResponse
)
from ...service.scheduler import get_scheduler
router = APIRouter(prefix="/api/scheduler", tags=["定时任务"])
def get_scheduler_instance():
"""依赖注入:获取调度器实例"""
return get_scheduler()
@router.get("/jobs", response_model=List[JobInfo])
async def list_jobs(
scheduler=Depends(get_scheduler_instance)
):
"""获取所有定时任务列表"""
jobs = scheduler.get_jobs()
return [
JobInfo(
id=job["id"],
name=job["name"],
next_run_time=job.get("next_run_time"),
trigger=job["trigger"],
type=job.get("type")
)
for job in jobs
]
@router.get("/jobs/status", response_model=List[JobStatusInfo])
async def get_jobs_status(
scheduler=Depends(get_scheduler_instance)
):
"""获取所有Job任务状态"""
status_list = scheduler.get_job_status()
return [
JobStatusInfo(
job_id=s["job_id"],
name=s["name"],
enabled=s["enabled"],
is_running=s["is_running"],
last_run_time=s.get("last_run_time"),
next_run_time=s.get("next_run_time"),
run_count=s["run_count"],
success_count=s["success_count"],
fail_count=s["fail_count"],
last_error=s.get("last_error")
)
for s in status_list
]
@router.get("/jobs/{job_id}/status", response_model=JobStatusInfo)
async def get_job_status(
job_id: str,
scheduler=Depends(get_scheduler_instance)
):
"""获取指定Job任务状态"""
status_list = scheduler.get_job_status(job_id)
if not status_list:
raise HTTPException(status_code=404, detail="Job not found")
s = status_list[0]
return JobStatusInfo(
job_id=s["job_id"],
name=s["name"],
enabled=s["enabled"],
is_running=s["is_running"],
last_run_time=s.get("last_run_time"),
next_run_time=s.get("next_run_time"),
run_count=s["run_count"],
success_count=s["success_count"],
fail_count=s["fail_count"],
last_error=s.get("last_error")
)
@router.post("/jobs/{job_id}/run")
async def run_job_now(
job_id: str,
background_tasks: BackgroundTasks,
scheduler=Depends(get_scheduler_instance)
):
"""
立即执行指定任务
Args:
job_id: 任务ID (account_sync 或 resume_process)
"""
# 验证任务是否存在
valid_jobs = ["account_sync", "resume_process", "crawl_boss", "analyze_pending"]
if job_id not in valid_jobs:
raise HTTPException(status_code=400, detail=f"Invalid job_id. Valid: {valid_jobs}")
# 在后台执行
background_tasks.add_task(scheduler.run_job_now, job_id)
return APIResponse(
success=True,
message=f"Job {job_id} is running in background"
)
@router.post("/jobs/{job_id}/pause")
async def pause_job(
job_id: str,
scheduler=Depends(get_scheduler_instance)
):
"""暂停指定任务"""
try:
scheduler.pause_job(job_id)
return APIResponse(success=True, message=f"Job {job_id} paused")
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/jobs/{job_id}/resume")
async def resume_job(
job_id: str,
scheduler=Depends(get_scheduler_instance)
):
"""恢复指定任务"""
try:
scheduler.resume_job(job_id)
return APIResponse(success=True, message=f"Job {job_id} resumed")
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@router.put("/jobs/{job_id}/config")
async def update_job_config(
job_id: str,
config: JobConfigUpdate,
scheduler=Depends(get_scheduler_instance)
):
"""
更新任务配置
Args:
job_id: 任务ID
config: 配置更新
"""
# 验证任务ID
valid_job_ids = ["account_sync", "resume_process"]
if job_id not in valid_job_ids:
raise HTTPException(
status_code=400,
detail=f"Can only config Job module tasks: {valid_job_ids}"
)
# 构建更新参数
update_kwargs = {}
if config.enabled is not None:
update_kwargs["enabled"] = config.enabled
if config.interval_minutes is not None:
update_kwargs["interval_minutes"] = config.interval_minutes
if not update_kwargs:
raise HTTPException(status_code=400, detail="No config to update")
try:
scheduler.job_scheduler.update_job_config(job_id, **update_kwargs)
return APIResponse(
success=True,
message=f"Job {job_id} config updated",
data=update_kwargs
)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/status")
async def get_scheduler_status(
scheduler=Depends(get_scheduler_instance)
):
"""获取调度器整体状态"""
jobs = scheduler.get_jobs()
job_status = scheduler.get_job_status()
return {
"running": scheduler._running if hasattr(scheduler, '_running') else False,
"total_jobs": len(jobs),
"job_scheduler_jobs": len([j for j in jobs if j.get("type") == "job"]),
"legacy_jobs": len([j for j in jobs if j.get("type") == "legacy"]),
"job_status_summary": {
"total": len(job_status),
"running": sum(1 for s in job_status if s["is_running"]),
"enabled": sum(1 for s in job_status if s["enabled"])
}
}
@router.post("/start")
async def start_scheduler(
scheduler=Depends(get_scheduler_instance)
):
"""启动调度器"""
try:
scheduler.start()
return APIResponse(success=True, message="Scheduler started")
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/stop")
async def stop_scheduler(
scheduler=Depends(get_scheduler_instance)
):
"""停止调度器"""
try:
scheduler.stop()
return APIResponse(success=True, message="Scheduler stopped")
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))

View File

@@ -0,0 +1,53 @@
"""
系统管理路由
提供系统级别的接口:健康检查、根路径等
"""
from fastapi import APIRouter
from ..schemas import APIResponse
router = APIRouter(tags=["系统"])
@router.get("/")
async def root():
"""API根路径"""
return {
"name": "简历智能体 API",
"version": "0.1.0",
"docs": "/docs",
"endpoints": {
"recruiters": "/api/recruiters",
"scheduler": "/api/scheduler",
"health": "/health"
}
}
@router.get("/health")
async def health_check():
"""健康检查"""
return {
"status": "healthy",
"service": "hr-agent-api",
"version": "0.1.0"
}
@router.get("/api/status")
async def api_status():
"""获取API状态信息"""
return APIResponse(
success=True,
message="API is running",
data={
"version": "0.1.0",
"features": [
"recruiter_management",
"scheduler_management",
"auto_register"
]
}
)

View File

@@ -0,0 +1,168 @@
"""
API共享Schema定义
集中定义所有API请求和响应的数据模型
"""
from typing import List, Optional, Any
from datetime import datetime
from pydantic import BaseModel, Field
# ============== 通用响应 ==============
class APIResponse(BaseModel):
"""通用API响应"""
success: bool
message: str
data: Optional[dict] = None
class PaginationParams(BaseModel):
"""分页参数"""
page: int = Field(default=1, ge=1, description="页码")
page_size: int = Field(default=20, ge=1, le=100, description="每页数量")
# ============== 招聘者账号 ==============
class RecruiterBase(BaseModel):
"""招聘者账号基础信息"""
name: str = Field(..., description="账号名称/标识")
source: str = Field(default="boss", description="平台来源: boss, liepin, etc.")
class RecruiterCreate(RecruiterBase):
"""创建招聘者账号请求"""
wt_token: str = Field(..., description="WT Token")
class RecruiterRegister(BaseModel):
"""自动注册招聘者账号请求"""
source: str = Field(default="boss", description="平台来源: boss")
wt_token: str = Field(..., description="WT Token")
class RecruiterUpdate(BaseModel):
"""更新招聘者账号请求"""
name: Optional[str] = Field(None, description="账号名称")
wt_token: Optional[str] = Field(None, description="WT Token")
status: Optional[str] = Field(None, description="状态: active, inactive, expired")
class RecruiterPrivilegeInfo(BaseModel):
"""招聘者账号权益信息"""
vip_level: Optional[str] = Field(None, description="VIP等级")
vip_status: Optional[str] = Field(None, description="VIP状态")
vip_expire_at: Optional[datetime] = Field(None, description="VIP过期时间")
resume_view_count: int = Field(default=0, description="剩余简历查看次数")
resume_view_total: int = Field(default=0, description="总简历查看次数")
class RecruiterSyncInfo(BaseModel):
"""招聘者账号同步信息"""
last_sync_at: Optional[datetime] = Field(None, description="最后同步时间")
sync_status: Optional[str] = Field(None, description="同步状态: success, failed, pending")
sync_error: Optional[str] = Field(None, description="同步错误信息")
class RecruiterResponse(BaseModel):
"""招聘者账号响应"""
id: str
name: str
source: str
wt_token: str
status: str
last_used_at: Optional[datetime] = None
privilege: Optional[RecruiterPrivilegeInfo] = None
sync_info: Optional[RecruiterSyncInfo] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class RecruiterListResponse(BaseModel):
"""招聘者列表响应"""
total: int
items: List[RecruiterResponse]
class RecruiterRegisterResponse(BaseModel):
"""自动注册招聘者账号响应"""
success: bool
message: str
recruiter: Optional[RecruiterResponse] = None
class RecruiterSourceInfo(BaseModel):
"""招聘者平台来源信息"""
value: str = Field(..., description="来源值")
label: str = Field(..., description="显示名称")
description: Optional[str] = Field(None, description="描述")
# ============== 定时任务 ==============
class JobInfo(BaseModel):
"""任务信息"""
id: str
name: str
next_run_time: Optional[str] = None
trigger: str
type: Optional[str] = Field(None, description="任务类型: legacy, job")
class JobStatusInfo(BaseModel):
"""任务状态信息"""
job_id: str
name: str
enabled: bool
is_running: bool
last_run_time: Optional[str] = None
next_run_time: Optional[str] = None
run_count: int = 0
success_count: int = 0
fail_count: int = 0
last_error: Optional[str] = None
class JobConfigUpdate(BaseModel):
"""任务配置更新请求"""
enabled: Optional[bool] = None
interval_minutes: Optional[int] = Field(None, ge=1, description="执行间隔(分钟)")
# ============== 候选人(预留) ==============
class CandidateResponse(BaseModel):
"""候选人响应(预留)"""
id: str
name: str
source: str
status: str
created_at: Optional[datetime] = None
class CandidateListResponse(BaseModel):
"""候选人列表响应(预留)"""
total: int
items: List[CandidateResponse]
# ============== 职位(预留) ==============
class JobPositionResponse(BaseModel):
"""职位响应(预留)"""
id: str
title: str
source: str
status: str
recruiter_id: Optional[str] = None
created_at: Optional[datetime] = None
class JobPositionListResponse(BaseModel):
"""职位列表响应(预留)"""
total: int
items: List[JobPositionResponse]

View File

@@ -27,6 +27,7 @@ class AccountSyncResult:
self.success = success
self.message = message
self.jobs_synced = 0
self.jobs_deleted = 0
self.timestamp = datetime.now()
@@ -130,8 +131,9 @@ class AccountSyncJob:
# 3. 同步职位列表
try:
jobs_count = await self._sync_jobs(recruiter)
result.jobs_synced = jobs_count
jobs_synced, jobs_deleted = await self._sync_jobs(recruiter)
result.jobs_synced = jobs_synced
result.jobs_deleted = jobs_deleted
except Exception as e:
print(f"[{datetime.now()}] 同步账号 {recruiter.name} 职位列表失败: {e}")
@@ -140,7 +142,7 @@ class AccountSyncJob:
self.recruiter_service.save_recruiter(recruiter)
result.success = True
result.message = f"同步成功,职位: {result.jobs_synced}"
result.message = f"同步成功,新增/更新职位: {result.jobs_synced}, 删除职位: {result.jobs_deleted}"
print(f"[{datetime.now()}] 账号 {recruiter.name} 同步完成: {result.message}")
return result
@@ -270,40 +272,91 @@ class AccountSyncJob:
except Exception as e:
print(f"[{datetime.now()}] 解析权益数据失败: {e}")
async def _sync_jobs(self, recruiter: Recruiter) -> int:
async def _sync_jobs(self, recruiter: Recruiter) -> tuple:
"""
同步职位列表
策略:
1. 获取API返回的最新职位列表
2. 保存/更新这些职位
3. 删除该账号下不在最新列表中的职位
Args:
recruiter: 招聘者账号
Returns:
int: 同步的职位数量
tuple: (同步的职位数量, 删除的职位数量)
"""
try:
crawler = self.recruiter_service.create_crawler_for_recruiter(recruiter)
if not crawler:
return 0
return 0, 0
# 获取职位列表
jobs = crawler.get_jobs()
# 获取API返回的最新职位列表
current_jobs = crawler.get_jobs()
if not jobs:
print(f"[{datetime.now()}] 账号 {recruiter.name} 没有正在招聘的职位")
return 0
# 获取当前时间戳
sync_time = datetime.now()
# 关联recruiter_id并保存职位
for job in jobs:
job.recruiter_id = recruiter.id
job.source = recruiter.source
job.last_sync_at = datetime.now()
# 记录当前有效的职位source_id集合
current_source_ids = set()
# 保存/更新职位
synced_count = 0
if current_jobs:
for job in current_jobs:
job.recruiter_id = recruiter.id
job.source = recruiter.source
job.last_sync_at = sync_time
job.status = JobStatus.ACTIVE # 确保状态为活跃
# 保存或更新职位
saved_job = self.recruiter_service.save_job(job)
current_source_ids.add(job.source_id)
synced_count += 1
# 保存或更新职位
self.recruiter_service.save_job(job)
print(f"[{datetime.now()}] 账号 {recruiter.name} 同步了 {synced_count} 个职位")
else:
print(f"[{datetime.now()}] 账号 {recruiter.name} 没有正在招聘的职位")
print(f"[{datetime.now()}] 账号 {recruiter.name} 同步了 {len(jobs)} 个职位")
return len(jobs)
# 删除该账号下不在当前列表中的职位
deleted_count = await self._delete_obsolete_jobs(recruiter, current_source_ids)
return synced_count, deleted_count
except Exception as e:
print(f"[{datetime.now()}] 同步账号 {recruiter.name} 职位列表失败: {e}")
return 0, 0
async def _delete_obsolete_jobs(self, recruiter: Recruiter, valid_source_ids: set) -> int:
"""
删除过期的职位(不在有效列表中的职位)
Args:
recruiter: 招聘者账号
valid_source_ids: 当前有效的职位source_id集合
Returns:
int: 删除的职位数量
"""
try:
# 获取该账号数据库中所有的职位
db_jobs = self.recruiter_service.find_jobs_by_recruiter(recruiter.id)
deleted_count = 0
for job in db_jobs:
# 如果职位不在当前有效列表中,则删除
if job.source_id not in valid_source_ids:
success = self.recruiter_service.delete_job(job.id)
if success:
deleted_count += 1
print(f"[{datetime.now()}] 删除过期职位: {job.title} (ID: {job.source_id})")
if deleted_count > 0:
print(f"[{datetime.now()}] 账号 {recruiter.name} 删除了 {deleted_count} 个过期职位")
return deleted_count
except Exception as e:
print(f"[{datetime.now()}] 删除过期职位失败: {e}")
return 0

View File

@@ -0,0 +1,271 @@
"""Job data mapper using SQLAlchemy"""
from typing import List, Optional
from datetime import datetime
import uuid
from sqlalchemy import select, update, delete
from sqlalchemy.orm import Session
from ..domain.job import Job, JobStatus, JobRequirement
from ..domain.candidate import CandidateSource
from ..config.database import get_db_manager, JobModel
class JobMapper:
"""职位数据访问 - SQLAlchemy实现"""
def __init__(self, db_url: Optional[str] = None):
self.db_manager = get_db_manager(db_url)
# 确保表存在
self.db_manager.create_tables()
def _get_session(self) -> Session:
"""获取数据库会话"""
return self.db_manager.get_session()
def _model_to_entity(self, model: JobModel) -> Job:
"""将模型转换为实体"""
# 处理 source 的大小写
source_value = model.source.lower() if model.source else "boss"
# 处理 status 的大小写
status_value = model.status.lower() if model.status else "active"
# 解析requirements JSON
requirements = None
if model.requirements:
import json
try:
req_data = json.loads(model.requirements) if isinstance(model.requirements, str) else model.requirements
requirements = JobRequirement(**req_data)
except:
requirements = JobRequirement()
return Job(
id=model.id,
source=CandidateSource(source_value),
source_id=model.source_id,
recruiter_id=model.recruiter_id,
title=model.title,
department=model.department,
location=model.location,
salary_min=model.salary_min,
salary_max=model.salary_max,
requirements=requirements,
description=model.description,
status=JobStatus(status_value),
candidate_count=model.candidate_count or 0,
new_candidate_count=model.new_candidate_count or 0,
last_sync_at=model.last_sync_at,
created_at=model.created_at,
updated_at=model.updated_at
)
def _entity_to_model(self, entity: Job) -> JobModel:
"""将实体转换为模型"""
import json
requirements_json = None
if entity.requirements:
requirements_json = json.dumps({
"min_work_years": entity.requirements.min_work_years,
"max_work_years": entity.requirements.max_work_years,
"education": entity.requirements.education,
"skills": entity.requirements.skills,
"description": entity.requirements.description
})
return JobModel(
id=entity.id or str(uuid.uuid4()),
source=entity.source.value,
source_id=entity.source_id,
recruiter_id=entity.recruiter_id,
title=entity.title,
department=entity.department,
location=entity.location,
salary_min=entity.salary_min,
salary_max=entity.salary_max,
requirements=requirements_json,
description=entity.description,
status=entity.status.value,
candidate_count=entity.candidate_count,
new_candidate_count=entity.new_candidate_count,
last_sync_at=entity.last_sync_at
)
def save(self, job: Job) -> Job:
"""保存职位信息(插入或更新)"""
session = self._get_session()
try:
# 检查是否已存在
existing = self.find_by_source_id(job.source, job.source_id)
if existing:
# 更新现有记录
stmt = (
update(JobModel)
.where(JobModel.id == existing.id)
.values(
title=job.title,
department=job.department,
location=job.location,
salary_min=job.salary_min,
salary_max=job.salary_max,
requirements=self._entity_to_model(job).requirements,
description=job.description,
status=job.status.value,
recruiter_id=job.recruiter_id,
candidate_count=job.candidate_count,
new_candidate_count=job.new_candidate_count,
last_sync_at=job.last_sync_at or datetime.now(),
updated_at=datetime.now()
)
)
session.execute(stmt)
job.id = existing.id
else:
# 插入新记录
job.id = str(uuid.uuid4())
model = self._entity_to_model(job)
session.add(model)
session.commit()
return job
finally:
session.close()
def find_by_id(self, job_id: str) -> Optional[Job]:
"""根据ID查询职位"""
session = self._get_session()
try:
result = session.execute(
select(JobModel).where(JobModel.id == job_id)
)
model = result.scalar_one_or_none()
return self._model_to_entity(model) if model else None
finally:
session.close()
def find_by_source_id(self, source: CandidateSource, source_id: str) -> Optional[Job]:
"""根据平台来源和源ID查询职位"""
session = self._get_session()
try:
result = session.execute(
select(JobModel)
.where(JobModel.source == source.value)
.where(JobModel.source_id == source_id)
)
model = result.scalar_one_or_none()
return self._model_to_entity(model) if model else None
finally:
session.close()
def find_by_recruiter(self, recruiter_id: str) -> List[Job]:
"""根据招聘者ID查询职位列表"""
session = self._get_session()
try:
result = session.execute(
select(JobModel)
.where(JobModel.recruiter_id == recruiter_id)
.order_by(JobModel.last_sync_at.desc())
)
models = result.scalars().all()
return [self._model_to_entity(m) for m in models]
finally:
session.close()
def find_by_source(self, source: CandidateSource) -> List[Job]:
"""根据平台来源查询职位列表"""
session = self._get_session()
try:
result = session.execute(
select(JobModel)
.where(JobModel.source == source.value)
.order_by(JobModel.last_sync_at.desc())
)
models = result.scalars().all()
return [self._model_to_entity(m) for m in models]
finally:
session.close()
def find_active_by_recruiter(self, recruiter_id: str) -> List[Job]:
"""查询指定招聘者的活跃职位"""
session = self._get_session()
try:
result = session.execute(
select(JobModel)
.where(JobModel.recruiter_id == recruiter_id)
.where(JobModel.status == 'active')
.order_by(JobModel.last_sync_at.desc())
)
models = result.scalars().all()
return [self._model_to_entity(m) for m in models]
finally:
session.close()
def find_all(self) -> List[Job]:
"""查询所有职位"""
session = self._get_session()
try:
result = session.execute(
select(JobModel).order_by(JobModel.last_sync_at.desc())
)
models = result.scalars().all()
return [self._model_to_entity(m) for m in models]
finally:
session.close()
def update_status(self, job_id: str, status: JobStatus) -> bool:
"""更新职位状态"""
session = self._get_session()
try:
stmt = (
update(JobModel)
.where(JobModel.id == job_id)
.values(status=status.value, updated_at=datetime.now())
)
result = session.execute(stmt)
session.commit()
return result.rowcount > 0
finally:
session.close()
def update_candidate_count(self, job_id: str, count: int, new_count: int = 0) -> bool:
"""更新职位候选人数量"""
session = self._get_session()
try:
stmt = (
update(JobModel)
.where(JobModel.id == job_id)
.values(
candidate_count=count,
new_candidate_count=new_count,
updated_at=datetime.now()
)
)
result = session.execute(stmt)
session.commit()
return result.rowcount > 0
finally:
session.close()
def delete(self, job_id: str) -> bool:
"""删除职位"""
session = self._get_session()
try:
stmt = delete(JobModel).where(JobModel.id == job_id)
result = session.execute(stmt)
session.commit()
return result.rowcount > 0
finally:
session.close()
def delete_by_recruiter(self, recruiter_id: str) -> int:
"""删除指定招聘者的所有职位"""
session = self._get_session()
try:
stmt = delete(JobModel).where(JobModel.recruiter_id == recruiter_id)
result = session.execute(stmt)
session.commit()
return result.rowcount
finally:
session.close()

View File

@@ -138,23 +138,59 @@ class RecruiterService:
"""
获取指定招聘者的职位列表
TODO: 需要实现JobMapper
目前返回空列表后续需要添加Job数据访问层
Returns:
List[Job]: 职位列表
"""
# 暂时返回空列表,后续需要实现JobMapper
# from ..mapper.job_mapper import JobMapper
# return JobMapper().find_by_recruiter(recruiter_id)
return []
from ..mapper.job_mapper import JobMapper
return JobMapper(self.mapper.db_manager.db_url).find_by_recruiter(recruiter_id)
def find_active_jobs_by_recruiter(self, recruiter_id: str) -> List[Job]:
"""
获取指定招聘者的活跃职位列表
Returns:
List[Job]: 活跃职位列表
"""
from ..mapper.job_mapper import JobMapper
return JobMapper(self.mapper.db_manager.db_url).find_active_by_recruiter(recruiter_id)
def save_job(self, job: Job) -> Job:
"""
保存职位信息
TODO: 需要实现JobMapper
目前仅打印日志后续需要添加Job数据访问层
Args:
job: 职位实体
Returns:
Job: 保存后的职位
"""
# 暂时仅打印日志,后续需要实现JobMapper
# from ..mapper.job_mapper import JobMapper
# return JobMapper().save(job)
print(f"[Job] 保存职位: {job.title} (ID: {job.source_id})")
return job
from ..mapper.job_mapper import JobMapper
saved = JobMapper(self.mapper.db_manager.db_url).save(job)
print(f"[Job] 职位已保存: {saved.title} (ID: {saved.id})")
return saved
def delete_job(self, job_id: str) -> bool:
"""
删除职位
Args:
job_id: 职位ID
Returns:
bool: 是否删除成功
"""
from ..mapper.job_mapper import JobMapper
return JobMapper(self.mapper.db_manager.db_url).delete(job_id)
def delete_jobs_by_recruiter(self, recruiter_id: str) -> int:
"""
删除指定招聘者的所有职位
Args:
recruiter_id: 招聘者ID
Returns:
int: 删除的职位数量
"""
from ..mapper.job_mapper import JobMapper
return JobMapper(self.mapper.db_manager.db_url).delete_by_recruiter(recruiter_id)