diff --git a/run_server.py b/run_server.py index 82ca47e..a2400b8 100644 --- a/run_server.py +++ b/run_server.py @@ -16,7 +16,7 @@ if str(src_path) not in sys.path: sys.path.insert(0, str(src_path)) import uvicorn -from fastapi import FastAPI +from fastapi import FastAPI, BackgroundTasks from cn.yinlihupo.ylhp_hr_2_0.controller.api import create_app from cn.yinlihupo.ylhp_hr_2_0.service.scheduler import get_scheduler @@ -61,10 +61,10 @@ def create_combined_app(enable_scheduler: bool = True) -> FastAPI: return {"success": True, "message": f"Job {job_id} resumed"} @app.post("/api/scheduler/trigger/crawl") - async def trigger_crawl(): + async def trigger_crawl(background_tasks: BackgroundTasks): """手动触发爬取任务""" scheduler = get_scheduler() - asyncio.create_task(scheduler._crawl_boss()) + background_tasks.add_task(scheduler._crawl_boss) return {"success": True, "message": "Crawl task triggered"} return app diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/resume_process_job.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/resume_process_job.py index a4b1718..10e8a26 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/resume_process_job.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/resume_process_job.py @@ -252,10 +252,10 @@ class ResumeProcessJob: print(f"[{datetime.now()}] 处理职位: {job.title} (ID: {job.source_id}, " f"评价方案: {job.evaluation_schema_id or 'general'})") - - # 获取候选人列表 - candidates = crawler.get_candidates(job.source_id, page=1) - + + # 获取候选人列表(在线程池中执行,避免阻塞事件循环) + candidates = await asyncio.to_thread(crawler.get_candidates, job.source_id, page=1) + if not candidates: result.message = "该职位下没有候选人" print(f"[{datetime.now()}] 职位 {job.title} 没有候选人") @@ -358,20 +358,20 @@ class ResumeProcessJob: ) print(f"[{datetime.now()}] 处理职位: {job.title} (ID: {job.source_id})") - - # 获取候选人列表 - candidates = crawler.get_candidates(job.source_id, page=1) - + + # 获取候选人列表(在线程池中执行,避免阻塞事件循环) + candidates = await asyncio.to_thread(crawler.get_candidates, job.source_id, page=1) + if not candidates: result.message = "该职位下没有候选人" print(f"[{datetime.now()}] 职位 {job.title} 没有候选人") return result - + print(f"[{datetime.now()}] 职位 {job.title} 找到 {len(candidates)} 个候选人") - + # 限制处理数量 candidates_to_process = candidates[:self.max_candidates_per_job] - + for candidate in candidates_to_process: try: process_result = await self._process_single_candidate( @@ -425,8 +425,8 @@ class ResumeProcessJob: print(f"[{datetime.now()}] 处理候选人: {candidate.name}") try: - # 获取简历详情 - resume = crawler.get_resume_detail(candidate) + # 获取简历详情(在线程池中执行,避免阻塞事件循环) + resume = await asyncio.to_thread(crawler.get_resume_detail, candidate) if not resume: print(f"[{datetime.now()}] 候选人 {candidate.name} 无法获取简历详情") @@ -434,13 +434,14 @@ class ResumeProcessJob: # 构建原始数据 raw_data = self._build_raw_data(candidate, resume, job) - - # 统一入库 - ingestion_result = self.ingestion_service.ingest( - source=candidate.source, - raw_data=raw_data + + # 统一入库(在线程池中执行,避免阻塞事件循环) + ingestion_result = await asyncio.to_thread( + self.ingestion_service.ingest, + candidate.source, + raw_data ) - + if not ingestion_result.success: print(f"[{datetime.now()}] 候选人 {candidate.name} 入库失败: {ingestion_result.message}") return "failed" diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/resume_evaluation_service.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/resume_evaluation_service.py index 0bc2607..c4ea613 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/resume_evaluation_service.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/resume_evaluation_service.py @@ -132,8 +132,8 @@ class ResumeEvaluationService: print(f"[ResumeEvaluationService] 开始处理候选人: {candidate.name}") try: - # 1. 获取简历详情 - 同步操作 - resume = self._get_resume(crawler, candidate) + # 1. 获取简历详情 - 在线程池中异步执行 + resume = await self._get_resume(crawler, candidate) if not resume: result.success = False result.status = "failed" @@ -192,19 +192,20 @@ class ResumeEvaluationService: return result - def _get_resume(self, crawler: BaseCrawler, candidate: Candidate) -> Optional[Resume]: + async def _get_resume(self, crawler: BaseCrawler, candidate: Candidate) -> Optional[Resume]: """ - 获取简历详情 - + 获取简历详情(在线程池中执行,避免阻塞事件循环) + Args: crawler: 爬虫实例 candidate: 候选人对象 - + Returns: Resume: 简历对象 """ + import asyncio try: - return crawler.get_resume_detail(candidate) + return await asyncio.to_thread(crawler.get_resume_detail, candidate) except Exception as e: print(f"[ResumeEvaluationService] 获取简历失败: {e}") return None diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/scheduler.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/scheduler.py index 232cdba..0b1dacc 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/scheduler.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/scheduler.py @@ -93,39 +93,39 @@ class CrawlScheduler: async def _crawl_boss(self): """爬取 Boss 直聘""" print(f"[{datetime.now()}] 开始爬取 Boss 直聘...") - + try: # 获取所有活跃账号 recruiters = self.app.recruiter_service.list_active_recruiters(CandidateSource.BOSS) - + if not recruiters: print(f"[{datetime.now()}] 没有可用的 Boss 账号") return - + for recruiter in recruiters: print(f"[{datetime.now()}] 使用账号: {recruiter.name}") - + # 创建爬虫 crawler = self.app.recruiter_service.create_crawler_for_recruiter(recruiter) if not crawler: continue - - # 获取职位列表 - jobs = crawler.get_jobs() + + # 获取职位列表(在线程池中执行,避免阻塞事件循环) + jobs = await asyncio.to_thread(crawler.get_jobs) print(f"[{datetime.now()}] 找到 {len(jobs)} 个职位") - + # 遍历职位爬取候选人 for job in jobs: print(f"[{datetime.now()}] 爬取职位: {job.title}") - - # 爬取候选人 - candidates = crawler.get_candidates(job.source_id, page=1) + + # 爬取候选人(在线程池中执行,避免阻塞事件循环) + candidates = await asyncio.to_thread(crawler.get_candidates, job.source_id, page=1) print(f"[{datetime.now()}] 职位 '{job.title}' 找到 {len(candidates)} 个候选人") - + for candidate in candidates[:10]: # 每职位限制10个候选人 try: - # 获取简历详情 - resume = crawler.get_resume_detail(candidate) + # 获取简历详情(在线程池中执行,避免阻塞事件循环) + resume = await asyncio.to_thread(crawler.get_resume_detail, candidate) if not resume: continue @@ -145,8 +145,12 @@ class CrawlScheduler: "resumeText": resume.raw_content, } - # 入库 - result = self.app.ingestion_service.ingest(CandidateSource.BOSS, raw_data) + # 入库(在线程池中执行,避免阻塞事件循环) + result = await asyncio.to_thread( + self.app.ingestion_service.ingest, + CandidateSource.BOSS, + raw_data + ) print(f"[{datetime.now()}] 候选人 {candidate.name} 入库: {result.message}") # 触发分析