diff --git a/.gitignore b/.gitignore index e754787..3d91010 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,9 @@ __pycache__/ #qoder **/.qoder/** +#idea +**/.idea/** + # Distribution / packaging .Python build/ diff --git a/migrations/001_init_schema.sql b/migrations/001_init_schema.sql index fa61e82..b7d0beb 100644 --- a/migrations/001_init_schema.sql +++ b/migrations/001_init_schema.sql @@ -11,10 +11,25 @@ CREATE TABLE IF NOT EXISTS recruiters ( wt_token VARCHAR(512) NOT NULL, -- WT Token (加密存储) status VARCHAR(32) DEFAULT 'ACTIVE', -- ACTIVE, INACTIVE, EXPIRED last_used_at TIMESTAMP, -- 最后使用时间 + + -- 账号权益信息 + vip_level VARCHAR(32), -- VIP等级 + vip_status VARCHAR(32), -- VIP状态 + vip_expire_at TIMESTAMP, -- VIP过期时间 + resume_view_count INT DEFAULT 0, -- 剩余简历查看次数 + resume_view_total INT DEFAULT 0, -- 总简历查看次数 + + -- 账号统计信息 + last_sync_at TIMESTAMP, -- 最后同步时间 + sync_status VARCHAR(32), -- 同步状态: SUCCESS, FAILED, PENDING + sync_error TEXT, -- 同步错误信息 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uk_source_token (source, wt_token(255)), - INDEX idx_status (status) + INDEX idx_status (status), + INDEX idx_sync_status (sync_status), + INDEX idx_last_sync_at (last_sync_at) ); -- ============================================ @@ -73,6 +88,7 @@ CREATE TABLE IF NOT EXISTS jobs ( id VARCHAR(64) PRIMARY KEY, source VARCHAR(32) NOT NULL, -- BOSS, LIEPIN, etc. 
source_id VARCHAR(128) NOT NULL, + recruiter_id VARCHAR(64), -- 关联的招聘者账号ID title VARCHAR(256) NOT NULL, department VARCHAR(128), location VARCHAR(128), @@ -81,10 +97,19 @@ CREATE TABLE IF NOT EXISTS jobs ( requirements TEXT, -- 职位要求JSON description TEXT, -- 职位描述 status VARCHAR(32) DEFAULT 'ACTIVE', -- ACTIVE, PAUSED, CLOSED, ARCHIVED + + -- 职位统计信息 + candidate_count INT DEFAULT 0, -- 候选人数量 + new_candidate_count INT DEFAULT 0, -- 新候选人数量 + last_sync_at TIMESTAMP, -- 最后同步时间 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uk_source_source_id (source, source_id), - INDEX idx_status (status) + INDEX idx_status (status), + INDEX idx_recruiter_id (recruiter_id), + INDEX idx_last_sync_at (last_sync_at), + FOREIGN KEY (recruiter_id) REFERENCES recruiters(id) ON DELETE SET NULL ); -- ============================================ diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/config/database.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/config/database.py index 0a10a7c..90e7270 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/config/database.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/config/database.py @@ -17,6 +17,19 @@ class RecruiterModel(Base): wt_token = Column(String(512), nullable=False) status = Column(String(32), default='active') last_used_at = Column(DateTime) + + # 账号权益信息 + vip_level = Column(String(32)) + vip_status = Column(String(32)) + vip_expire_at = Column(DateTime) + resume_view_count = Column(Integer, default=0) + resume_view_total = Column(Integer, default=0) + + # 同步信息 + last_sync_at = Column(DateTime) + sync_status = Column(String(32)) + sync_error = Column(Text) + created_at = Column(DateTime, server_default=func.now()) updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now()) @@ -69,6 +82,7 @@ class JobModel(Base): id = Column(String(64), primary_key=True) source = Column(String(32), nullable=False) source_id = 
Column(String(128), nullable=False) + recruiter_id = Column(String(64), ForeignKey('recruiters.id')) title = Column(String(256), nullable=False) department = Column(String(128)) location = Column(String(128)) @@ -77,6 +91,12 @@ class JobModel(Base): requirements = Column(Text) description = Column(Text) status = Column(String(32), default='ACTIVE') + + # 统计信息 + candidate_count = Column(Integer, default=0) + new_candidate_count = Column(Integer, default=0) + last_sync_at = Column(DateTime) + created_at = Column(DateTime, server_default=func.now()) updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now()) diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/domain/job.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/domain/job.py index 9be8db7..70fa3ec 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/domain/job.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/domain/job.py @@ -31,6 +31,7 @@ class Job: id: Optional[str] = None source: CandidateSource = CandidateSource.BOSS source_id: str = "" + recruiter_id: Optional[str] = None # 关联的招聘者账号ID # 职位信息 title: str = "" @@ -48,6 +49,11 @@ class Job: # 状态 status: JobStatus = JobStatus.ACTIVE + # 统计信息 + candidate_count: int = 0 # 候选人数量 + new_candidate_count: int = 0 # 新候选人数量 + last_sync_at: Optional[datetime] = None # 最后同步时间 + # 元数据 created_at: Optional[datetime] = None updated_at: Optional[datetime] = None diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/domain/recruiter.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/domain/recruiter.py index 5e42f09..77c64f4 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/domain/recruiter.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/domain/recruiter.py @@ -1,5 +1,5 @@ """Recruiter entity definitions""" -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from typing import Optional from enum import Enum @@ -14,6 +14,23 @@ class RecruiterStatus(Enum): EXPIRED = "expired" # Token过期 +class 
SyncStatus(Enum): + """同步状态""" + PENDING = "pending" + SUCCESS = "success" + FAILED = "failed" + + +@dataclass +class RecruiterPrivilege: + """招聘者账号权益信息""" + vip_level: Optional[str] = None # VIP等级 + vip_status: Optional[str] = None # VIP状态 + vip_expire_at: Optional[datetime] = None # VIP过期时间 + resume_view_count: int = 0 # 剩余简历查看次数 + resume_view_total: int = 0 # 总简历查看次数 + + @dataclass class Recruiter: """招聘者账号实体""" @@ -23,6 +40,15 @@ class Recruiter: wt_token: str = "" # WT Token status: RecruiterStatus = RecruiterStatus.ACTIVE last_used_at: Optional[datetime] = None + + # 账号权益信息 + privilege: RecruiterPrivilege = field(default_factory=RecruiterPrivilege) + + # 同步信息 + last_sync_at: Optional[datetime] = None + sync_status: SyncStatus = SyncStatus.PENDING + sync_error: Optional[str] = None + created_at: Optional[datetime] = None updated_at: Optional[datetime] = None @@ -31,11 +57,36 @@ class Recruiter: self.created_at = datetime.now() if self.updated_at is None: self.updated_at = datetime.now() + if self.privilege is None: + self.privilege = RecruiterPrivilege() def is_active(self) -> bool: """检查账号是否活跃""" return self.status == RecruiterStatus.ACTIVE + def is_sync_success(self) -> bool: + """检查上次同步是否成功""" + return self.sync_status == SyncStatus.SUCCESS + + def can_view_resume(self) -> bool: + """检查是否可以查看简历""" + return ( + self.is_active() and + self.privilege.resume_view_count > 0 + ) + def mark_used(self): """标记为已使用""" self.last_used_at = datetime.now() + + def mark_sync_success(self): + """标记同步成功""" + self.last_sync_at = datetime.now() + self.sync_status = SyncStatus.SUCCESS + self.sync_error = None + + def mark_sync_failed(self, error: str): + """标记同步失败""" + self.last_sync_at = datetime.now() + self.sync_status = SyncStatus.FAILED + self.sync_error = error diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/__init__.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/__init__.py new file mode 100644 index 0000000..0c4b19a --- /dev/null +++ 
class AccountSyncResult:
    """Outcome of synchronising a single recruiter account.

    Attributes:
        recruiter_id: ID of the recruiter the result belongs to.
        success: Whether the account was synchronised successfully.
        message: Human-readable summary or error text.
        jobs_synced: Number of job postings saved during the run (starts at 0).
        timestamp: Time at which this result object was created.
    """

    def __init__(self, recruiter_id: str, success: bool, message: str = ""):
        self.recruiter_id = recruiter_id
        self.success = success
        self.message = message
        self.jobs_synced = 0
        self.timestamp = datetime.now()


class AccountSyncJob:
    """Periodic job that refreshes recruiter accounts.

    For every recruiter returned by the service it:
      1. checks whether the login token is still valid,
      2. refreshes the account privilege data (VIP info, resume-view quota),
      3. refreshes the list of open job postings,
    and persists the outcome through ``RecruiterService``.

    NOTE(review): the SDK calls made from the ``async`` methods below are
    invoked synchronously; if they perform blocking network I/O they will
    block the event loop while they run -- confirm against the SDK.
    """

    def __init__(self, recruiter_service: RecruiterService):
        self.recruiter_service = recruiter_service

    async def execute(self) -> List[AccountSyncResult]:
        """Synchronise every configured recruiter account.

        A failure on one account never prevents the remaining accounts from
        being processed.

        Returns:
            One ``AccountSyncResult`` per recruiter, in service order.
        """
        print(f"[{datetime.now()}] 开始执行账号同步任务...")

        results: List[AccountSyncResult] = []

        # All accounts are fetched (not only active ones) because expired
        # accounts must be re-checked as well.
        recruiters = self.recruiter_service.find_all_recruiters()

        if not recruiters:
            print(f"[{datetime.now()}] 没有配置任何招聘者账号")
            return results

        print(f"[{datetime.now()}] 找到 {len(recruiters)} 个招聘者账号需要同步")

        for recruiter in recruiters:
            try:
                results.append(await self._sync_single_recruiter(recruiter))
            except Exception as e:
                error_msg = f"同步账号 {recruiter.name} 时发生异常: {str(e)}"
                print(f"[{datetime.now()}] {error_msg}")

                # Persisting the failure is best-effort: if the store itself
                # is the thing that is broken, the remaining accounts must
                # still be processed and this account still reported.
                try:
                    recruiter.mark_sync_failed(str(e))
                    self.recruiter_service.save_recruiter(recruiter)
                except Exception as save_err:
                    print(
                        f"[{datetime.now()}] 保存账号 {recruiter.name} "
                        f"同步失败状态时出错: {save_err}"
                    )

                results.append(AccountSyncResult(
                    recruiter_id=recruiter.id or "",
                    success=False,
                    message=error_msg
                ))

        success_count = sum(1 for r in results if r.success)
        print(f"[{datetime.now()}] 账号同步任务完成: {success_count}/{len(results)} 个账号同步成功")

        return results

    async def _sync_single_recruiter(self, recruiter: Recruiter) -> AccountSyncResult:
        """Synchronise one recruiter account and persist the outcome.

        Args:
            recruiter: The recruiter account to refresh.

        Returns:
            The result for this account. ``success`` is False only when the
            login check fails.
        """
        print(f"[{datetime.now()}] 开始同步账号: {recruiter.name} (ID: {recruiter.id})")

        result = AccountSyncResult(
            recruiter_id=recruiter.id or "",
            success=False
        )

        # 1. Login check: an invalid token marks the account EXPIRED.
        is_valid = await self._check_login_status(recruiter)

        if not is_valid:
            recruiter.status = RecruiterStatus.EXPIRED
            recruiter.mark_sync_failed("Token已失效或账号已过期")
            self.recruiter_service.save_recruiter(recruiter)

            result.success = False
            result.message = "Token已失效,账号状态已更新为EXPIRED"
            print(f"[{datetime.now()}] 账号 {recruiter.name} Token已失效")
            return result

        # A previously expired account whose token works again is revived.
        if recruiter.status == RecruiterStatus.EXPIRED:
            recruiter.status = RecruiterStatus.ACTIVE

        # 2. Privilege data (best-effort; failure does not fail the sync).
        try:
            await self._sync_privilege_info(recruiter)
        except Exception as e:
            print(f"[{datetime.now()}] 同步账号 {recruiter.name} 权益信息失败: {e}")

        # 3. Job postings (best-effort as well).
        try:
            result.jobs_synced = await self._sync_jobs(recruiter)
        except Exception as e:
            print(f"[{datetime.now()}] 同步账号 {recruiter.name} 职位列表失败: {e}")

        # NOTE(review): the account is marked as successfully synced even if
        # steps 2 or 3 failed -- only the login check gates success.
        recruiter.mark_sync_success()
        self.recruiter_service.save_recruiter(recruiter)

        result.success = True
        result.message = f"同步成功,职位数: {result.jobs_synced}"
        print(f"[{datetime.now()}] 账号 {recruiter.name} 同步完成: {result.message}")

        return result

    async def _check_login_status(self, recruiter: Recruiter) -> bool:
        """Return whether the recruiter's token is still accepted.

        Uses the SDK client's ``check_login_status`` when available and falls
        back to calling ``get_account_info``. When the client offers neither
        method the account is optimistically treated as valid.

        Args:
            recruiter: The recruiter account to verify.

        Returns:
            True if the account is considered logged in, False otherwise
            (including when no crawler/client can be obtained or any error
            occurs during the check).
        """
        try:
            crawler = self.recruiter_service.create_crawler_for_recruiter(recruiter)
            if not crawler:
                print(f"[{datetime.now()}] 无法为账号 {recruiter.name} 创建爬虫")
                return False

            client = getattr(crawler, 'client', None)

            if client is not None and hasattr(client, 'check_login_status'):
                # Coerce so callers always receive a real bool.
                is_valid = bool(client.check_login_status())
                print(f"[{datetime.now()}] 账号 {recruiter.name} 登录状态: {'有效' if is_valid else '无效'}")
                return is_valid

            print(f"[{datetime.now()}] SDK未提供check_login_status,尝试获取账号信息验证...")

            if client is None:
                # Without a client there is nothing to verify against.
                return False

            if hasattr(client, 'get_account_info'):
                try:
                    client.get_account_info()
                except Exception:
                    return False
            return True

        except Exception as e:
            print(f"[{datetime.now()}] 检查账号 {recruiter.name} 登录状态失败: {e}")
            return False

    async def _sync_privilege_info(self, recruiter: Recruiter) -> bool:
        """Fetch privilege data from the SDK and copy it onto ``recruiter``.

        Args:
            recruiter: The recruiter account to update in place.

        Returns:
            True if privilege data was fetched and parsed, False if the
            crawler/SDK method is unavailable or an error occurred.
        """
        try:
            crawler = self.recruiter_service.create_crawler_for_recruiter(recruiter)
            if not crawler:
                return False

            client = getattr(crawler, 'client', None)
            if client is None or not hasattr(client, 'get_account_detail'):
                print(f"[{datetime.now()}] SDK未提供get_account_detail方法")
                return False

            self._parse_privilege_data(recruiter, client.get_account_detail())

            print(f"[{datetime.now()}] 账号 {recruiter.name} 权益信息同步完成: "
                  f"VIP={recruiter.privilege.vip_status}, "
                  f"剩余查看次数={recruiter.privilege.resume_view_count}")
            return True

        except Exception as e:
            print(f"[{datetime.now()}] 同步账号 {recruiter.name} 权益信息失败: {e}")
            return False

    @staticmethod
    def _read_field(data: Any, key: str) -> Any:
        """Read ``key`` from a dict or an attribute-style object; None if absent."""
        if isinstance(data, dict):
            return data.get(key)
        return getattr(data, key, None)

    def _parse_privilege_data(self, recruiter: Recruiter, privilege_data: Any):
        """Copy privilege fields from an SDK response onto ``recruiter.privilege``.

        The response may be an object or a dict, optionally wrapped in a
        ``zpData`` member. Each field is parsed independently: a missing,
        ``None`` or malformed field leaves the current value untouched and
        does not prevent the other fields from being updated.

        Args:
            recruiter: The recruiter whose ``privilege`` is updated in place.
            privilege_data: SDK response. Presumably a ``BossVipResponse``
                -- TODO confirm against the SDK.
        """
        wrapped = self._read_field(privilege_data, 'zpData')
        data = privilege_data if wrapped is None else wrapped
        privilege = recruiter.privilege

        # Plain string fields.
        for key, attr in (('vipLevel', 'vip_level'), ('vipStatus', 'vip_status')):
            value = self._read_field(data, key)
            if value is not None:
                setattr(privilege, attr, str(value))

        # The value is divided by 1000 before conversion, i.e. it is treated
        # as a millisecond timestamp. bool is excluded explicitly because it
        # is a subclass of int.
        expire_time = self._read_field(data, 'vipExpireTime')
        if isinstance(expire_time, (int, float)) and not isinstance(expire_time, bool):
            try:
                privilege.vip_expire_at = datetime.fromtimestamp(expire_time / 1000)
            except (OverflowError, OSError, ValueError) as e:
                print(f"[{datetime.now()}] 解析权益数据失败: {e}")

        # Integer counters.
        for key, attr in (('remainCount', 'resume_view_count'),
                          ('totalCount', 'resume_view_total')):
            value = self._read_field(data, key)
            if value is None:
                continue
            try:
                setattr(privilege, attr, int(value))
            except (TypeError, ValueError) as e:
                print(f"[{datetime.now()}] 解析权益数据失败: {e}")

    async def _sync_jobs(self, recruiter: Recruiter) -> int:
        """Fetch the recruiter's job postings and save each one.

        Every job is stamped with the recruiter's id and source and the
        current time before being passed to ``RecruiterService.save_job``.

        Args:
            recruiter: The recruiter account whose jobs are synchronised.

        Returns:
            Number of jobs saved; 0 when there are none or on any error.
        """
        try:
            crawler = self.recruiter_service.create_crawler_for_recruiter(recruiter)
            if not crawler:
                return 0

            jobs = crawler.get_jobs()

            if not jobs:
                print(f"[{datetime.now()}] 账号 {recruiter.name} 没有正在招聘的职位")
                return 0

            for job in jobs:
                job.recruiter_id = recruiter.id
                job.source = recruiter.source
                job.last_sync_at = datetime.now()
                self.recruiter_service.save_job(job)

            print(f"[{datetime.now()}] 账号 {recruiter.name} 同步了 {len(jobs)} 个职位")
            return len(jobs)

        except Exception as e:
            print(f"[{datetime.now()}] 同步账号 {recruiter.name} 职位列表失败: {e}")
            return 0
max_instances: int = 1 + misfire_grace_time: int = 3600 # 1小时 + + +@dataclass +class JobStatusInfo: + """任务状态信息""" + job_id: str + name: str + enabled: bool + is_running: bool + last_run_time: Optional[datetime] = None + next_run_time: Optional[datetime] = None + run_count: int = 0 + success_count: int = 0 + fail_count: int = 0 + last_error: Optional[str] = None + + +class JobScheduler: + """ + Job调度器 + + 管理所有定时任务的调度和执行 + """ + + # 默认任务配置 + DEFAULT_ACCOUNT_SYNC_CONFIG = JobConfig( + job_id="account_sync", + name="账号同步任务", + enabled=True, + trigger_type="interval", + interval_minutes=1, # 每1分钟同步一次账号信息 + ) + + DEFAULT_RESUME_PROCESS_CONFIG = JobConfig( + job_id="resume_process", + name="简历处理任务", + enabled=True, + trigger_type="interval", + interval_minutes=2, # 每1分钟处理一次简历 + ) + + def __init__( + self, + account_sync_job: AccountSyncJob, + resume_process_job: ResumeProcessJob + ): + self.scheduler: Optional[AsyncIOScheduler] = None + self._running = False + + # 任务实例 + self.account_sync_job = account_sync_job + self.resume_process_job = resume_process_job + + # 任务配置 + self.account_sync_config = self.DEFAULT_ACCOUNT_SYNC_CONFIG + self.resume_process_config = self.DEFAULT_RESUME_PROCESS_CONFIG + + # 任务状态跟踪 + self._job_status: Dict[str, JobStatusInfo] = {} + self._init_status_tracking() + + def _init_status_tracking(self): + """初始化状态跟踪""" + self._job_status[self.account_sync_config.job_id] = JobStatusInfo( + job_id=self.account_sync_config.job_id, + name=self.account_sync_config.name, + enabled=self.account_sync_config.enabled, + is_running=False + ) + self._job_status[self.resume_process_config.job_id] = JobStatusInfo( + job_id=self.resume_process_config.job_id, + name=self.resume_process_config.name, + enabled=self.resume_process_config.enabled, + is_running=False + ) + + def start(self): + """启动调度器""" + if self._running: + print(f"[{datetime.now()}] Job调度器已经在运行中") + return + + self.scheduler = AsyncIOScheduler() + self.scheduler.start() + self._running = True + + 
print(f"[{datetime.now()}] Job调度器已启动") + + # 注册任务 + self._register_jobs() + + def _register_jobs(self): + """注册所有定时任务""" + # 注册账号同步任务 + if self.account_sync_config.enabled: + self._register_account_sync_job() + + # 注册简历处理任务 + if self.resume_process_config.enabled: + self._register_resume_process_job() + + print(f"[{datetime.now()}] 已注册定时任务:") + for job in self.scheduler.get_jobs(): + print(f" - {job.name}: {job.trigger}") + + def _register_account_sync_job(self): + """注册账号同步任务""" + config = self.account_sync_config + + if config.trigger_type == "interval": + trigger = IntervalTrigger(minutes=config.interval_minutes) + else: + trigger = CronTrigger.from_crontab(config.cron_expression) + + self.scheduler.add_job( + self._execute_account_sync, + trigger=trigger, + id=config.job_id, + name=config.name, + replace_existing=True, + max_instances=config.max_instances, + misfire_grace_time=config.misfire_grace_time + ) + + print(f"[{datetime.now()}] 已注册账号同步任务,间隔: {config.interval_minutes}分钟") + + def _register_resume_process_job(self): + """注册简历处理任务""" + config = self.resume_process_config + + if config.trigger_type == "interval": + trigger = IntervalTrigger(minutes=config.interval_minutes) + else: + trigger = CronTrigger.from_crontab(config.cron_expression) + + self.scheduler.add_job( + self._execute_resume_process, + trigger=trigger, + id=config.job_id, + name=config.name, + replace_existing=True, + max_instances=config.max_instances, + misfire_grace_time=config.misfire_grace_time + ) + + print(f"[{datetime.now()}] 已注册简历处理任务,间隔: {config.interval_minutes}分钟") + + async def _execute_account_sync(self): + """执行账号同步任务(包装方法)""" + status = self._job_status[self.account_sync_config.job_id] + + if status.is_running: + print(f"[{datetime.now()}] 账号同步任务正在运行中,跳过本次执行") + return + + status.is_running = True + status.last_run_time = datetime.now() + status.run_count += 1 + + print(f"[{datetime.now()}] 开始执行账号同步任务...") + + try: + results = await self.account_sync_job.execute() + + # 统计结果 
+ success_count = sum(1 for r in results if r.success) + status.success_count += success_count + status.fail_count += len(results) - success_count + status.last_error = None + + print(f"[{datetime.now()}] 账号同步任务执行完成") + + except Exception as e: + error_msg = str(e) + status.fail_count += 1 + status.last_error = error_msg + print(f"[{datetime.now()}] 账号同步任务执行失败: {error_msg}") + + finally: + status.is_running = False + + async def _execute_resume_process(self): + """执行简历处理任务(包装方法)""" + status = self._job_status[self.resume_process_config.job_id] + + if status.is_running: + print(f"[{datetime.now()}] 简历处理任务正在运行中,跳过本次执行") + return + + status.is_running = True + status.last_run_time = datetime.now() + status.run_count += 1 + + print(f"[{datetime.now()}] 开始执行简历处理任务...") + + try: + results = await self.resume_process_job.execute() + + # 统计结果 + total_processed = sum(r.candidates_processed for r in results) + status.success_count += total_processed + status.fail_count += sum(r.candidates_failed for r in results) + status.last_error = None + + print(f"[{datetime.now()}] 简历处理任务执行完成,处理了 {total_processed} 个候选人") + + except Exception as e: + error_msg = str(e) + status.fail_count += 1 + status.last_error = error_msg + print(f"[{datetime.now()}] 简历处理任务执行失败: {error_msg}") + + finally: + status.is_running = False + + def stop(self): + """停止调度器""" + if self.scheduler: + self.scheduler.shutdown() + self._running = False + print(f"[{datetime.now()}] Job调度器已停止") + + def pause_job(self, job_id: str): + """暂停指定任务""" + if self.scheduler: + self.scheduler.pause_job(job_id) + if job_id in self._job_status: + self._job_status[job_id].enabled = False + print(f"[{datetime.now()}] 任务 {job_id} 已暂停") + + def resume_job(self, job_id: str): + """恢复指定任务""" + if self.scheduler: + self.scheduler.resume_job(job_id) + if job_id in self._job_status: + self._job_status[job_id].enabled = True + print(f"[{datetime.now()}] 任务 {job_id} 已恢复") + + def get_job_status(self, job_id: Optional[str] = None) -> 
List[Dict]: + """ + 获取任务状态 + + Args: + job_id: 任务ID,为None时返回所有任务状态 + + Returns: + List[Dict]: 任务状态列表 + """ + if job_id: + status = self._job_status.get(job_id) + return [self._status_to_dict(status)] if status else [] + + return [self._status_to_dict(s) for s in self._job_status.values()] + + def _status_to_dict(self, status: JobStatusInfo) -> Dict: + """将状态对象转换为字典""" + # 获取APScheduler中的任务信息 + next_run_time = None + if self.scheduler: + job = self.scheduler.get_job(status.job_id) + if job and job.next_run_time: + next_run_time = job.next_run_time.isoformat() + + return { + "job_id": status.job_id, + "name": status.name, + "enabled": status.enabled, + "is_running": status.is_running, + "last_run_time": status.last_run_time.isoformat() if status.last_run_time else None, + "next_run_time": next_run_time, + "run_count": status.run_count, + "success_count": status.success_count, + "fail_count": status.fail_count, + "last_error": status.last_error + } + + def get_scheduled_jobs(self) -> List[Dict]: + """获取所有已调度的任务""" + if not self.scheduler: + return [] + + return [ + { + "id": job.id, + "name": job.name, + "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None, + "trigger": str(job.trigger) + } + for job in self.scheduler.get_jobs() + ] + + def update_job_config(self, job_id: str, **kwargs): + """ + 更新任务配置 + + Args: + job_id: 任务ID + **kwargs: 配置参数 + """ + if job_id == self.account_sync_config.job_id: + config = self.account_sync_config + elif job_id == self.resume_process_config.job_id: + config = self.resume_process_config + else: + print(f"[{datetime.now()}] 未知的任务ID: {job_id}") + return + + # 更新配置 + for key, value in kwargs.items(): + if hasattr(config, key): + setattr(config, key, value) + + # 如果调度器正在运行,重新注册任务 + if self._running and self.scheduler: + self.scheduler.remove_job(job_id) + + if job_id == self.account_sync_config.job_id: + self._register_account_sync_job() + else: + self._register_resume_process_job() + + print(f"[{datetime.now()}] 
任务 {job_id} 配置已更新") + + async def run_job_now(self, job_id: str): + """ + 立即执行指定任务 + + Args: + job_id: 任务ID + """ + if job_id == self.account_sync_config.job_id: + await self._execute_account_sync() + elif job_id == self.resume_process_config.job_id: + await self._execute_resume_process() + else: + print(f"[{datetime.now()}] 未知的任务ID: {job_id}") diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/resume_process_job.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/resume_process_job.py new file mode 100644 index 0000000..38b7275 --- /dev/null +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/resume_process_job.py @@ -0,0 +1,365 @@ +""" +简历处理定时任务 + +负责处理简历信息入库操作: +1. 遍历所有活跃账号的职位 +2. 获取职位下的候选人列表 +3. 获取候选人简历详情 +4. 将简历信息入库 + +入库更新操作: +- 插入/更新 candidates 表 +- 插入/更新 resumes 表 +""" +from datetime import datetime +from typing import List, Optional, Dict, Any +from dataclasses import dataclass + +from ..domain.candidate import Candidate, CandidateSource, CandidateStatus +from ..domain.resume import Resume +from ..domain.job import Job +from ..domain.recruiter import Recruiter, RecruiterStatus +from ..service.recruiter_service import RecruiterService +from ..service.ingestion.unified_ingestion_service import UnifiedIngestionService +from ..service.crawler.base_crawler import BaseCrawler + + +@dataclass +class ResumeProcessResult: + """简历处理结果""" + job_id: str + job_title: str + recruiter_id: str + recruiter_name: str + candidates_processed: int = 0 + candidates_new: int = 0 + candidates_updated: int = 0 + candidates_failed: int = 0 + success: bool = True + message: str = "" + timestamp: datetime = None + + def __post_init__(self): + if self.timestamp is None: + self.timestamp = datetime.now() + + +class ResumeProcessJob: + """ + 简历处理定时任务 + + 定时执行简历爬取和入库: + - 获取所有活跃账号 + - 遍历账号下的职位 + - 获取候选人列表和简历详情 + - 统一入库处理 + """ + + def __init__( + self, + recruiter_service: RecruiterService, + ingestion_service: UnifiedIngestionService + ): + self.recruiter_service = recruiter_service 
@dataclass
class ResumeProcessResult:
    """Outcome of processing the candidates of one job posting.

    ``candidates_processed`` counts every attempted candidate, including the
    ones that end up in ``candidates_failed``.
    """
    job_id: str
    job_title: str
    recruiter_id: str
    recruiter_name: str
    candidates_processed: int = 0
    candidates_new: int = 0
    candidates_updated: int = 0
    candidates_failed: int = 0
    success: bool = True
    message: str = ""
    timestamp: Optional[datetime] = None  # filled with "now" when omitted

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now()


class ResumeProcessJob:
    """Periodic job that crawls resumes and ingests them.

    For each eligible recruiter account it walks the account's stored job
    postings, fetches the candidates of every posting together with their
    resume details, and hands the combined data to the ingestion service.

    NOTE(review): the crawler and ingestion calls made from the ``async``
    methods are invoked synchronously; if they perform blocking I/O they
    will block the event loop -- confirm against those services.
    """

    def __init__(
        self,
        recruiter_service: RecruiterService,
        ingestion_service: UnifiedIngestionService,
        max_candidates_per_job: int = 20
    ):
        """Create the job.

        Args:
            recruiter_service: Source of recruiters, their jobs and crawlers.
            ingestion_service: Sink that stores candidate/resume data.
            max_candidates_per_job: Upper bound on the number of candidates
                handled per job posting in one run.
        """
        self.recruiter_service = recruiter_service
        self.ingestion_service = ingestion_service
        self.max_candidates_per_job = max_candidates_per_job

    async def execute(self) -> List[ResumeProcessResult]:
        """Process the job postings of every eligible recruiter.

        An exception while handling one recruiter is logged and does not stop
        the remaining recruiters from being processed.

        Returns:
            One ``ResumeProcessResult`` per processed job posting.
        """
        print(f"[{datetime.now()}] 开始执行简历处理任务...")

        results: List[ResumeProcessResult] = []

        recruiters = self._get_eligible_recruiters()

        if not recruiters:
            print(f"[{datetime.now()}] 没有符合条件的招聘者账号(需要ACTIVE状态且同步成功)")
            return results

        print(f"[{datetime.now()}] 找到 {len(recruiters)} 个符合条件的账号")

        for recruiter in recruiters:
            try:
                results.extend(await self._process_recruiter_jobs(recruiter))
            except Exception as e:
                print(f"[{datetime.now()}] 处理账号 {recruiter.name} 时发生异常: {e}")

        total_processed = sum(r.candidates_processed for r in results)
        total_new = sum(r.candidates_new for r in results)
        total_failed = sum(r.candidates_failed for r in results)

        print(f"[{datetime.now()}] 简历处理任务完成:")
        print(f"  - 处理职位数: {len(results)}")
        print(f"  - 处理候选人数: {total_processed}")
        print(f"  - 新增候选人数: {total_new}")
        print(f"  - 失败数: {total_failed}")

        return results

    def _get_eligible_recruiters(self) -> List[Recruiter]:
        """Select the recruiter accounts that may be used for crawling.

        An account qualifies when it:
          * is active,
          * has either never been synced or its last sync succeeded, and
          * has either never been synced or still has resume views left.

        The quota check is skipped for never-synced accounts because their
        ``resume_view_count`` is only the default 0, not a value reported by
        the platform.

        Returns:
            The qualifying recruiters, in service order.
        """
        eligible_recruiters = []

        for recruiter in self.recruiter_service.find_all_recruiters():
            if not recruiter.is_active():
                continue

            has_synced = recruiter.last_sync_at is not None

            if has_synced and not recruiter.is_sync_success():
                print(f"[{datetime.now()}] 账号 {recruiter.name} 上次同步失败,跳过")
                continue

            privilege = recruiter.privilege
            if has_synced and privilege and privilege.resume_view_count <= 0:
                print(f"[{datetime.now()}] 账号 {recruiter.name} 简历查看次数已用完,跳过")
                continue

            eligible_recruiters.append(recruiter)

        return eligible_recruiters

    async def _process_recruiter_jobs(self, recruiter: Recruiter) -> List[ResumeProcessResult]:
        """Process every stored job posting of one recruiter.

        Args:
            recruiter: The recruiter account to crawl with.

        Returns:
            One result per job posting; empty when no crawler can be created
            or the account has no jobs. A job that raises is reported as a
            failed result instead of aborting the remaining jobs.
        """
        results: List[ResumeProcessResult] = []

        crawler = self.recruiter_service.create_crawler_for_recruiter(recruiter)
        if not crawler:
            print(f"[{datetime.now()}] 无法为账号 {recruiter.name} 创建爬虫")
            return results

        jobs = self.recruiter_service.find_jobs_by_recruiter(recruiter.id)

        if not jobs:
            print(f"[{datetime.now()}] 账号 {recruiter.name} 没有职位")
            return results

        print(f"[{datetime.now()}] 账号 {recruiter.name} 有 {len(jobs)} 个职位需要处理")

        for job in jobs:
            try:
                results.append(await self._process_single_job(recruiter, crawler, job))
            except Exception as e:
                results.append(ResumeProcessResult(
                    job_id=job.id or "",
                    job_title=job.title,
                    recruiter_id=recruiter.id or "",
                    recruiter_name=recruiter.name,
                    success=False,
                    message=f"处理职位时发生异常: {str(e)}"
                ))
                print(f"[{datetime.now()}] 处理职位 {job.title} 时发生异常: {e}")

        # Record that the account was used by this run.
        self.recruiter_service.mark_recruiter_used(recruiter.id)

        return results

    async def _process_single_job(
        self,
        recruiter: Recruiter,
        crawler: BaseCrawler,
        job: Job
    ) -> ResumeProcessResult:
        """Process the candidates of one job posting.

        Only the first page of candidates is fetched and at most
        ``max_candidates_per_job`` of them are handled.

        Args:
            recruiter: The recruiter account that owns the job.
            crawler: Crawler created for that recruiter.
            job: The job posting to process.

        Returns:
            Aggregated counters for the job; ``success`` is True only when no
            candidate failed.
        """
        result = ResumeProcessResult(
            job_id=job.id or "",
            job_title=job.title,
            recruiter_id=recruiter.id or "",
            recruiter_name=recruiter.name
        )

        print(f"[{datetime.now()}] 处理职位: {job.title} (ID: {job.source_id})")

        candidates = crawler.get_candidates(job.source_id, page=1)

        if not candidates:
            result.message = "该职位下没有候选人"
            print(f"[{datetime.now()}] 职位 {job.title} 没有候选人")
            return result

        print(f"[{datetime.now()}] 职位 {job.title} 找到 {len(candidates)} 个候选人")

        for candidate in candidates[:self.max_candidates_per_job]:
            result.candidates_processed += 1
            try:
                outcome = await self._process_single_candidate(
                    recruiter, crawler, job, candidate
                )
            except Exception as e:
                result.candidates_failed += 1
                print(f"[{datetime.now()}] 处理候选人 {candidate.name} 失败: {e}")
                continue

            if outcome == "new":
                result.candidates_new += 1
            elif outcome == "updated":
                result.candidates_updated += 1
            elif outcome == "failed":
                result.candidates_failed += 1

        result.success = result.candidates_failed == 0
        result.message = (
            f"处理完成: 新增{result.candidates_new}, "
            f"更新{result.candidates_updated}, "
            f"失败{result.candidates_failed}"
        )

        print(f"[{datetime.now()}] 职位 {job.title} 处理完成: {result.message}")

        return result

    async def _process_single_candidate(
        self,
        recruiter: Recruiter,
        crawler: BaseCrawler,
        job: Job,
        candidate: Candidate
    ) -> str:
        """Fetch one candidate's resume and ingest it.

        Args:
            recruiter: The recruiter account in use (not read here).
            crawler: Crawler used to fetch the resume detail.
            job: Job posting the candidate belongs to.
            candidate: The candidate to process.

        Returns:
            ``"new"`` or ``"updated"`` on successful ingestion, ``"failed"``
            when the resume cannot be fetched, ingestion reports failure, or
            any exception occurs.
        """
        print(f"[{datetime.now()}] 处理候选人: {candidate.name}")

        try:
            resume = crawler.get_resume_detail(candidate)

            if not resume:
                print(f"[{datetime.now()}] 候选人 {candidate.name} 无法获取简历详情")
                return "failed"

            ingestion_result = self.ingestion_service.ingest(
                source=candidate.source,
                raw_data=self._build_raw_data(candidate, resume, job)
            )

            if not ingestion_result.success:
                print(f"[{datetime.now()}] 候选人 {candidate.name} 入库失败: {ingestion_result.message}")
                return "failed"

            if ingestion_result.is_new:
                print(f"[{datetime.now()}] 候选人 {candidate.name} 新增入库成功")
                return "new"

            print(f"[{datetime.now()}] 候选人 {candidate.name} 更新入库成功")
            return "updated"

        except Exception as e:
            print(f"[{datetime.now()}] 处理候选人 {candidate.name} 时发生异常: {e}")
            return "failed"

    def _build_raw_data(
        self,
        candidate: Candidate,
        resume: Resume,
        job: Job
    ) -> Dict[str, Any]:
        """Flatten candidate, resume and job into the ingestion payload.

        Args:
            candidate: Candidate whose profile fields are copied.
            resume: Resume providing raw and parsed content.
            job: Job posting the candidate applied to.

        Returns:
            A dict keyed with the camelCase names expected by the ingestion
            service call in ``_process_single_candidate``.
        """
        salary = candidate.salary_expectation
        parsed = resume.parsed_content

        return {
            # Basic profile
            "geekId": candidate.source_id,
            "name": candidate.name,
            "phone": candidate.phone,
            "email": candidate.email,
            "age": candidate.age,
            "gender": candidate.gender.value if candidate.gender else 0,

            # Work and education
            "company": candidate.current_company,
            "position": candidate.current_position,
            "workYears": candidate.work_years,
            "education": candidate.education,
            "school": candidate.school,

            # Salary expectation
            "salaryMin": salary.min_salary if salary else None,
            "salaryMax": salary.max_salary if salary else None,

            # Resume content
            "resumeText": resume.raw_content,
            "parsedContent": parsed.to_dict() if parsed else None,

            # Job association
            "jobId": job.source_id,
            "jobTitle": job.title,

            # Untouched source payload, when the candidate carries one
            "rawCandidateData": getattr(candidate, 'raw_data', None),
        }
model.status else "active" + # 处理 sync_status 的大小写 + sync_status_value = model.sync_status.lower() if model.sync_status else "pending" + + # 构建权益信息 + privilege = RecruiterPrivilege( + vip_level=model.vip_level, + vip_status=model.vip_status, + vip_expire_at=model.vip_expire_at, + resume_view_count=model.resume_view_count or 0, + resume_view_total=model.resume_view_total or 0 + ) return Recruiter( id=model.id, @@ -37,6 +48,10 @@ class RecruiterMapper: wt_token=model.wt_token, status=RecruiterStatus(status_value), last_used_at=model.last_used_at, + privilege=privilege, + last_sync_at=model.last_sync_at, + sync_status=SyncStatus(sync_status_value), + sync_error=model.sync_error, created_at=model.created_at, updated_at=model.updated_at ) @@ -49,7 +64,15 @@ class RecruiterMapper: source=entity.source.value, wt_token=entity.wt_token, status=entity.status.value, - last_used_at=entity.last_used_at + last_used_at=entity.last_used_at, + vip_level=entity.privilege.vip_level if entity.privilege else None, + vip_status=entity.privilege.vip_status if entity.privilege else None, + vip_expire_at=entity.privilege.vip_expire_at if entity.privilege else None, + resume_view_count=entity.privilege.resume_view_count if entity.privilege else 0, + resume_view_total=entity.privilege.resume_view_total if entity.privilege else 0, + last_sync_at=entity.last_sync_at, + sync_status=entity.sync_status.value if entity.sync_status else 'pending', + sync_error=entity.sync_error ) def save(self, recruiter: Recruiter) -> Recruiter: @@ -66,7 +89,15 @@ class RecruiterMapper: source=recruiter.source.value, wt_token=recruiter.wt_token, status=recruiter.status.value, - last_used_at=recruiter.last_used_at + last_used_at=recruiter.last_used_at, + vip_level=recruiter.privilege.vip_level if recruiter.privilege else None, + vip_status=recruiter.privilege.vip_status if recruiter.privilege else None, + vip_expire_at=recruiter.privilege.vip_expire_at if recruiter.privilege else None, + 
resume_view_count=recruiter.privilege.resume_view_count if recruiter.privilege else 0, + resume_view_total=recruiter.privilege.resume_view_total if recruiter.privilege else 0, + last_sync_at=recruiter.last_sync_at, + sync_status=recruiter.sync_status.value if recruiter.sync_status else 'pending', + sync_error=recruiter.sync_error ) ) session.execute(stmt) @@ -174,3 +205,44 @@ class RecruiterMapper: return result.rowcount > 0 finally: session.close() + + def update_sync_status(self, recruiter_id: str, sync_status: SyncStatus, error: Optional[str] = None) -> bool: + """更新同步状态""" + session = self._get_session() + try: + stmt = ( + update(RecruiterModel) + .where(RecruiterModel.id == recruiter_id) + .values( + sync_status=sync_status.value, + sync_error=error, + last_sync_at=datetime.now() + ) + ) + result = session.execute(stmt) + session.commit() + return result.rowcount > 0 + finally: + session.close() + + def update_privilege(self, recruiter_id: str, privilege: RecruiterPrivilege) -> bool: + """更新账号权益信息""" + session = self._get_session() + try: + stmt = ( + update(RecruiterModel) + .where(RecruiterModel.id == recruiter_id) + .values( + vip_level=privilege.vip_level, + vip_status=privilege.vip_status, + vip_expire_at=privilege.vip_expire_at, + resume_view_count=privilege.resume_view_count, + resume_view_total=privilege.resume_view_total, + updated_at=datetime.now() + ) + ) + result = session.execute(stmt) + session.commit() + return result.rowcount > 0 + finally: + session.close() diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/recruiter_service.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/recruiter_service.py index f0af9e2..384d9d2 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/recruiter_service.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/recruiter_service.py @@ -1,8 +1,9 @@ """Recruiter service - Manage recruiter accounts""" from typing import List, Optional -from ..domain.recruiter import Recruiter, 
RecruiterStatus +from ..domain.recruiter import Recruiter, RecruiterStatus, RecruiterPrivilege from ..domain.candidate import CandidateSource +from ..domain.job import Job from ..mapper.recruiter_mapper import RecruiterMapper from ..service.crawler import BossCrawler, CrawlerFactory @@ -118,3 +119,42 @@ class RecruiterService: print(f"Registered crawler for recruiter: {recruiter.name}") return count + + # ==================== Job模块支持的方法 ==================== + + def find_all_recruiters(self) -> List[Recruiter]: + """获取所有招聘者账号(用于Job模块)""" + return self.mapper.find_all() + + def save_recruiter(self, recruiter: Recruiter) -> Recruiter: + """保存招聘者账号(用于Job模块更新)""" + return self.mapper.save(recruiter) + + def update_recruiter_privilege(self, recruiter_id: str, privilege: RecruiterPrivilege) -> bool: + """更新账号权益信息""" + return self.mapper.update_privilege(recruiter_id, privilege) + + def find_jobs_by_recruiter(self, recruiter_id: str) -> List[Job]: + """ + 获取指定招聘者的职位列表 + + TODO: 需要实现JobMapper + 目前返回空列表,后续需要添加Job数据访问层 + """ + # 暂时返回空列表,后续需要实现JobMapper + # from ..mapper.job_mapper import JobMapper + # return JobMapper().find_by_recruiter(recruiter_id) + return [] + + def save_job(self, job: Job) -> Job: + """ + 保存职位信息 + + TODO: 需要实现JobMapper + 目前仅打印日志,后续需要添加Job数据访问层 + """ + # 暂时仅打印日志,后续需要实现JobMapper + # from ..mapper.job_mapper import JobMapper + # return JobMapper().save(job) + print(f"[Job] 保存职位: {job.title} (ID: {job.source_id})") + return job diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/scheduler.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/scheduler.py index a152a60..232cdba 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/scheduler.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/scheduler.py @@ -9,6 +9,7 @@ from apscheduler.triggers.interval import IntervalTrigger from ..domain.candidate import CandidateSource from ..main import get_app +from ..job import AccountSyncJob, ResumeProcessJob, JobScheduler class 
CrawlScheduler: @@ -16,12 +17,16 @@ class CrawlScheduler: 爬虫定时任务调度器 定时执行简历爬取任务 + 集成新的Job模块:账号同步任务和简历处理任务 """ def __init__(self): self.scheduler: Optional[AsyncIOScheduler] = None self.app = None self._running = False + + # 新的Job调度器 + self.job_scheduler: Optional[JobScheduler] = None def start(self): """启动调度器""" @@ -37,9 +42,30 @@ class CrawlScheduler: print(f"[{datetime.now()}] 爬虫调度器已启动") - # 注册定时任务 + # 启动新的Job调度器(如果应用支持) + self._start_job_scheduler() + + # 注册旧版定时任务(保持兼容) self._register_jobs() + def _start_job_scheduler(self): + """启动新的Job调度器""" + try: + # 创建Job实例 + account_sync_job = AccountSyncJob(self.app.recruiter_service) + resume_process_job = ResumeProcessJob( + self.app.recruiter_service, + self.app.ingestion_service + ) + + # 创建并启动Job调度器 + self.job_scheduler = JobScheduler(account_sync_job, resume_process_job) + self.job_scheduler.start() + + print(f"[{datetime.now()}] Job调度器已启动(账号同步 + 简历处理)") + except Exception as e: + print(f"[{datetime.now()}] 启动Job调度器失败: {e}") + def _register_jobs(self): """注册定时任务""" # 每30分钟爬取一次 Boss 直聘 @@ -147,8 +173,13 @@ class CrawlScheduler: """停止调度器""" if self.scheduler: self.scheduler.shutdown() - self._running = False - print(f"[{datetime.now()}] 爬虫调度器已停止") + + # 停止Job调度器 + if self.job_scheduler: + self.job_scheduler.stop() + + self._running = False + print(f"[{datetime.now()}] 爬虫调度器已停止") def pause_job(self, job_id: str): """暂停指定任务""" @@ -164,17 +195,58 @@ class CrawlScheduler: def get_jobs(self): """获取所有任务""" + jobs = [] + + # 获取旧版任务 if self.scheduler: - return [ + jobs.extend([ { "id": job.id, "name": job.name, "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None, - "trigger": str(job.trigger) + "trigger": str(job.trigger), + "type": "legacy" } for job in self.scheduler.get_jobs() - ] + ]) + + # 获取新版Job任务 + if self.job_scheduler: + jobs.extend([ + { + "id": job["id"], + "name": job["name"], + "next_run_time": job["next_run_time"], + "trigger": job["trigger"], + "type": "job" + } + for job in 
self.job_scheduler.get_scheduled_jobs() + ]) + + return jobs + + def get_job_status(self, job_id: Optional[str] = None): + """ + 获取Job任务状态 + + Args: + job_id: Job任务ID,为None时返回所有Job任务状态 + """ + if self.job_scheduler: + return self.job_scheduler.get_job_status(job_id) return [] + + async def run_job_now(self, job_id: str): + """ + 立即执行指定的Job任务 + + Args: + job_id: Job任务ID (account_sync 或 resume_process) + """ + if self.job_scheduler: + await self.job_scheduler.run_job_now(job_id) + else: + print(f"[{datetime.now()}] Job调度器未启动") # 全局调度器实例