feat(job): 添加账号与职位同步及简历处理定时任务

- 在recruiters与jobs表中新增账号权益和统计字段,添加对应索引和外键约束
- 扩展Recruiter和Job领域模型,支持权益、同步状态及职位统计信息
- 实现账号同步定时任务,定期检查账号状态、同步权益及职位数据
- 实现简历处理定时任务,遍历活跃账号职位,抓取候选人简历并统一入库
- 引入Job调度器,集中管理账号同步和简历处理任务的调度、启停及状态监控
- 添加.gitignore规则忽略.idea目录配置文件
This commit is contained in:
2026-03-24 17:00:58 +08:00
parent b6afe82d2f
commit 49cd8682d0
12 changed files with 1372 additions and 13 deletions

3
.gitignore vendored
View File

@@ -9,6 +9,9 @@ __pycache__/
#qoder
**/.qoder/**
#idea
**/.idea/**
# Distribution / packaging
.Python
build/

View File

@@ -11,10 +11,25 @@ CREATE TABLE IF NOT EXISTS recruiters (
wt_token VARCHAR(512) NOT NULL, -- WT Token (加密存储)
status VARCHAR(32) DEFAULT 'ACTIVE', -- ACTIVE, INACTIVE, EXPIRED
last_used_at TIMESTAMP, -- 最后使用时间
-- 账号权益信息
vip_level VARCHAR(32), -- VIP等级
vip_status VARCHAR(32), -- VIP状态
vip_expire_at TIMESTAMP, -- VIP过期时间
resume_view_count INT DEFAULT 0, -- 剩余简历查看次数
resume_view_total INT DEFAULT 0, -- 总简历查看次数
-- 账号统计信息
last_sync_at TIMESTAMP, -- 最后同步时间
sync_status VARCHAR(32), -- 同步状态: SUCCESS, FAILED, PENDING
sync_error TEXT, -- 同步错误信息
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_source_token (source, wt_token(255)),
INDEX idx_status (status)
INDEX idx_status (status),
INDEX idx_sync_status (sync_status),
INDEX idx_last_sync_at (last_sync_at)
);
-- ============================================
@@ -73,6 +88,7 @@ CREATE TABLE IF NOT EXISTS jobs (
id VARCHAR(64) PRIMARY KEY,
source VARCHAR(32) NOT NULL, -- BOSS, LIEPIN, etc.
source_id VARCHAR(128) NOT NULL,
recruiter_id VARCHAR(64), -- 关联的招聘者账号ID
title VARCHAR(256) NOT NULL,
department VARCHAR(128),
location VARCHAR(128),
@@ -81,10 +97,19 @@ CREATE TABLE IF NOT EXISTS jobs (
requirements TEXT, -- 职位要求JSON
description TEXT, -- 职位描述
status VARCHAR(32) DEFAULT 'ACTIVE', -- ACTIVE, PAUSED, CLOSED, ARCHIVED
-- 职位统计信息
candidate_count INT DEFAULT 0, -- 候选人数量
new_candidate_count INT DEFAULT 0, -- 新候选人数量
last_sync_at TIMESTAMP, -- 最后同步时间
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_source_source_id (source, source_id),
INDEX idx_status (status)
INDEX idx_status (status),
INDEX idx_recruiter_id (recruiter_id),
INDEX idx_last_sync_at (last_sync_at),
FOREIGN KEY (recruiter_id) REFERENCES recruiters(id) ON DELETE SET NULL
);
-- ============================================

View File

@@ -17,6 +17,19 @@ class RecruiterModel(Base):
wt_token = Column(String(512), nullable=False)
status = Column(String(32), default='active')
last_used_at = Column(DateTime)
# 账号权益信息
vip_level = Column(String(32))
vip_status = Column(String(32))
vip_expire_at = Column(DateTime)
resume_view_count = Column(Integer, default=0)
resume_view_total = Column(Integer, default=0)
# 同步信息
last_sync_at = Column(DateTime)
sync_status = Column(String(32))
sync_error = Column(Text)
created_at = Column(DateTime, server_default=func.now())
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
@@ -69,6 +82,7 @@ class JobModel(Base):
id = Column(String(64), primary_key=True)
source = Column(String(32), nullable=False)
source_id = Column(String(128), nullable=False)
recruiter_id = Column(String(64), ForeignKey('recruiters.id'))
title = Column(String(256), nullable=False)
department = Column(String(128))
location = Column(String(128))
@@ -77,6 +91,12 @@ class JobModel(Base):
requirements = Column(Text)
description = Column(Text)
status = Column(String(32), default='ACTIVE')
# 统计信息
candidate_count = Column(Integer, default=0)
new_candidate_count = Column(Integer, default=0)
last_sync_at = Column(DateTime)
created_at = Column(DateTime, server_default=func.now())
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())

View File

@@ -31,6 +31,7 @@ class Job:
id: Optional[str] = None
source: CandidateSource = CandidateSource.BOSS
source_id: str = ""
recruiter_id: Optional[str] = None # 关联的招聘者账号ID
# 职位信息
title: str = ""
@@ -48,6 +49,11 @@ class Job:
# 状态
status: JobStatus = JobStatus.ACTIVE
# 统计信息
candidate_count: int = 0 # 候选人数量
new_candidate_count: int = 0 # 新候选人数量
last_sync_at: Optional[datetime] = None # 最后同步时间
# 元数据
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None

View File

@@ -1,5 +1,5 @@
"""Recruiter entity definitions"""
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from enum import Enum
@@ -14,6 +14,23 @@ class RecruiterStatus(Enum):
EXPIRED = "expired" # Token过期
class SyncStatus(Enum):
    """Outcome of the most recent synchronisation of a recruiter account."""

    # No sync result has been recorded yet.
    PENDING = "pending"
    # The last sync finished successfully.
    SUCCESS = "success"
    # The last sync ended with an error.
    FAILED = "failed"
@dataclass
class RecruiterPrivilege:
    """Privilege (VIP and resume-view quota) details of a recruiter account."""

    # VIP tier of the account.
    vip_level: Optional[str] = None
    # VIP state of the account.
    vip_status: Optional[str] = None
    # Moment at which the VIP membership expires.
    vip_expire_at: Optional[datetime] = None
    # Resume views still available.
    resume_view_count: int = 0
    # Total resume views granted.
    resume_view_total: int = 0
@dataclass
class Recruiter:
"""招聘者账号实体"""
@@ -23,6 +40,15 @@ class Recruiter:
wt_token: str = "" # WT Token
status: RecruiterStatus = RecruiterStatus.ACTIVE
last_used_at: Optional[datetime] = None
# 账号权益信息
privilege: RecruiterPrivilege = field(default_factory=RecruiterPrivilege)
# 同步信息
last_sync_at: Optional[datetime] = None
sync_status: SyncStatus = SyncStatus.PENDING
sync_error: Optional[str] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
@@ -31,11 +57,36 @@ class Recruiter:
self.created_at = datetime.now()
if self.updated_at is None:
self.updated_at = datetime.now()
if self.privilege is None:
self.privilege = RecruiterPrivilege()
def is_active(self) -> bool:
    """Return True when the account status is ACTIVE."""
    current_status = self.status
    return current_status == RecruiterStatus.ACTIVE
def is_sync_success(self) -> bool:
    """Return True when the recorded sync status is SUCCESS."""
    recorded_status = self.sync_status
    return recorded_status == SyncStatus.SUCCESS
def can_view_resume(self) -> bool:
    """Return True when the account is active and has resume views left."""
    # Inactive accounts can never view resumes, whatever their quota.
    if not self.is_active():
        return False
    return self.privilege.resume_view_count > 0
def mark_used(self):
    """Record the current time as the moment the account was last used."""
    used_at = datetime.now()
    self.last_used_at = used_at
def mark_sync_success(self):
    """Record a successful sync: timestamp it and clear any stored error."""
    finished_at = datetime.now()
    self.last_sync_at = finished_at
    self.sync_status, self.sync_error = SyncStatus.SUCCESS, None
def mark_sync_failed(self, error: str):
    """Record a failed sync: timestamp it and store the error message.

    Args:
        error: Description of the failure, kept in ``sync_error``.
    """
    finished_at = datetime.now()
    self.last_sync_at = finished_at
    self.sync_status, self.sync_error = SyncStatus.FAILED, error

View File

@@ -0,0 +1,24 @@
"""
定时任务模块
该模块包含所有定时任务的实现:
- account_sync_job: 账号同步任务(状态、权益、职位)
- resume_process_job: 简历处理任务
使用方式:
from ..job import AccountSyncJob, ResumeProcessJob
# 创建任务实例
account_job = AccountSyncJob(recruiter_service)
resume_job = ResumeProcessJob(recruiter_service, ingestion_service)
"""
from .account_sync_job import AccountSyncJob
from .resume_process_job import ResumeProcessJob
from .job_scheduler import JobScheduler
__all__ = [
"AccountSyncJob",
"ResumeProcessJob",
"JobScheduler"
]

View File

@@ -0,0 +1,309 @@
"""
账号同步定时任务
负责同步招聘者账号的以下信息:
1. 账号状态检查Token是否有效
2. 账号权益信息VIP等级、剩余查看次数等
3. 正在招聘的职位列表
入库更新操作:
- 更新 recruiters 表的账号状态和权益信息
- 更新 jobs 表的职位信息
"""
from datetime import datetime
from typing import List, Optional, Dict, Any
import uuid
from ..domain.recruiter import Recruiter, RecruiterStatus, RecruiterPrivilege, SyncStatus
from ..domain.job import Job, JobStatus
from ..domain.candidate import CandidateSource
from ..service.recruiter_service import RecruiterService
class AccountSyncResult:
    """Outcome of synchronising a single recruiter account."""

    def __init__(self, recruiter_id: str, success: bool, message: str = ""):
        """Store the verdict for one account.

        Args:
            recruiter_id: Identifier of the account that was synced.
            success: Whether the sync succeeded.
            message: Optional human-readable summary.
        """
        self.recruiter_id, self.success, self.message = recruiter_id, success, message
        # Starts at zero; the sync job assigns the real count afterwards.
        self.jobs_synced = 0
        # Creation time of this result object.
        self.timestamp = datetime.now()
class AccountSyncJob:
    """Scheduled job that synchronises recruiter accounts.

    For each account it checks the login status, refreshes the privilege
    information and syncs the job list, persisting the outcome through
    ``recruiter_service``.
    """

    def __init__(self, recruiter_service: RecruiterService):
        self.recruiter_service = recruiter_service

    async def execute(self) -> List[AccountSyncResult]:
        """Run the sync for every configured recruiter account.

        Returns:
            One AccountSyncResult per account, in processing order. Empty
            when no account is configured.
        """
        print(f"[{datetime.now()}] 开始执行账号同步任务...")

        results = []

        # Every account is fetched, not only the active ones, so that the
        # status of all of them gets re-checked.
        recruiters = self.recruiter_service.find_all_recruiters()

        if not recruiters:
            print(f"[{datetime.now()}] 没有配置任何招聘者账号")
            return results

        print(f"[{datetime.now()}] 找到 {len(recruiters)} 个招聘者账号需要同步")

        for recruiter in recruiters:
            try:
                result = await self._sync_single_recruiter(recruiter)
                results.append(result)
            except Exception as e:
                error_msg = f"同步账号 {recruiter.name} 时发生异常: {str(e)}"
                print(f"[{datetime.now()}] {error_msg}")

                # Persist the failure so the account shows as FAILED.
                recruiter.mark_sync_failed(str(e))
                self.recruiter_service.save_recruiter(recruiter)

                results.append(AccountSyncResult(
                    recruiter_id=recruiter.id or "",
                    success=False,
                    message=error_msg
                ))

        success_count = sum(1 for r in results if r.success)
        print(f"[{datetime.now()}] 账号同步任务完成: {success_count}/{len(results)} 个账号同步成功")

        return results

    async def _sync_single_recruiter(self, recruiter: Recruiter) -> AccountSyncResult:
        """Synchronise one account and persist the outcome.

        Args:
            recruiter: Account to synchronise; mutated in place.

        Returns:
            The sync result. ``success`` is False only when the login check
            fails; privilege and job sync are best-effort.
        """
        print(f"[{datetime.now()}] 开始同步账号: {recruiter.name} (ID: {recruiter.id})")

        result = AccountSyncResult(
            recruiter_id=recruiter.id or "",
            success=False
        )

        # 1. Login status check.
        is_valid = await self._check_login_status(recruiter)

        if not is_valid:
            # Invalid token: flag the account as EXPIRED and stop here.
            recruiter.status = RecruiterStatus.EXPIRED
            recruiter.mark_sync_failed("Token已失效或账号已过期")
            self.recruiter_service.save_recruiter(recruiter)

            result.success = False
            result.message = "Token已失效账号状态已更新为EXPIRED"
            print(f"[{datetime.now()}] 账号 {recruiter.name} Token已失效")
            return result

        # A previously expired account that validates again is reactivated.
        if recruiter.status == RecruiterStatus.EXPIRED:
            recruiter.status = RecruiterStatus.ACTIVE

        # 2. Privilege information. The helper reports its own failures and
        # returns False, so this handler is only a last line of defence.
        try:
            await self._sync_privilege_info(recruiter)
        except Exception as e:
            print(f"[{datetime.now()}] 同步账号 {recruiter.name} 权益信息失败: {e}")

        # 3. Job list (same best-effort contract as step 2).
        try:
            jobs_count = await self._sync_jobs(recruiter)
            result.jobs_synced = jobs_count
        except Exception as e:
            print(f"[{datetime.now()}] 同步账号 {recruiter.name} 职位列表失败: {e}")

        recruiter.mark_sync_success()
        self.recruiter_service.save_recruiter(recruiter)

        result.success = True
        result.message = f"同步成功,职位数: {result.jobs_synced}"
        print(f"[{datetime.now()}] 账号 {recruiter.name} 同步完成: {result.message}")

        return result

    async def _check_login_status(self, recruiter: Recruiter) -> bool:
        """Check whether the account's login is still valid.

        Args:
            recruiter: Account to check.

        Returns:
            False when no crawler can be created, when the client reports an
            invalid login, or when any call raises. True when the client
            reports a valid login, and also when the crawler offers no way to
            verify the login at all.
        """
        try:
            crawler = self.recruiter_service.create_crawler_for_recruiter(recruiter)
            if not crawler:
                print(f"[{datetime.now()}] 无法为账号 {recruiter.name} 创建爬虫")
                return False

            # A crawler without a ``client`` attribute is treated exactly like
            # one whose client is None: nothing can be verified. Dereferencing
            # ``crawler.client`` unguarded used to raise AttributeError here,
            # which was swallowed and wrongly reported the login as invalid.
            client = getattr(crawler, 'client', None)

            if hasattr(client, 'check_login_status'):
                is_valid = client.check_login_status()
                print(f"[{datetime.now()}] 账号 {recruiter.name} 登录状态: {'有效' if is_valid else '无效'}")
                return is_valid

            # Fallback: a successful get_account_info() call counts as valid.
            print(f"[{datetime.now()}] SDK未提供check_login_status尝试获取账号信息验证...")
            try:
                if hasattr(client, 'get_account_info'):
                    client.get_account_info()
                    return True
            except Exception:
                return False
            return True

        except Exception as e:
            print(f"[{datetime.now()}] 检查账号 {recruiter.name} 登录状态失败: {e}")
            return False

    async def _sync_privilege_info(self, recruiter: Recruiter) -> bool:
        """Refresh the account's privilege information from the SDK.

        Args:
            recruiter: Account whose ``privilege`` is updated in place.

        Returns:
            True when the privilege data was fetched and handed to the
            parser; False when no crawler or SDK method is available or when
            a call raised.
        """
        try:
            crawler = self.recruiter_service.create_crawler_for_recruiter(recruiter)
            if not crawler:
                return False

            client = getattr(crawler, 'client', None)
            if hasattr(client, 'get_account_detail'):
                privilege_data = client.get_account_detail()

                self._parse_privilege_data(recruiter, privilege_data)

                print(f"[{datetime.now()}] 账号 {recruiter.name} 权益信息同步完成: "
                      f"VIP={recruiter.privilege.vip_status}, "
                      f"剩余查看次数={recruiter.privilege.resume_view_count}")
                return True
            else:
                print(f"[{datetime.now()}] SDK未提供get_account_detail方法")
                return False

        except Exception as e:
            print(f"[{datetime.now()}] 同步账号 {recruiter.name} 权益信息失败: {e}")
            return False

    def _parse_privilege_data(self, recruiter: Recruiter, privilege_data: Any):
        """Copy privilege fields from an SDK response onto ``recruiter``.

        The payload may be an object or a dict, optionally wrapped in an
        object exposing ``zpData``. Fields that are absent are left
        untouched. Parsing errors are logged and swallowed; fields assigned
        before the error keep their new values.

        Args:
            recruiter: Account whose ``privilege`` is updated in place.
            privilege_data: Raw privilege payload returned by the SDK.
        """
        try:
            if hasattr(privilege_data, 'zpData'):
                data = privilege_data.zpData
            else:
                data = privilege_data

            absent = object()

            def read(field_name):
                # Attribute access wins over dict lookup, as for SDK objects.
                if hasattr(data, field_name):
                    return getattr(data, field_name)
                if isinstance(data, dict) and field_name in data:
                    return data[field_name]
                return absent

            vip_level = read('vipLevel')
            if vip_level is not absent:
                recruiter.privilege.vip_level = str(vip_level)

            vip_status = read('vipStatus')
            if vip_status is not absent:
                recruiter.privilege.vip_status = str(vip_status)

            # Only numeric values are converted; they are divided by 1000
            # before being handed to fromtimestamp (i.e. read as millis).
            expire_time = read('vipExpireTime')
            if isinstance(expire_time, (int, float)):
                recruiter.privilege.vip_expire_at = datetime.fromtimestamp(expire_time / 1000)

            remain_count = read('remainCount')
            if remain_count is not absent:
                recruiter.privilege.resume_view_count = int(remain_count)

            total_count = read('totalCount')
            if total_count is not absent:
                recruiter.privilege.resume_view_total = int(total_count)

        except Exception as e:
            # ``datetime`` must stay the module-level name in this function:
            # a function-local import would make it a local variable and turn
            # this log line into an UnboundLocalError.
            print(f"[{datetime.now()}] 解析权益数据失败: {e}")

    async def _sync_jobs(self, recruiter: Recruiter) -> int:
        """Fetch the account's jobs and save each one.

        Args:
            recruiter: Account whose jobs are synced.

        Returns:
            Number of jobs saved; 0 when there is no crawler, no job, or a
            call raised.
        """
        try:
            crawler = self.recruiter_service.create_crawler_for_recruiter(recruiter)
            if not crawler:
                return 0

            jobs = crawler.get_jobs()

            if not jobs:
                print(f"[{datetime.now()}] 账号 {recruiter.name} 没有正在招聘的职位")
                return 0

            # Tie every job to this account before persisting it.
            for job in jobs:
                job.recruiter_id = recruiter.id
                job.source = recruiter.source
                job.last_sync_at = datetime.now()

                self.recruiter_service.save_job(job)

            print(f"[{datetime.now()}] 账号 {recruiter.name} 同步了 {len(jobs)} 个职位")
            return len(jobs)

        except Exception as e:
            print(f"[{datetime.now()}] 同步账号 {recruiter.name} 职位列表失败: {e}")
            return 0

View File

@@ -0,0 +1,372 @@
"""
Job调度器
统一管理所有定时任务的调度:
- 账号同步任务 (AccountSyncJob)
- 简历处理任务 (ResumeProcessJob)
提供任务配置、启停控制、状态监控等功能
"""
from datetime import datetime
from typing import Optional, Dict, List, Callable
from dataclasses import dataclass, field
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.job import Job as APJob
from .account_sync_job import AccountSyncJob
from .resume_process_job import ResumeProcessJob
@dataclass
class JobConfig:
    """Scheduling configuration of one job."""

    # Identifier used to register the job with the scheduler.
    job_id: str
    # Display name of the job.
    name: str
    # Whether the job gets registered at all.
    enabled: bool = True
    # Trigger kind: "interval" or "cron".
    trigger_type: str = "interval"

    # Used by the interval trigger.
    interval_minutes: int = 1

    # Used by the cron trigger.
    cron_expression: str = ""

    # Extra options passed through when the job is added.
    max_instances: int = 1
    misfire_grace_time: int = 3600  # one hour, in seconds
@dataclass
class JobStatusInfo:
    """Runtime status counters tracked for one job."""

    job_id: str
    name: str
    enabled: bool
    # True while an execution of the job is in progress.
    is_running: bool
    # Start time of the most recent execution, if any.
    last_run_time: Optional[datetime] = None
    next_run_time: Optional[datetime] = None
    # Number of executions started.
    run_count: int = 0
    # Accumulated success / failure tallies reported by the executions.
    success_count: int = 0
    fail_count: int = 0
    # Message of the most recent execution error, if any.
    last_error: Optional[str] = None
class JobScheduler:
    """Scheduler for the account sync and resume process jobs.

    Wraps an ``AsyncIOScheduler`` and offers start/stop, pause/resume,
    configuration updates, status reporting and on-demand execution.
    """

    # Default configuration of the account sync job.
    DEFAULT_ACCOUNT_SYNC_CONFIG = JobConfig(
        job_id="account_sync",
        name="账号同步任务",
        enabled=True,
        trigger_type="interval",
        interval_minutes=1,  # sync account information every minute
    )

    # Default configuration of the resume process job.
    DEFAULT_RESUME_PROCESS_CONFIG = JobConfig(
        job_id="resume_process",
        name="简历处理任务",
        enabled=True,
        trigger_type="interval",
        interval_minutes=2,  # process resumes every 2 minutes
    )

    def __init__(
        self,
        account_sync_job: AccountSyncJob,
        resume_process_job: ResumeProcessJob
    ):
        """Create a stopped scheduler around the two job instances.

        Args:
            account_sync_job: Job run by the account sync schedule.
            resume_process_job: Job run by the resume process schedule.
        """
        from dataclasses import replace

        self.scheduler: Optional[AsyncIOScheduler] = None
        self._running = False

        self.account_sync_job = account_sync_job
        self.resume_process_job = resume_process_job

        # Work on copies: update_job_config() mutates the config in place and
        # must not alter the class-level defaults shared by every instance.
        self.account_sync_config = replace(self.DEFAULT_ACCOUNT_SYNC_CONFIG)
        self.resume_process_config = replace(self.DEFAULT_RESUME_PROCESS_CONFIG)

        self._job_status: Dict[str, JobStatusInfo] = {}
        self._init_status_tracking()

    def _init_status_tracking(self):
        """Create one idle JobStatusInfo entry per configured job."""
        for config in (self.account_sync_config, self.resume_process_config):
            self._job_status[config.job_id] = JobStatusInfo(
                job_id=config.job_id,
                name=config.name,
                enabled=config.enabled,
                is_running=False
            )

    def start(self):
        """Start the scheduler and register the enabled jobs (no-op if running)."""
        if self._running:
            print(f"[{datetime.now()}] Job调度器已经在运行中")
            return

        self.scheduler = AsyncIOScheduler()
        self.scheduler.start()
        self._running = True

        print(f"[{datetime.now()}] Job调度器已启动")

        self._register_jobs()

    def _register_jobs(self):
        """Register every enabled job and log the resulting schedule."""
        if self.account_sync_config.enabled:
            self._register_account_sync_job()

        if self.resume_process_config.enabled:
            self._register_resume_process_job()

        print(f"[{datetime.now()}] 已注册定时任务:")
        for job in self.scheduler.get_jobs():
            print(f" - {job.name}: {job.trigger}")

    @staticmethod
    def _build_trigger(config: JobConfig):
        """Build the interval trigger, or a cron trigger for any other type."""
        if config.trigger_type == "interval":
            return IntervalTrigger(minutes=config.interval_minutes)
        return CronTrigger.from_crontab(config.cron_expression)

    @staticmethod
    def _describe_schedule(config: JobConfig) -> str:
        """Describe the schedule that ``_build_trigger`` produces, for logs."""
        if config.trigger_type == "interval":
            return f"间隔: {config.interval_minutes}分钟"
        return f"cron: {config.cron_expression}"

    def _add_job(self, config: JobConfig, func: Callable):
        """Add ``func`` to the scheduler under ``config``, replacing any existing job."""
        self.scheduler.add_job(
            func,
            trigger=self._build_trigger(config),
            id=config.job_id,
            name=config.name,
            replace_existing=True,
            max_instances=config.max_instances,
            misfire_grace_time=config.misfire_grace_time
        )

    def _register_account_sync_job(self):
        """Register the account sync job with the scheduler."""
        config = self.account_sync_config
        self._add_job(config, self._execute_account_sync)
        print(f"[{datetime.now()}] 已注册账号同步任务,{self._describe_schedule(config)}")

    def _register_resume_process_job(self):
        """Register the resume process job with the scheduler."""
        config = self.resume_process_config
        self._add_job(config, self._execute_resume_process)
        print(f"[{datetime.now()}] 已注册简历处理任务,{self._describe_schedule(config)}")

    async def _execute_account_sync(self):
        """Run the account sync job once and update its status counters.

        Skips the run when a previous execution is still in progress.
        """
        status = self._job_status[self.account_sync_config.job_id]

        if status.is_running:
            print(f"[{datetime.now()}] 账号同步任务正在运行中,跳过本次执行")
            return

        status.is_running = True
        status.last_run_time = datetime.now()
        status.run_count += 1

        print(f"[{datetime.now()}] 开始执行账号同步任务...")

        try:
            results = await self.account_sync_job.execute()

            # Counters are tallied per account, not per run.
            success_count = sum(1 for r in results if r.success)
            status.success_count += success_count
            status.fail_count += len(results) - success_count
            status.last_error = None

            print(f"[{datetime.now()}] 账号同步任务执行完成")

        except Exception as e:
            error_msg = str(e)
            status.fail_count += 1
            status.last_error = error_msg
            print(f"[{datetime.now()}] 账号同步任务执行失败: {error_msg}")

        finally:
            status.is_running = False

    async def _execute_resume_process(self):
        """Run the resume process job once and update its status counters.

        Skips the run when a previous execution is still in progress.
        """
        status = self._job_status[self.resume_process_config.job_id]

        if status.is_running:
            print(f"[{datetime.now()}] 简历处理任务正在运行中,跳过本次执行")
            return

        status.is_running = True
        status.last_run_time = datetime.now()
        status.run_count += 1

        print(f"[{datetime.now()}] 开始执行简历处理任务...")

        try:
            results = await self.resume_process_job.execute()

            # Counters are tallied per candidate, not per run.
            total_processed = sum(r.candidates_processed for r in results)
            status.success_count += total_processed
            status.fail_count += sum(r.candidates_failed for r in results)
            status.last_error = None

            print(f"[{datetime.now()}] 简历处理任务执行完成,处理了 {total_processed} 个候选人")

        except Exception as e:
            error_msg = str(e)
            status.fail_count += 1
            status.last_error = error_msg
            print(f"[{datetime.now()}] 简历处理任务执行失败: {error_msg}")

        finally:
            status.is_running = False

    def stop(self):
        """Shut the scheduler down, if one was created."""
        if self.scheduler:
            self.scheduler.shutdown()
            self._running = False
            print(f"[{datetime.now()}] Job调度器已停止")

    def pause_job(self, job_id: str):
        """Pause the given job and mark it disabled in the status tracking."""
        if self.scheduler:
            self.scheduler.pause_job(job_id)
            if job_id in self._job_status:
                self._job_status[job_id].enabled = False
            print(f"[{datetime.now()}] 任务 {job_id} 已暂停")

    def resume_job(self, job_id: str):
        """Resume the given job and mark it enabled in the status tracking."""
        if self.scheduler:
            self.scheduler.resume_job(job_id)
            if job_id in self._job_status:
                self._job_status[job_id].enabled = True
            print(f"[{datetime.now()}] 任务 {job_id} 已恢复")

    def get_job_status(self, job_id: Optional[str] = None) -> List[Dict]:
        """Return the tracked status of one job or of all jobs.

        Args:
            job_id: Job to report on; None (or empty) reports every job.

        Returns:
            List of status dicts; empty when ``job_id`` is unknown.
        """
        if job_id:
            status = self._job_status.get(job_id)
            return [self._status_to_dict(status)] if status else []

        return [self._status_to_dict(s) for s in self._job_status.values()]

    def _status_to_dict(self, status: JobStatusInfo) -> Dict:
        """Convert a status object to a plain dict.

        ``next_run_time`` is read from the live scheduler job, not from the
        status object.
        """
        next_run_time = None
        if self.scheduler:
            job = self.scheduler.get_job(status.job_id)
            if job and job.next_run_time:
                next_run_time = job.next_run_time.isoformat()

        return {
            "job_id": status.job_id,
            "name": status.name,
            "enabled": status.enabled,
            "is_running": status.is_running,
            "last_run_time": status.last_run_time.isoformat() if status.last_run_time else None,
            "next_run_time": next_run_time,
            "run_count": status.run_count,
            "success_count": status.success_count,
            "fail_count": status.fail_count,
            "last_error": status.last_error
        }

    def get_scheduled_jobs(self) -> List[Dict]:
        """Return id, name, next run time and trigger of every scheduled job."""
        if not self.scheduler:
            return []

        return [
            {
                "id": job.id,
                "name": job.name,
                "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
                "trigger": str(job.trigger)
            }
            for job in self.scheduler.get_jobs()
        ]

    def update_job_config(self, job_id: str, **kwargs):
        """Update a job's configuration and re-register it when running.

        Args:
            job_id: Job to reconfigure; unknown ids are logged and ignored.
            **kwargs: JobConfig attributes to overwrite; unknown keys are
                ignored.
        """
        if job_id == self.account_sync_config.job_id:
            config = self.account_sync_config
            register = self._register_account_sync_job
        elif job_id == self.resume_process_config.job_id:
            config = self.resume_process_config
            register = self._register_resume_process_job
        else:
            print(f"[{datetime.now()}] 未知的任务ID: {job_id}")
            return

        for key, value in kwargs.items():
            if hasattr(config, key):
                setattr(config, key, value)

        scheduler_live = bool(self._running and self.scheduler)
        if scheduler_live:
            # The job may be absent (it was disabled at start-up), and
            # remove_job() raises for unknown ids, so look it up first.
            if self.scheduler.get_job(job_id) is not None:
                self.scheduler.remove_job(job_id)
            # A job that has just been disabled must not be scheduled again.
            if config.enabled:
                register()

        # Keep the reported flag in line with what is actually scheduled.
        if scheduler_live or "enabled" in kwargs:
            self._job_status[job_id].enabled = config.enabled

        print(f"[{datetime.now()}] 任务 {job_id} 配置已更新")

    async def run_job_now(self, job_id: str):
        """Execute the given job immediately, outside its schedule.

        Args:
            job_id: Job to run; unknown ids are logged and ignored.
        """
        if job_id == self.account_sync_config.job_id:
            await self._execute_account_sync()
        elif job_id == self.resume_process_config.job_id:
            await self._execute_resume_process()
        else:
            print(f"[{datetime.now()}] 未知的任务ID: {job_id}")

View File

@@ -0,0 +1,365 @@
"""
简历处理定时任务
负责处理简历信息入库操作:
1. 遍历所有活跃账号的职位
2. 获取职位下的候选人列表
3. 获取候选人简历详情
4. 将简历信息入库
入库更新操作:
- 插入/更新 candidates 表
- 插入/更新 resumes 表
"""
from datetime import datetime
from typing import List, Optional, Dict, Any
from dataclasses import dataclass
from ..domain.candidate import Candidate, CandidateSource, CandidateStatus
from ..domain.resume import Resume
from ..domain.job import Job
from ..domain.recruiter import Recruiter, RecruiterStatus
from ..service.recruiter_service import RecruiterService
from ..service.ingestion.unified_ingestion_service import UnifiedIngestionService
from ..service.crawler.base_crawler import BaseCrawler
@dataclass
class ResumeProcessResult:
    """Per-job outcome of a resume processing run."""

    job_id: str
    job_title: str
    recruiter_id: str
    recruiter_name: str
    # Candidate tallies for this job.
    candidates_processed: int = 0
    candidates_new: int = 0
    candidates_updated: int = 0
    candidates_failed: int = 0
    success: bool = True
    message: str = ""
    # Filled with the creation time when left as None.
    timestamp: Optional[datetime] = None

    def __post_init__(self):
        """Default ``timestamp`` to the current time."""
        if self.timestamp is not None:
            return
        self.timestamp = datetime.now()
class ResumeProcessJob:
    """Scheduled job that crawls candidate resumes and ingests them.

    For every eligible recruiter account it walks the account's jobs,
    fetches the candidates of each job together with their resume details
    and hands the data to the ingestion service.
    """

    def __init__(
        self,
        recruiter_service: RecruiterService,
        ingestion_service: UnifiedIngestionService
    ):
        self.recruiter_service = recruiter_service
        self.ingestion_service = ingestion_service

        # Upper bound on the candidates handled per job in one run.
        self.max_candidates_per_job = 20

    async def execute(self) -> List[ResumeProcessResult]:
        """Process the jobs of every eligible account.

        Returns:
            One ResumeProcessResult per processed job; empty when no account
            is eligible.
        """
        print(f"[{datetime.now()}] 开始执行简历处理任务...")

        results = []

        recruiters = self._get_eligible_recruiters()
        if not recruiters:
            print(f"[{datetime.now()}] 没有符合条件的招聘者账号需要ACTIVE状态且同步成功")
            return results

        print(f"[{datetime.now()}] 找到 {len(recruiters)} 个符合条件的账号")

        for recruiter in recruiters:
            try:
                results.extend(await self._process_recruiter_jobs(recruiter))
            except Exception as e:
                # One failing account must not stop the remaining ones.
                print(f"[{datetime.now()}] 处理账号 {recruiter.name} 时发生异常: {e}")

        # Aggregate the per-job tallies for the summary log.
        total_processed = total_new = total_failed = 0
        for r in results:
            total_processed += r.candidates_processed
            total_new += r.candidates_new
            total_failed += r.candidates_failed

        print(f"[{datetime.now()}] 简历处理任务完成:")
        print(f" - 处理职位数: {len(results)}")
        print(f" - 处理候选人数: {total_processed}")
        print(f" - 新增候选人数: {total_new}")
        print(f" - 失败数: {total_failed}")

        return results

    def _get_eligible_recruiters(self) -> List[Recruiter]:
        """Select the accounts that may be used for resume processing.

        An account is kept when its status is ACTIVE, it has either never
        been synced or its last sync succeeded, and its remaining
        resume-view count is not zero.

        Returns:
            The eligible accounts, in the order the service returned them.
        """
        eligible_recruiters = []

        for recruiter in self.recruiter_service.find_all_recruiters():
            if recruiter.status != RecruiterStatus.ACTIVE:
                continue

            # Accounts without a recorded sync time pass this check.
            synced_before = recruiter.last_sync_at
            if synced_before and not recruiter.is_sync_success():
                print(f"[{datetime.now()}] 账号 {recruiter.name} 上次同步失败,跳过")
                continue

            # NOTE(review): resume_view_count defaults to 0, so an account
            # that was never synced is skipped here although the check above
            # lets it through - confirm this is intended.
            quota = recruiter.privilege
            if quota and quota.resume_view_count == 0:
                print(f"[{datetime.now()}] 账号 {recruiter.name} 简历查看次数已用完,跳过")
                continue

            eligible_recruiters.append(recruiter)

        return eligible_recruiters

    async def _process_recruiter_jobs(self, recruiter: Recruiter) -> List[ResumeProcessResult]:
        """Process every job stored for one account.

        Args:
            recruiter: Account whose jobs are processed.

        Returns:
            One result per job; empty when no crawler can be created or the
            account has no job.
        """
        results = []

        crawler = self.recruiter_service.create_crawler_for_recruiter(recruiter)
        if not crawler:
            print(f"[{datetime.now()}] 无法为账号 {recruiter.name} 创建爬虫")
            return results

        jobs = self.recruiter_service.find_jobs_by_recruiter(recruiter.id)
        if not jobs:
            print(f"[{datetime.now()}] 账号 {recruiter.name} 没有职位")
            return results

        print(f"[{datetime.now()}] 账号 {recruiter.name}{len(jobs)} 个职位需要处理")

        for job in jobs:
            try:
                outcome = await self._process_single_job(recruiter, crawler, job)
            except Exception as e:
                # Record the failure as a result so the job still shows up.
                outcome = ResumeProcessResult(
                    job_id=job.id or "",
                    job_title=job.title,
                    recruiter_id=recruiter.id or "",
                    recruiter_name=recruiter.name,
                    success=False,
                    message=f"处理职位时发生异常: {str(e)}"
                )
                results.append(outcome)
                print(f"[{datetime.now()}] 处理职位 {job.title} 时发生异常: {e}")
            else:
                results.append(outcome)

        # Record that the account has just been used.
        self.recruiter_service.mark_recruiter_used(recruiter.id)

        return results

    async def _process_single_job(
        self,
        recruiter: Recruiter,
        crawler: BaseCrawler,
        job: Job
    ) -> ResumeProcessResult:
        """Process the first page of candidates of one job.

        Args:
            recruiter: Account owning the job.
            crawler: Crawler created for that account.
            job: Job whose candidates are processed.

        Returns:
            The tallies for this job; ``success`` is True only when no
            candidate failed.
        """
        result = ResumeProcessResult(
            job_id=job.id or "",
            job_title=job.title,
            recruiter_id=recruiter.id or "",
            recruiter_name=recruiter.name
        )

        print(f"[{datetime.now()}] 处理职位: {job.title} (ID: {job.source_id})")

        candidates = crawler.get_candidates(job.source_id, page=1)

        if not candidates:
            result.message = "该职位下没有候选人"
            print(f"[{datetime.now()}] 职位 {job.title} 没有候选人")
            return result

        print(f"[{datetime.now()}] 职位 {job.title} 找到 {len(candidates)} 个候选人")

        # Maps each outcome label to the counter it increments; any other
        # label only counts as processed.
        counter_for = {
            "new": "candidates_new",
            "updated": "candidates_updated",
            "failed": "candidates_failed",
        }

        # Only the leading max_candidates_per_job candidates are handled.
        for candidate in candidates[:self.max_candidates_per_job]:
            try:
                process_result = await self._process_single_candidate(
                    recruiter, crawler, job, candidate
                )

                result.candidates_processed += 1
                counter = counter_for.get(process_result)
                if counter is not None:
                    setattr(result, counter, getattr(result, counter) + 1)

            except Exception as e:
                result.candidates_processed += 1
                result.candidates_failed += 1
                print(f"[{datetime.now()}] 处理候选人 {candidate.name} 失败: {e}")

        result.success = result.candidates_failed == 0
        result.message = (
            f"处理完成: 新增{result.candidates_new}, "
            f"更新{result.candidates_updated}, "
            f"失败{result.candidates_failed}"
        )

        print(f"[{datetime.now()}] 职位 {job.title} 处理完成: {result.message}")

        return result

    async def _process_single_candidate(
        self,
        recruiter: Recruiter,
        crawler: BaseCrawler,
        job: Job,
        candidate: Candidate
    ) -> str:
        """Fetch one candidate's resume and ingest it.

        Args:
            recruiter: Account owning the job.
            crawler: Crawler created for that account.
            job: Job the candidate belongs to.
            candidate: Candidate to process.

        Returns:
            "new" or "updated" on successful ingestion, "failed" otherwise
            (including when any call raises).
        """
        print(f"[{datetime.now()}] 处理候选人: {candidate.name}")

        try:
            resume = crawler.get_resume_detail(candidate)
            if not resume:
                print(f"[{datetime.now()}] 候选人 {candidate.name} 无法获取简历详情")
                return "failed"

            ingestion_result = self.ingestion_service.ingest(
                source=candidate.source,
                raw_data=self._build_raw_data(candidate, resume, job)
            )

            if not ingestion_result.success:
                print(f"[{datetime.now()}] 候选人 {candidate.name} 入库失败: {ingestion_result.message}")
                return "failed"

            # The ingestion service says whether the record was created.
            if ingestion_result.is_new:
                print(f"[{datetime.now()}] 候选人 {candidate.name} 新增入库成功")
                return "new"

            print(f"[{datetime.now()}] 候选人 {candidate.name} 更新入库成功")
            return "updated"

        except Exception as e:
            print(f"[{datetime.now()}] 处理候选人 {candidate.name} 时发生异常: {e}")
            return "failed"

    def _build_raw_data(
        self,
        candidate: Candidate,
        resume: Resume,
        job: Job
    ) -> Dict[str, Any]:
        """Assemble the raw payload passed to the ingestion service.

        Args:
            candidate: Candidate supplying personal and work fields.
            resume: Resume supplying raw and parsed content.
            job: Job the candidate is linked to.

        Returns:
            Dict with the candidate, resume and job fields under the keys
            used for ingestion.
        """
        expectation = candidate.salary_expectation
        parsed = resume.parsed_content
        gender = candidate.gender

        raw_data = {}

        # Basic information
        raw_data["geekId"] = candidate.source_id
        raw_data["name"] = candidate.name
        raw_data["phone"] = candidate.phone
        raw_data["email"] = candidate.email
        raw_data["age"] = candidate.age
        raw_data["gender"] = gender.value if gender else 0

        # Work information
        raw_data["company"] = candidate.current_company
        raw_data["position"] = candidate.current_position
        raw_data["workYears"] = candidate.work_years
        raw_data["education"] = candidate.education
        raw_data["school"] = candidate.school

        # Salary expectation
        raw_data["salaryMin"] = expectation.min_salary if expectation else None
        raw_data["salaryMax"] = expectation.max_salary if expectation else None

        # Resume content
        raw_data["resumeText"] = resume.raw_content
        raw_data["parsedContent"] = parsed.to_dict() if parsed else None

        # Job link
        raw_data["jobId"] = job.source_id
        raw_data["jobTitle"] = job.title

        # Original crawler payload, when the candidate carries one
        raw_data["rawCandidateData"] = getattr(candidate, 'raw_data', None)

        return raw_data

View File

@@ -6,7 +6,7 @@ import uuid
from sqlalchemy import select, update, delete
from sqlalchemy.orm import Session
from ..domain.recruiter import Recruiter, RecruiterStatus
from ..domain.recruiter import Recruiter, RecruiterStatus, RecruiterPrivilege, SyncStatus
from ..domain.candidate import CandidateSource
from ..config.database import get_db_manager, RecruiterModel
@@ -29,6 +29,17 @@ class RecruiterMapper:
source_value = model.source.lower() if model.source else "boss"
# 处理 status 的大小写
status_value = model.status.lower() if model.status else "active"
# 处理 sync_status 的大小写
sync_status_value = model.sync_status.lower() if model.sync_status else "pending"
# 构建权益信息
privilege = RecruiterPrivilege(
vip_level=model.vip_level,
vip_status=model.vip_status,
vip_expire_at=model.vip_expire_at,
resume_view_count=model.resume_view_count or 0,
resume_view_total=model.resume_view_total or 0
)
return Recruiter(
id=model.id,
@@ -37,6 +48,10 @@ class RecruiterMapper:
wt_token=model.wt_token,
status=RecruiterStatus(status_value),
last_used_at=model.last_used_at,
privilege=privilege,
last_sync_at=model.last_sync_at,
sync_status=SyncStatus(sync_status_value),
sync_error=model.sync_error,
created_at=model.created_at,
updated_at=model.updated_at
)
@@ -49,7 +64,15 @@ class RecruiterMapper:
source=entity.source.value,
wt_token=entity.wt_token,
status=entity.status.value,
last_used_at=entity.last_used_at
last_used_at=entity.last_used_at,
vip_level=entity.privilege.vip_level if entity.privilege else None,
vip_status=entity.privilege.vip_status if entity.privilege else None,
vip_expire_at=entity.privilege.vip_expire_at if entity.privilege else None,
resume_view_count=entity.privilege.resume_view_count if entity.privilege else 0,
resume_view_total=entity.privilege.resume_view_total if entity.privilege else 0,
last_sync_at=entity.last_sync_at,
sync_status=entity.sync_status.value if entity.sync_status else 'pending',
sync_error=entity.sync_error
)
def save(self, recruiter: Recruiter) -> Recruiter:
@@ -66,7 +89,15 @@ class RecruiterMapper:
source=recruiter.source.value,
wt_token=recruiter.wt_token,
status=recruiter.status.value,
last_used_at=recruiter.last_used_at
last_used_at=recruiter.last_used_at,
vip_level=recruiter.privilege.vip_level if recruiter.privilege else None,
vip_status=recruiter.privilege.vip_status if recruiter.privilege else None,
vip_expire_at=recruiter.privilege.vip_expire_at if recruiter.privilege else None,
resume_view_count=recruiter.privilege.resume_view_count if recruiter.privilege else 0,
resume_view_total=recruiter.privilege.resume_view_total if recruiter.privilege else 0,
last_sync_at=recruiter.last_sync_at,
sync_status=recruiter.sync_status.value if recruiter.sync_status else 'pending',
sync_error=recruiter.sync_error
)
)
session.execute(stmt)
@@ -174,3 +205,44 @@ class RecruiterMapper:
return result.rowcount > 0
finally:
session.close()
def update_sync_status(self, recruiter_id: str, sync_status: SyncStatus, error: Optional[str] = None) -> bool:
    """Record the outcome of a sync attempt on one recruiter row.

    Stores ``sync_status.value`` and ``error`` (``None`` clears the column)
    and stamps ``last_sync_at`` with ``datetime.now()``.

    Args:
        recruiter_id: Value matched against ``RecruiterModel.id``.
        sync_status: New status; only its ``.value`` is written.
        error: Error text to store alongside the status, or ``None``.

    Returns:
        True when the executed statement reports a rowcount above zero.
    """
    session = self._get_session()
    try:
        changes = {
            "sync_status": sync_status.value,
            "sync_error": error,
            "last_sync_at": datetime.now(),
        }
        only_this_recruiter = RecruiterModel.id == recruiter_id
        statement = update(RecruiterModel).where(only_this_recruiter).values(**changes)
        outcome = session.execute(statement)
        session.commit()
        return outcome.rowcount > 0
    finally:
        # The session is released whether or not the commit succeeded.
        session.close()
def update_privilege(self, recruiter_id: str, privilege: RecruiterPrivilege) -> bool:
    """Overwrite the stored account-privilege columns of one recruiter.

    Copies the VIP fields and resume-view counters from ``privilege`` onto
    the row and sets ``updated_at`` to ``datetime.now()``.

    Args:
        recruiter_id: Value matched against ``RecruiterModel.id``.
        privilege: Source of the five privilege values that are written.

    Returns:
        True when the executed statement reports a rowcount above zero.
    """
    session = self._get_session()
    try:
        changes = {
            "vip_level": privilege.vip_level,
            "vip_status": privilege.vip_status,
            "vip_expire_at": privilege.vip_expire_at,
            "resume_view_count": privilege.resume_view_count,
            "resume_view_total": privilege.resume_view_total,
            "updated_at": datetime.now(),
        }
        only_this_recruiter = RecruiterModel.id == recruiter_id
        statement = update(RecruiterModel).where(only_this_recruiter).values(**changes)
        outcome = session.execute(statement)
        session.commit()
        return outcome.rowcount > 0
    finally:
        # The session is released whether or not the commit succeeded.
        session.close()

View File

@@ -1,8 +1,9 @@
"""Recruiter service - Manage recruiter accounts"""
from typing import List, Optional
from ..domain.recruiter import Recruiter, RecruiterStatus
from ..domain.recruiter import Recruiter, RecruiterStatus, RecruiterPrivilege
from ..domain.candidate import CandidateSource
from ..domain.job import Job
from ..mapper.recruiter_mapper import RecruiterMapper
from ..service.crawler import BossCrawler, CrawlerFactory
@@ -118,3 +119,42 @@ class RecruiterService:
print(f"Registered crawler for recruiter: {recruiter.name}")
return count
# ==================== Job模块支持的方法 ====================
def find_all_recruiters(self) -> List[Recruiter]:
    """Return every recruiter account the mapper knows about (used by the Job module)."""
    recruiters = self.mapper.find_all()
    return recruiters
def save_recruiter(self, recruiter: Recruiter) -> Recruiter:
    """Persist a recruiter account through the mapper and return what the mapper returns (used by Job-module updates)."""
    persisted = self.mapper.save(recruiter)
    return persisted
def update_recruiter_privilege(self, recruiter_id: str, privilege: RecruiterPrivilege) -> bool:
    """Update a recruiter's privilege information by delegating to the mapper's ``update_privilege``."""
    updated = self.mapper.update_privilege(recruiter_id, privilege)
    return updated
def find_jobs_by_recruiter(self, recruiter_id: str) -> List[Job]:
    """
    Return the jobs that belong to the given recruiter.

    Placeholder: there is no Job data-access layer yet, so the result is
    always an empty list regardless of ``recruiter_id``.
    """
    # TODO: implement JobMapper, then replace the placeholder with:
    #   from ..mapper.job_mapper import JobMapper
    #   return JobMapper().find_by_recruiter(recruiter_id)
    no_jobs = []
    return no_jobs
def save_job(self, job: Job) -> Job:
    """
    Save a job posting.

    Placeholder: there is no Job data-access layer yet, so nothing is
    persisted. The job's title and source id are printed and the same
    object is handed back.
    """
    # TODO: implement JobMapper, then replace the placeholder with:
    #   from ..mapper.job_mapper import JobMapper
    #   return JobMapper().save(job)
    message = f"[Job] 保存职位: {job.title} (ID: {job.source_id})"
    print(message)
    return job

View File

@@ -9,6 +9,7 @@ from apscheduler.triggers.interval import IntervalTrigger
from ..domain.candidate import CandidateSource
from ..main import get_app
from ..job import AccountSyncJob, ResumeProcessJob, JobScheduler
class CrawlScheduler:
@@ -16,12 +17,16 @@ class CrawlScheduler:
爬虫定时任务调度器
定时执行简历爬取任务
集成新的Job模块账号同步任务和简历处理任务
"""
def __init__(self):
    """Initialise every piece of scheduler state to its idle default."""
    # Nothing is running yet.
    self._running = False
    self.app = None
    # Legacy APScheduler instance used for the crawl jobs.
    self.scheduler: Optional[AsyncIOScheduler] = None
    # Scheduler for the new Job module.
    self.job_scheduler: Optional[JobScheduler] = None
def start(self):
"""启动调度器"""
@@ -37,9 +42,30 @@ class CrawlScheduler:
print(f"[{datetime.now()}] 爬虫调度器已启动")
# 注册定时任务
# 启动新的Job调度器如果应用支持
self._start_job_scheduler()
# 注册旧版定时任务(保持兼容)
self._register_jobs()
def _start_job_scheduler(self):
    """Build the account-sync and resume-processing jobs and start the Job scheduler.

    Any failure is printed rather than raised, so the caller can carry on.
    ``self.job_scheduler`` is assigned only after ``start()`` has returned:
    a scheduler that failed to start is never published, so code that
    checks ``self.job_scheduler`` does not mistake it for a running one.
    """
    try:
        # Create the job instances from the application's services.
        account_sync_job = AccountSyncJob(self.app.recruiter_service)
        resume_process_job = ResumeProcessJob(
            self.app.recruiter_service,
            self.app.ingestion_service,
        )

        # Start first, publish second (see docstring).
        job_scheduler = JobScheduler(account_sync_job, resume_process_job)
        job_scheduler.start()
        self.job_scheduler = job_scheduler

        print(f"[{datetime.now()}] Job调度器已启动账号同步 + 简历处理)")
    except Exception as exc:
        # Deliberate best-effort boundary: report and continue.
        print(f"[{datetime.now()}] 启动Job调度器失败: {exc}")
def _register_jobs(self):
"""注册定时任务"""
# 每30分钟爬取一次 Boss 直聘
@@ -147,8 +173,13 @@ class CrawlScheduler:
"""停止调度器"""
if self.scheduler:
self.scheduler.shutdown()
self._running = False
print(f"[{datetime.now()}] 爬虫调度器已停止")
# 停止Job调度器
if self.job_scheduler:
self.job_scheduler.stop()
self._running = False
print(f"[{datetime.now()}] 爬虫调度器已停止")
def pause_job(self, job_id: str):
"""暂停指定任务"""
@@ -164,17 +195,58 @@ class CrawlScheduler:
def get_jobs(self):
    """Return a descriptor dict for every scheduled task.

    Tasks from the legacy scheduler are tagged ``"type": "legacy"`` and
    tasks from the Job scheduler ``"type": "job"``. A scheduler that is
    not set contributes nothing, so the result may be an empty list.

    Returns:
        A list of dicts with the keys ``id``, ``name``, ``next_run_time``,
        ``trigger`` and ``type``, legacy tasks first.
    """
    jobs = []

    # Legacy tasks: read attributes off the scheduler's job objects.
    if self.scheduler:
        jobs.extend([
            {
                "id": job.id,
                "name": job.name,
                # A job without a next run time is reported as None.
                "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
                "trigger": str(job.trigger),
                "type": "legacy",
            }
            for job in self.scheduler.get_jobs()
        ])

    # Job-module tasks: already dicts; copy the known keys and tag them.
    if self.job_scheduler:
        jobs.extend([
            {
                "id": job["id"],
                "name": job["name"],
                "next_run_time": job["next_run_time"],
                "trigger": job["trigger"],
                "type": "job",
            }
            for job in self.job_scheduler.get_scheduled_jobs()
        ])

    return jobs
def get_job_status(self, job_id: Optional[str] = None):
    """
    Report the status of Job-module tasks.

    Args:
        job_id: Task id to query; ``None`` is passed through to the Job
            scheduler and asks for the status of all tasks.

    Returns:
        Whatever the Job scheduler's ``get_job_status`` returns, or an
        empty list when no Job scheduler is set.
    """
    if not self.job_scheduler:
        return []
    return self.job_scheduler.get_job_status(job_id)
async def run_job_now(self, job_id: str):
    """
    Run one Job-module task immediately.

    Args:
        job_id: Task id (``account_sync`` or ``resume_process``).

    When no Job scheduler is set, a notice is printed and nothing runs.
    """
    if not self.job_scheduler:
        print(f"[{datetime.now()}] Job调度器未启动")
        return
    await self.job_scheduler.run_job_now(job_id)
# 全局调度器实例