feat(api): 使用FastAPI重构招聘者账号管理与启动服务器

- 移除旧的命令行工具add_recruiter.py和main.py,统一改用API方式管理招聘者账号
- 新增FastAPI应用,提供招聘者账号的CRUD接口及激活/停用功能
- 添加CORS中间件,支持跨域请求
- 支持通过API接口创建、查询、更新、删除招聘者账号,并返回标准化响应
- 集成异步后台定时任务调度器,定时爬取Boss直聘简历和分析报告
- 新增run_server.py启动脚本,支持启动FastAPI服务器和定时任务调度器的组合应用
- 定时任务支持任务列表查询、暂停、恢复及手动触发爬取任务的API
- 更新pyproject.toml依赖,新增fastapi、uvicorn和apscheduler等库
- 优化系统架构,实现Web API和后台调度功能解耦与整合,提高系统扩展性及易用性
This commit is contained in:
2026-03-24 14:50:50 +08:00
parent 04596d298b
commit 3c29ca04eb
7 changed files with 901 additions and 202 deletions

View File

@@ -0,0 +1,261 @@
"""FastAPI routes for HR Agent"""
from typing import List, Optional
from datetime import datetime
from pydantic import BaseModel, Field
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from ..domain.candidate import CandidateSource
from ..domain.recruiter import Recruiter, RecruiterStatus
from ..service.recruiter_service import RecruiterService
from ..mapper.recruiter_mapper import RecruiterMapper
from ..config.settings import get_settings
# ============== Pydantic Schemas ==============
class RecruiterCreate(BaseModel):
"""创建招聘者账号请求"""
name: str = Field(..., description="账号名称/标识")
source: str = Field(default="boss", description="平台来源: boss, liepin, etc.")
wt_token: str = Field(..., description="WT Token")
class RecruiterUpdate(BaseModel):
"""更新招聘者账号请求"""
name: Optional[str] = Field(None, description="账号名称")
wt_token: Optional[str] = Field(None, description="WT Token")
status: Optional[str] = Field(None, description="状态: active, inactive, expired")
class RecruiterResponse(BaseModel):
"""招聘者账号响应"""
id: str
name: str
source: str
wt_token: str
status: str
last_used_at: Optional[datetime] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class RecruiterListResponse(BaseModel):
"""招聘者列表响应"""
total: int
items: List[RecruiterResponse]
class APIResponse(BaseModel):
"""通用API响应"""
success: bool
message: str
data: Optional[dict] = None
# ============== FastAPI App ==============
def create_app() -> FastAPI:
"""创建FastAPI应用"""
app = FastAPI(
title="简历智能体 API",
description="HR Agent - 多平台简历爬取、AI分析、多渠道通知",
version="0.1.0"
)
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 依赖注入获取RecruiterService
def get_recruiter_service():
settings = get_settings()
mapper = RecruiterMapper(db_url=settings.db_url)
return RecruiterService(mapper=mapper)
# ============== Recruiter Routes ==============
@app.get("/")
async def root():
"""API根路径"""
return {
"name": "简历智能体 API",
"version": "0.1.0",
"docs": "/docs"
}
@app.get("/health")
async def health_check():
"""健康检查"""
return {"status": "healthy"}
@app.get("/api/recruiters", response_model=RecruiterListResponse)
async def list_recruiters(
source: Optional[str] = None,
service: RecruiterService = Depends(get_recruiter_service)
):
"""
获取招聘者账号列表
Args:
source: 按平台筛选 (boss, liepin, etc.)
"""
if source:
try:
candidate_source = CandidateSource(source.lower())
recruiters = service.list_recruiters(candidate_source)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid source: {source}")
else:
recruiters = service.list_recruiters()
items = [
RecruiterResponse(
id=r.id,
name=r.name,
source=r.source.value,
wt_token=r.wt_token,
status=r.status.value,
last_used_at=r.last_used_at,
created_at=r.created_at,
updated_at=r.updated_at
)
for r in recruiters
]
return RecruiterListResponse(total=len(items), items=items)
@app.get("/api/recruiters/{recruiter_id}", response_model=RecruiterResponse)
async def get_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""获取单个招聘者账号详情"""
recruiter = service.get_recruiter(recruiter_id)
if not recruiter:
raise HTTPException(status_code=404, detail="Recruiter not found")
return RecruiterResponse(
id=recruiter.id,
name=recruiter.name,
source=recruiter.source.value,
wt_token=recruiter.wt_token,
status=recruiter.status.value,
last_used_at=recruiter.last_used_at,
created_at=recruiter.created_at,
updated_at=recruiter.updated_at
)
@app.post("/api/recruiters", response_model=RecruiterResponse)
async def create_recruiter(
data: RecruiterCreate,
service: RecruiterService = Depends(get_recruiter_service)
):
"""创建招聘者账号"""
try:
source = CandidateSource(data.source.lower())
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid source: {data.source}")
recruiter = service.add_recruiter(
name=data.name,
source=source,
wt_token=data.wt_token
)
return RecruiterResponse(
id=recruiter.id,
name=recruiter.name,
source=recruiter.source.value,
wt_token=recruiter.wt_token,
status=recruiter.status.value,
last_used_at=recruiter.last_used_at,
created_at=recruiter.created_at,
updated_at=recruiter.updated_at
)
@app.put("/api/recruiters/{recruiter_id}", response_model=RecruiterResponse)
async def update_recruiter(
recruiter_id: str,
data: RecruiterUpdate,
service: RecruiterService = Depends(get_recruiter_service)
):
"""更新招聘者账号"""
recruiter = service.get_recruiter(recruiter_id)
if not recruiter:
raise HTTPException(status_code=404, detail="Recruiter not found")
# 更新字段
if data.name:
recruiter.name = data.name
if data.wt_token:
recruiter.wt_token = data.wt_token
if data.status:
try:
recruiter.status = RecruiterStatus(data.status.lower())
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid status: {data.status}")
updated = service.mapper.save(recruiter)
return RecruiterResponse(
id=updated.id,
name=updated.name,
source=updated.source.value,
wt_token=updated.wt_token,
status=updated.status.value,
last_used_at=updated.last_used_at,
created_at=updated.created_at,
updated_at=updated.updated_at
)
@app.delete("/api/recruiters/{recruiter_id}")
async def delete_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""删除招聘者账号"""
success = service.delete_recruiter(recruiter_id)
if not success:
raise HTTPException(status_code=404, detail="Recruiter not found")
return APIResponse(success=True, message="Recruiter deleted successfully")
@app.post("/api/recruiters/{recruiter_id}/activate")
async def activate_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""启用招聘者账号"""
success = service.activate_recruiter(recruiter_id)
if not success:
raise HTTPException(status_code=404, detail="Recruiter not found")
return APIResponse(success=True, message="Recruiter activated")
@app.post("/api/recruiters/{recruiter_id}/deactivate")
async def deactivate_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""停用招聘者账号"""
success = service.deactivate_recruiter(recruiter_id)
if not success:
raise HTTPException(status_code=404, detail="Recruiter not found")
return APIResponse(success=True, message="Recruiter deactivated")
return app
# 创建应用实例
app = create_app()

View File

@@ -0,0 +1,189 @@
"""Background scheduler for crawling tasks"""
import asyncio
from datetime import datetime
from typing import Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from ..domain.candidate import CandidateSource
from ..main import get_app
class CrawlScheduler:
"""
爬虫定时任务调度器
定时执行简历爬取任务
"""
def __init__(self):
self.scheduler: Optional[AsyncIOScheduler] = None
self.app = None
self._running = False
def start(self):
"""启动调度器"""
if self._running:
return
self.scheduler = AsyncIOScheduler()
self.scheduler.start()
self._running = True
# 初始化应用
self.app = get_app()
print(f"[{datetime.now()}] 爬虫调度器已启动")
# 注册定时任务
self._register_jobs()
def _register_jobs(self):
"""注册定时任务"""
# 每30分钟爬取一次 Boss 直聘
self.scheduler.add_job(
self._crawl_boss,
trigger=IntervalTrigger(minutes=1),
id="crawl_boss",
name="爬取Boss直聘简历",
replace_existing=True
)
# 每小时执行一次完整分析
self.scheduler.add_job(
self._analyze_pending,
trigger=IntervalTrigger(hours=5),
id="analyze_pending",
name="分析待处理简历",
replace_existing=True
)
print(f"[{datetime.now()}] 已注册定时任务:")
for job in self.scheduler.get_jobs():
print(f" - {job.name}: {job.trigger}")
async def _crawl_boss(self):
"""爬取 Boss 直聘"""
print(f"[{datetime.now()}] 开始爬取 Boss 直聘...")
try:
# 获取所有活跃账号
recruiters = self.app.recruiter_service.list_active_recruiters(CandidateSource.BOSS)
if not recruiters:
print(f"[{datetime.now()}] 没有可用的 Boss 账号")
return
for recruiter in recruiters:
print(f"[{datetime.now()}] 使用账号: {recruiter.name}")
# 创建爬虫
crawler = self.app.recruiter_service.create_crawler_for_recruiter(recruiter)
if not crawler:
continue
# 获取职位列表
jobs = crawler.get_jobs()
print(f"[{datetime.now()}] 找到 {len(jobs)} 个职位")
# 遍历职位爬取候选人
for job in jobs[:3]: # 限制前3个职位避免请求过多
print(f"[{datetime.now()}] 爬取职位: {job.title}")
# 爬取候选人
candidates = crawler.get_candidates(job.source_id, page=1)
print(f"[{datetime.now()}] 职位 '{job.title}' 找到 {len(candidates)} 个候选人")
for candidate in candidates[:10]: # 每职位限制10个候选人
try:
# 获取简历详情
resume = crawler.get_resume_detail(candidate)
if not resume:
continue
# 构建原始数据
raw_data = {
"geekId": candidate.source_id,
"name": candidate.name,
"phone": candidate.phone,
"email": candidate.email,
"age": candidate.age,
"gender": candidate.gender,
"company": candidate.current_company,
"position": candidate.current_position,
"workYears": candidate.work_years,
"education": candidate.education,
"school": candidate.school,
"resumeText": resume.raw_content,
}
# 入库
result = self.app.ingestion_service.ingest(CandidateSource.BOSS, raw_data)
print(f"[{datetime.now()}] 候选人 {candidate.name} 入库: {result.message}")
# 触发分析
if result.success and result.candidate_id:
await self.app._analyze_and_notify(result.candidate_id, resume)
except Exception as e:
print(f"[{datetime.now()}] 处理候选人失败: {e}")
continue
# 标记账号已使用
self.app.recruiter_service.mark_recruiter_used(recruiter.id)
except Exception as e:
print(f"[{datetime.now()}] 爬取 Boss 直聘失败: {e}")
async def _analyze_pending(self):
"""分析待处理的简历"""
print(f"[{datetime.now()}] 开始分析待处理简历...")
# TODO: 实现分析逻辑
pass
def stop(self):
"""停止调度器"""
if self.scheduler:
self.scheduler.shutdown()
self._running = False
print(f"[{datetime.now()}] 爬虫调度器已停止")
def pause_job(self, job_id: str):
"""暂停指定任务"""
if self.scheduler:
self.scheduler.pause_job(job_id)
print(f"[{datetime.now()}] 任务 {job_id} 已暂停")
def resume_job(self, job_id: str):
"""恢复指定任务"""
if self.scheduler:
self.scheduler.resume_job(job_id)
print(f"[{datetime.now()}] 任务 {job_id} 已恢复")
def get_jobs(self):
"""获取所有任务"""
if self.scheduler:
return [
{
"id": job.id,
"name": job.name,
"next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
"trigger": str(job.trigger)
}
for job in self.scheduler.get_jobs()
]
return []
# 全局调度器实例
_scheduler: Optional[CrawlScheduler] = None
def get_scheduler() -> CrawlScheduler:
"""获取调度器实例(单例)"""
global _scheduler
if _scheduler is None:
_scheduler = CrawlScheduler()
return _scheduler