refactor(scheduler): 优化爬取和简历处理流程,改为异步线程池执行

- 将手动触发爬取任务改为使用FastAPI后台任务执行
- 在职位处理逻辑中,将获取候选人列表改为线程池异步调用,避免阻塞事件循环
- 在候选人处理流程中,将获取简历详情改为线程池异步调用
- 在入库操作中使用线程池异步执行,提升处理性能
- 在Boss爬取任务中,将获取职位列表和获取候选人操作改为线程池异步调用
- 统一改造调用同步爬虫方法为异步线程池调用,提升整体异步性能和响应速度
This commit is contained in: (branch name missing from extraction)
Date: 2026-03-25 11:50:34 +08:00
parent fc24e3a37b
commit af11f8ad48
4 changed files with 51 additions and 45 deletions

View File

@@ -16,7 +16,7 @@ if str(src_path) not in sys.path:
sys.path.insert(0, str(src_path))
import uvicorn
from fastapi import FastAPI
from fastapi import FastAPI, BackgroundTasks
from cn.yinlihupo.ylhp_hr_2_0.controller.api import create_app
from cn.yinlihupo.ylhp_hr_2_0.service.scheduler import get_scheduler
@@ -61,10 +61,10 @@ def create_combined_app(enable_scheduler: bool = True) -> FastAPI:
return {"success": True, "message": f"Job {job_id} resumed"}
@app.post("/api/scheduler/trigger/crawl")
async def trigger_crawl():
async def trigger_crawl(background_tasks: BackgroundTasks):
"""手动触发爬取任务"""
scheduler = get_scheduler()
asyncio.create_task(scheduler._crawl_boss())
background_tasks.add_task(scheduler._crawl_boss)
return {"success": True, "message": "Crawl task triggered"}
return app

View File

@@ -252,10 +252,10 @@ class ResumeProcessJob:
print(f"[{datetime.now()}] 处理职位: {job.title} (ID: {job.source_id}, "
f"评价方案: {job.evaluation_schema_id or 'general'})")
# 获取候选人列表
candidates = crawler.get_candidates(job.source_id, page=1)
# 获取候选人列表(在线程池中执行,避免阻塞事件循环)
candidates = await asyncio.to_thread(crawler.get_candidates, job.source_id, page=1)
if not candidates:
result.message = "该职位下没有候选人"
print(f"[{datetime.now()}] 职位 {job.title} 没有候选人")
@@ -358,20 +358,20 @@ class ResumeProcessJob:
)
print(f"[{datetime.now()}] 处理职位: {job.title} (ID: {job.source_id})")
# 获取候选人列表
candidates = crawler.get_candidates(job.source_id, page=1)
# 获取候选人列表(在线程池中执行,避免阻塞事件循环)
candidates = await asyncio.to_thread(crawler.get_candidates, job.source_id, page=1)
if not candidates:
result.message = "该职位下没有候选人"
print(f"[{datetime.now()}] 职位 {job.title} 没有候选人")
return result
print(f"[{datetime.now()}] 职位 {job.title} 找到 {len(candidates)} 个候选人")
# 限制处理数量
candidates_to_process = candidates[:self.max_candidates_per_job]
for candidate in candidates_to_process:
try:
process_result = await self._process_single_candidate(
@@ -425,8 +425,8 @@ class ResumeProcessJob:
print(f"[{datetime.now()}] 处理候选人: {candidate.name}")
try:
# 获取简历详情
resume = crawler.get_resume_detail(candidate)
# 获取简历详情(在线程池中执行,避免阻塞事件循环)
resume = await asyncio.to_thread(crawler.get_resume_detail, candidate)
if not resume:
print(f"[{datetime.now()}] 候选人 {candidate.name} 无法获取简历详情")
@@ -434,13 +434,14 @@ class ResumeProcessJob:
# 构建原始数据
raw_data = self._build_raw_data(candidate, resume, job)
# 统一入库
ingestion_result = self.ingestion_service.ingest(
source=candidate.source,
raw_data=raw_data
# 统一入库(在线程池中执行,避免阻塞事件循环)
ingestion_result = await asyncio.to_thread(
self.ingestion_service.ingest,
candidate.source,
raw_data
)
if not ingestion_result.success:
print(f"[{datetime.now()}] 候选人 {candidate.name} 入库失败: {ingestion_result.message}")
return "failed"

View File

@@ -132,8 +132,8 @@ class ResumeEvaluationService:
print(f"[ResumeEvaluationService] 开始处理候选人: {candidate.name}")
try:
# 1. 获取简历详情 - 同步操作
resume = self._get_resume(crawler, candidate)
# 1. 获取简历详情 - 在线程池中异步执行
resume = await self._get_resume(crawler, candidate)
if not resume:
result.success = False
result.status = "failed"
@@ -192,19 +192,20 @@ class ResumeEvaluationService:
return result
def _get_resume(self, crawler: BaseCrawler, candidate: Candidate) -> Optional[Resume]:
async def _get_resume(self, crawler: BaseCrawler, candidate: Candidate) -> Optional[Resume]:
"""
获取简历详情
获取简历详情(在线程池中执行,避免阻塞事件循环)
Args:
crawler: 爬虫实例
candidate: 候选人对象
Returns:
Resume: 简历对象
"""
import asyncio
try:
return crawler.get_resume_detail(candidate)
return await asyncio.to_thread(crawler.get_resume_detail, candidate)
except Exception as e:
print(f"[ResumeEvaluationService] 获取简历失败: {e}")
return None

View File

@@ -93,39 +93,39 @@ class CrawlScheduler:
async def _crawl_boss(self):
"""爬取 Boss 直聘"""
print(f"[{datetime.now()}] 开始爬取 Boss 直聘...")
try:
# 获取所有活跃账号
recruiters = self.app.recruiter_service.list_active_recruiters(CandidateSource.BOSS)
if not recruiters:
print(f"[{datetime.now()}] 没有可用的 Boss 账号")
return
for recruiter in recruiters:
print(f"[{datetime.now()}] 使用账号: {recruiter.name}")
# 创建爬虫
crawler = self.app.recruiter_service.create_crawler_for_recruiter(recruiter)
if not crawler:
continue
# 获取职位列表
jobs = crawler.get_jobs()
# 获取职位列表(在线程池中执行,避免阻塞事件循环)
jobs = await asyncio.to_thread(crawler.get_jobs)
print(f"[{datetime.now()}] 找到 {len(jobs)} 个职位")
# 遍历职位爬取候选人
for job in jobs:
print(f"[{datetime.now()}] 爬取职位: {job.title}")
# 爬取候选人
candidates = crawler.get_candidates(job.source_id, page=1)
# 爬取候选人(在线程池中执行,避免阻塞事件循环)
candidates = await asyncio.to_thread(crawler.get_candidates, job.source_id, page=1)
print(f"[{datetime.now()}] 职位 '{job.title}' 找到 {len(candidates)} 个候选人")
for candidate in candidates[:10]: # 每职位限制10个候选人
try:
# 获取简历详情
resume = crawler.get_resume_detail(candidate)
# 获取简历详情(在线程池中执行,避免阻塞事件循环)
resume = await asyncio.to_thread(crawler.get_resume_detail, candidate)
if not resume:
continue
@@ -145,8 +145,12 @@ class CrawlScheduler:
"resumeText": resume.raw_content,
}
# 入库
result = self.app.ingestion_service.ingest(CandidateSource.BOSS, raw_data)
# 入库(在线程池中执行,避免阻塞事件循环)
result = await asyncio.to_thread(
self.app.ingestion_service.ingest,
CandidateSource.BOSS,
raw_data
)
print(f"[{datetime.now()}] 候选人 {candidate.name} 入库: {result.message}")
# 触发分析