feat(job): 增加职位评价方案及管理接口

- 在数据库jobs表新增evaluation_schema_id字段及外键约束
- ORM模型JobModel新增evaluation_schema_id关联字段
- 扩展配置项,增加评分阈值相关参数及简历处理配置
- 新增职位管理路由job_router,支持职位CRUD及评价方案关联功能
- 职位API支持列表查询、筛选、详情查看、创建、更新、删除及评价方案绑定和获取
- recruiter路由统一响应格式为BaseResponse,完善异常处理及提示信息
- scheduler路由任务接口统一响应格式,新增单个任务状态查询接口
- scheduler路由中支持任务立即运行、暂停、恢复和配置更新操作,增加异常捕获和提示
- 其他内部细节优化API异常处理及返回统一格式
This commit is contained in:
2026-03-24 19:09:11 +08:00
parent 1343561979
commit 6f1f438159
18 changed files with 1966 additions and 281 deletions

View File

@@ -95,6 +95,7 @@ CREATE TABLE IF NOT EXISTS jobs (
source VARCHAR(32) NOT NULL, -- BOSS, LIEPIN, etc.
source_id VARCHAR(128) NOT NULL,
recruiter_id VARCHAR(64), -- 关联的招聘者账号ID
evaluation_schema_id VARCHAR(64), -- 关联的评价方案ID
title VARCHAR(256) NOT NULL,
department VARCHAR(128),
location VARCHAR(128),
@@ -114,6 +115,7 @@ CREATE TABLE IF NOT EXISTS jobs (
UNIQUE KEY uk_source_source_id (source, source_id),
INDEX idx_status (status),
INDEX idx_recruiter_id (recruiter_id),
INDEX idx_evaluation_schema_id (evaluation_schema_id),
INDEX idx_last_sync_at (last_sync_at),
FOREIGN KEY (recruiter_id) REFERENCES recruiters(id) ON DELETE SET NULL
);
@@ -200,6 +202,7 @@ ON DUPLICATE KEY UPDATE
dimensions = VALUES(dimensions),
weights = VALUES(weights);
INSERT INTO evaluation_schemas (id, name, description, dimensions, weights) VALUES
('java_backend', 'Java后端工程师评价方案', '针对Java后端开发岗位的综合评价方案',
'[

View File

@@ -86,6 +86,7 @@ class JobModel(Base):
source = Column(String(32), nullable=False)
source_id = Column(String(128), nullable=False)
recruiter_id = Column(String(64), ForeignKey('recruiters.id'))
evaluation_schema_id = Column(String(64), ForeignKey('evaluation_schemas.id')) # 关联的评价方案ID
title = Column(String(256), nullable=False)
department = Column(String(128))
location = Column(String(128))

View File

@@ -44,6 +44,20 @@ class Settings(BaseSettings):
notify_email_from: Optional[str] = Field(default=None, description="发件人地址")
notify_email_to: Optional[str] = Field(default=None, description="收件人地址")
# 评分阈值配置 (前缀: SCORE_)
score_threshold_greet: float = Field(default=70.0, description="触发打招呼的最低分数")
score_threshold_notify: float = Field(default=70.0, description="触发HR通知的最低分数")
auto_greet_enabled: bool = Field(default=True, description="是否启用自动打招呼")
auto_notify_enabled: bool = Field(default=True, description="是否启用自动通知HR")
greet_message_template: str = Field(
default="您好,我们对您的简历很感兴趣,期待与您进一步交流!",
description="打招呼消息模板"
)
# 简历处理配置 (前缀: RESUME_)
resume_process_delay: float = Field(default=1.0, description="处理每个候选人后的延迟秒数,避免请求过快")
resume_max_per_job: int = Field(default=20, description="每个职位最多处理的候选人数量")
class Config:
env_file = ".env"
env_file_encoding = "utf-8"

View File

@@ -10,7 +10,7 @@ FastAPI主应用入口
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .routes import recruiter_router, scheduler_router, candidate_router
from .routes import recruiter_router, scheduler_router, candidate_router, job_router
from .routes.system import router as system_router
@@ -46,6 +46,9 @@ def create_app() -> FastAPI:
# 候选人管理路由
app.include_router(candidate_router)
# 职位管理路由
app.include_router(job_router)
return app

View File

@@ -29,9 +29,16 @@ except ImportError:
from fastapi import APIRouter
candidate_router = APIRouter()
try:
from .job import router as job_router
except ImportError:
from fastapi import APIRouter
job_router = APIRouter()
__all__ = [
"recruiter_router",
"scheduler_router",
"system_router",
"candidate_router"
"candidate_router",
"job_router"
]

View File

@@ -0,0 +1,409 @@
"""
职位管理 API 路由
提供职位CRUD、关联评价方案等功能
"""
from typing import Optional
from fastapi import APIRouter, Query
from ..schemas import (
BaseResponse,
JobPositionResponse, JobPositionListResponse,
JobPositionCreateRequest, JobPositionUpdateRequest,
JobPositionFilterRequest, JobBindSchemaRequest,
EvaluationSchemaResponse, EvaluationSchemaListResponse
)
from ...domain.job import Job
from ...domain.enums import CandidateSource
from ...mapper.job_mapper import JobMapper
from ...mapper.evaluation_mapper import EvaluationMapper
router = APIRouter(prefix="/api/jobs", tags=["职位管理"])
def _job_to_response(job: Job) -> JobPositionResponse:
"""将领域实体转换为响应模型"""
return JobPositionResponse(
id=job.id,
title=job.title,
source=job.source.value if job.source else "",
status=job.status.value if job.status else "",
recruiter_id=job.recruiter_id,
evaluation_schema_id=job.evaluation_schema_id,
department=job.department,
location=job.location,
salary_min=job.salary_min,
salary_max=job.salary_max,
requirements=job.requirements,
description=job.description,
candidate_count=job.candidate_count,
new_candidate_count=job.new_candidate_count,
last_sync_at=job.last_sync_at,
created_at=job.created_at,
updated_at=job.updated_at
)
@router.get("", response_model=BaseResponse[JobPositionListResponse])
async def list_jobs(
source: Optional[str] = Query(None, description="平台来源"),
recruiter_id: Optional[str] = Query(None, description="招聘者账号ID"),
evaluation_schema_id: Optional[str] = Query(None, description="评价方案ID"),
status: Optional[str] = Query(None, description="状态: ACTIVE, PAUSED, CLOSED, ARCHIVED"),
keyword: Optional[str] = Query(None, description="关键词搜索(标题/部门)"),
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(20, ge=1, le=100, description="每页数量")
):
"""
获取职位列表
Args:
source: 平台来源
recruiter_id: 招聘者账号ID
evaluation_schema_id: 评价方案ID
status: 职位状态
keyword: 关键词搜索
page: 页码
page_size: 每页数量
Returns:
BaseResponse[JobPositionListResponse]: 统一响应格式的职位列表
"""
try:
mapper = JobMapper()
jobs, total = mapper.find_filtered_jobs(
source=source,
recruiter_id=recruiter_id,
evaluation_schema_id=evaluation_schema_id,
status=status,
keyword=keyword,
page=page,
page_size=page_size
)
response = JobPositionListResponse(
total=total,
items=[_job_to_response(job) for job in jobs]
)
return BaseResponse.success(data=response, msg="获取职位列表成功")
except Exception as e:
return BaseResponse.error(msg=f"获取职位列表失败: {str(e)}")
@router.post("/filter", response_model=BaseResponse[JobPositionListResponse])
async def filter_jobs(request: JobPositionFilterRequest):
"""
筛选查询职位
支持多条件组合筛选
Returns:
BaseResponse[JobPositionListResponse]: 统一响应格式的职位列表
"""
try:
mapper = JobMapper()
jobs, total = mapper.find_filtered_jobs(
source=request.source,
recruiter_id=request.recruiter_id,
evaluation_schema_id=request.evaluation_schema_id,
status=request.status,
keyword=request.keyword,
page=request.page,
page_size=request.page_size
)
response = JobPositionListResponse(
total=total,
items=[_job_to_response(job) for job in jobs]
)
return BaseResponse.success(data=response, msg="筛选职位成功")
except Exception as e:
return BaseResponse.error(msg=f"筛选职位失败: {str(e)}")
@router.get("/{job_id}", response_model=BaseResponse[JobPositionResponse])
async def get_job_detail(job_id: str):
"""
获取职位详情
Args:
job_id: 职位ID
Returns:
BaseResponse[JobPositionResponse]: 统一响应格式的职位详情
"""
try:
mapper = JobMapper()
job = mapper.find_by_id(job_id)
if not job:
return BaseResponse.error(msg="职位不存在", code=404)
return BaseResponse.success(
data=_job_to_response(job),
msg="获取职位详情成功"
)
except Exception as e:
return BaseResponse.error(msg=f"获取职位详情失败: {str(e)}")
@router.post("", response_model=BaseResponse[JobPositionResponse])
async def create_job(request: JobPositionCreateRequest):
"""
创建职位
Args:
request: 创建职位请求
Returns:
BaseResponse[JobPositionResponse]: 统一响应格式的新建职位详情
"""
try:
from ...domain.job import JobStatus
import uuid
mapper = JobMapper()
# 验证source
try:
source = CandidateSource(request.source.lower())
except ValueError:
return BaseResponse.error(msg=f"无效的平台来源: {request.source}")
# 创建职位实体
job = Job(
id=str(uuid.uuid4()),
source=source,
source_id=request.source_id,
recruiter_id=request.recruiter_id,
evaluation_schema_id=request.evaluation_schema_id,
title=request.title,
department=request.department,
location=request.location,
salary_min=request.salary_min,
salary_max=request.salary_max,
requirements=request.requirements,
description=request.description,
status=JobStatus.ACTIVE
)
# 保存到数据库
saved_job = mapper.save(job)
return BaseResponse.success(
data=_job_to_response(saved_job),
msg="职位创建成功"
)
except Exception as e:
return BaseResponse.error(msg=f"创建职位失败: {str(e)}")
@router.put("/{job_id}", response_model=BaseResponse[JobPositionResponse])
async def update_job(job_id: str, request: JobPositionUpdateRequest):
"""
更新职位
Args:
job_id: 职位ID
request: 更新职位请求
Returns:
BaseResponse[JobPositionResponse]: 统一响应格式的更新后职位详情
"""
try:
from ...domain.job import JobStatus
mapper = JobMapper()
job = mapper.find_by_id(job_id)
if not job:
return BaseResponse.error(msg="职位不存在", code=404)
# 更新字段
if request.title is not None:
job.title = request.title
if request.evaluation_schema_id is not None:
job.evaluation_schema_id = request.evaluation_schema_id
if request.department is not None:
job.department = request.department
if request.location is not None:
job.location = request.location
if request.salary_min is not None:
job.salary_min = request.salary_min
if request.salary_max is not None:
job.salary_max = request.salary_max
if request.requirements is not None:
job.requirements = request.requirements
if request.description is not None:
job.description = request.description
if request.status is not None:
try:
job.status = JobStatus(request.status.upper())
except ValueError:
return BaseResponse.error(msg=f"无效的状态: {request.status}")
# 保存更新
updated_job = mapper.save(job)
return BaseResponse.success(
data=_job_to_response(updated_job),
msg="职位更新成功"
)
except Exception as e:
return BaseResponse.error(msg=f"更新职位失败: {str(e)}")
@router.delete("/{job_id}", response_model=BaseResponse[dict])
async def delete_job(job_id: str):
"""
删除职位
Args:
job_id: 职位ID
Returns:
BaseResponse[dict]: 统一响应格式
"""
try:
mapper = JobMapper()
job = mapper.find_by_id(job_id)
if not job:
return BaseResponse.error(msg="职位不存在", code=404)
# 软删除将状态改为ARCHIVED
from ...domain.job import JobStatus
job.status = JobStatus.ARCHIVED
mapper.save(job)
return BaseResponse.success(
data={"deleted_id": job_id},
msg="职位删除成功"
)
except Exception as e:
return BaseResponse.error(msg=f"删除职位失败: {str(e)}")
@router.post("/{job_id}/bind-schema", response_model=BaseResponse[JobPositionResponse])
async def bind_evaluation_schema(job_id: str, request: JobBindSchemaRequest):
"""
职位关联评价方案
Args:
job_id: 职位ID
request: 关联评价方案请求
Returns:
BaseResponse[JobPositionResponse]: 统一响应格式的更新后职位详情
"""
try:
job_mapper = JobMapper()
schema_mapper = EvaluationMapper()
# 验证职位存在
job = job_mapper.find_by_id(job_id)
if not job:
return BaseResponse.error(msg="职位不存在", code=404)
# 验证评价方案存在
schema = schema_mapper.find_schema_by_id(request.evaluation_schema_id)
if not schema:
return BaseResponse.error(msg="评价方案不存在", code=404)
# 更新关联
job.evaluation_schema_id = request.evaluation_schema_id
updated_job = job_mapper.save(job)
return BaseResponse.success(
data=_job_to_response(updated_job),
msg=f"职位已成功关联评价方案: {schema.name}"
)
except Exception as e:
return BaseResponse.error(msg=f"关联评价方案失败: {str(e)}")
@router.get("/{job_id}/schema", response_model=BaseResponse[EvaluationSchemaResponse])
async def get_job_evaluation_schema(job_id: str):
"""
获取职位关联的评价方案
Args:
job_id: 职位ID
Returns:
BaseResponse[EvaluationSchemaResponse]: 统一响应格式的评价方案详情
"""
try:
job_mapper = JobMapper()
schema_mapper = EvaluationMapper()
# 验证职位存在
job = job_mapper.find_by_id(job_id)
if not job:
return BaseResponse.error(msg="职位不存在", code=404)
# 检查是否有关联的评价方案
if not job.evaluation_schema_id:
return BaseResponse.error(msg="该职位未关联评价方案", code=404)
# 获取评价方案
schema = schema_mapper.find_schema_by_id(job.evaluation_schema_id)
if not schema:
return BaseResponse.error(msg="关联的评价方案不存在", code=404)
response = EvaluationSchemaResponse(
id=schema.id,
name=schema.name,
description=schema.description,
dimensions=schema.dimensions,
weights=schema.weights,
is_default=schema.is_default,
created_at=schema.created_at,
updated_at=schema.updated_at
)
return BaseResponse.success(data=response, msg="获取评价方案成功")
except Exception as e:
return BaseResponse.error(msg=f"获取评价方案失败: {str(e)}")
@router.get("/schemas/list", response_model=BaseResponse[EvaluationSchemaListResponse])
async def list_evaluation_schemas(
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(20, ge=1, le=100, description="每页数量")
):
"""
获取评价方案列表
Args:
page: 页码
page_size: 每页数量
Returns:
BaseResponse[EvaluationSchemaListResponse]: 统一响应格式的评价方案列表
"""
try:
schema_mapper = EvaluationMapper()
schemas, total = schema_mapper.find_all_schemas(page=page, page_size=page_size)
items = [
EvaluationSchemaResponse(
id=schema.id,
name=schema.name,
description=schema.description,
dimensions=schema.dimensions,
weights=schema.weights,
is_default=schema.is_default,
created_at=schema.created_at,
updated_at=schema.updated_at
)
for schema in schemas
]
response = EvaluationSchemaListResponse(total=total, items=items)
return BaseResponse.success(data=response, msg="获取评价方案列表成功")
except Exception as e:
return BaseResponse.error(msg=f"获取评价方案列表失败: {str(e)}")

View File

@@ -6,13 +6,13 @@
from typing import List, Optional
from datetime import datetime
from fastapi import APIRouter, HTTPException, Depends
from fastapi import APIRouter, Depends
from ..schemas import (
RecruiterCreate, RecruiterRegister, RecruiterUpdate,
RecruiterResponse, RecruiterListResponse, RecruiterRegisterResponse,
RecruiterPrivilegeInfo, RecruiterSyncInfo, RecruiterSourceInfo,
APIResponse
BaseResponse
)
from ...domain.candidate import CandidateSource
from ...domain.recruiter import Recruiter, RecruiterStatus
@@ -68,10 +68,10 @@ def _build_recruiter_response(recruiter: Recruiter) -> RecruiterResponse:
)
@router.get("/sources", response_model=List[RecruiterSourceInfo])
@router.get("/sources", response_model=BaseResponse[List[RecruiterSourceInfo]])
async def get_recruiter_sources():
"""获取支持的平台来源列表"""
return [
sources = [
RecruiterSourceInfo(
value="boss",
label="Boss直聘",
@@ -88,9 +88,10 @@ async def get_recruiter_sources():
description="智联招聘平台(预留)"
)
]
return BaseResponse.success(data=sources, msg="获取平台来源列表成功")
@router.get("", response_model=RecruiterListResponse)
@router.get("", response_model=BaseResponse[RecruiterListResponse])
async def list_recruiters(
source: Optional[str] = None,
service: RecruiterService = Depends(get_recruiter_service)
@@ -101,54 +102,64 @@ async def list_recruiters(
Args:
source: 按平台筛选 (boss, liepin, etc.)
"""
if source:
try:
candidate_source = CandidateSource(source.lower())
recruiters = service.list_recruiters(candidate_source)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid source: {source}")
else:
recruiters = service.list_recruiters()
items = [_build_recruiter_response(r) for r in recruiters]
return RecruiterListResponse(total=len(items), items=items)
try:
if source:
try:
candidate_source = CandidateSource(source.lower())
recruiters = service.list_recruiters(candidate_source)
except ValueError:
return BaseResponse.error(msg=f"无效的来源: {source}", code=400)
else:
recruiters = service.list_recruiters()
items = [_build_recruiter_response(r) for r in recruiters]
response = RecruiterListResponse(total=len(items), items=items)
return BaseResponse.success(data=response, msg="获取招聘者列表成功")
except Exception as e:
return BaseResponse.error(msg=f"获取招聘者列表失败: {str(e)}")
@router.get("/{recruiter_id}", response_model=RecruiterResponse)
@router.get("/{recruiter_id}", response_model=BaseResponse[RecruiterResponse])
async def get_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""获取单个招聘者账号详情"""
recruiter = service.get_recruiter(recruiter_id)
if not recruiter:
raise HTTPException(status_code=404, detail="Recruiter not found")
return _build_recruiter_response(recruiter)
try:
recruiter = service.get_recruiter(recruiter_id)
if not recruiter:
return BaseResponse.error(msg="招聘者不存在", code=404)
return BaseResponse.success(data=_build_recruiter_response(recruiter), msg="获取招聘者详情成功")
except Exception as e:
return BaseResponse.error(msg=f"获取招聘者详情失败: {str(e)}")
@router.post("", response_model=RecruiterResponse)
@router.post("", response_model=BaseResponse[RecruiterResponse])
async def create_recruiter(
data: RecruiterCreate,
service: RecruiterService = Depends(get_recruiter_service)
):
"""手动创建招聘者账号"""
try:
source = CandidateSource(data.source.lower())
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid source: {data.source}")
recruiter = service.add_recruiter(
name=data.name,
source=source,
wt_token=data.wt_token
)
return _build_recruiter_response(recruiter)
try:
source = CandidateSource(data.source.lower())
except ValueError:
return BaseResponse.error(msg=f"无效的来源: {data.source}", code=400)
recruiter = service.add_recruiter(
name=data.name,
source=source,
wt_token=data.wt_token
)
return BaseResponse.success(data=_build_recruiter_response(recruiter), msg="创建招聘者成功")
except Exception as e:
return BaseResponse.error(msg=f"创建招聘者失败: {str(e)}")
@router.post("/register", response_model=RecruiterRegisterResponse)
@router.post("/register", response_model=BaseResponse[RecruiterRegisterResponse])
async def register_recruiter(
data: RecruiterRegister,
service: RecruiterService = Depends(get_recruiter_service)
@@ -158,20 +169,17 @@ async def register_recruiter(
根据Token自动获取账号信息并注册
"""
# 验证平台来源
try:
source = CandidateSource(data.source.lower())
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid source: {data.source}")
# 目前仅支持Boss平台
if source != CandidateSource.BOSS:
raise HTTPException(
status_code=400,
detail="Auto register only supports 'boss' source currently"
)
try:
# 验证平台来源
try:
source = CandidateSource(data.source.lower())
except ValueError:
return BaseResponse.error(msg=f"无效的来源: {data.source}", code=400)
# 目前仅支持Boss平台
if source != CandidateSource.BOSS:
return BaseResponse.error(msg="自动注册仅支持Boss平台", code=400)
# 1. 使用Token创建临时爬虫获取账号信息
crawler = BossCrawler(wt_token=data.wt_token)
@@ -179,10 +187,7 @@ async def register_recruiter(
if hasattr(crawler.client, 'check_login_status'):
is_valid = crawler.client.check_login_status()
if not is_valid:
return RecruiterRegisterResponse(
success=False,
message="Token无效或账号已过期请检查Token是否正确"
)
return BaseResponse.error(msg="Token无效或账号已过期请检查Token是否正确")
# 3. 获取账号信息
account_info = None
@@ -190,10 +195,7 @@ async def register_recruiter(
account_info = crawler.client.get_account_info()
if not account_info:
return RecruiterRegisterResponse(
success=False,
message="无法获取账号信息请检查Token是否有效"
)
return BaseResponse.error(msg="无法获取账号信息请检查Token是否有效")
# 4. 提取账号名称
account_name = ""
@@ -212,11 +214,12 @@ async def register_recruiter(
existing_recruiters = service.list_recruiters(source)
for existing in existing_recruiters:
if existing.wt_token == data.wt_token:
return RecruiterRegisterResponse(
response_data = RecruiterRegisterResponse(
success=False,
message=f"该Token已注册为账号: {existing.name}",
recruiter=_build_recruiter_response(existing)
)
return BaseResponse.success(data=response_data, msg="账号已存在")
# 6. 创建账号
recruiter = service.add_recruiter(
@@ -239,94 +242,98 @@ async def register_recruiter(
# 权益信息获取失败不影响注册
print(f"[Register] 获取权益信息失败: {e}")
return RecruiterRegisterResponse(
response_data = RecruiterRegisterResponse(
success=True,
message=f"账号注册成功: {account_name}",
recruiter=_build_recruiter_response(recruiter)
)
return BaseResponse.success(data=response_data, msg="账号注册成功")
except ImportError as e:
return RecruiterRegisterResponse(
success=False,
message=f"SDK未安装或导入失败: {str(e)}"
)
return BaseResponse.error(msg=f"SDK未安装或导入失败: {str(e)}")
except Exception as e:
return RecruiterRegisterResponse(
success=False,
message=f"注册失败: {str(e)}"
)
return BaseResponse.error(msg=f"注册失败: {str(e)}")
@router.put("/{recruiter_id}", response_model=RecruiterResponse)
@router.put("/{recruiter_id}", response_model=BaseResponse[RecruiterResponse])
async def update_recruiter(
recruiter_id: str,
data: RecruiterUpdate,
service: RecruiterService = Depends(get_recruiter_service)
):
"""更新招聘者账号"""
recruiter = service.get_recruiter(recruiter_id)
if not recruiter:
raise HTTPException(status_code=404, detail="Recruiter not found")
# 更新字段
if data.name:
recruiter.name = data.name
if data.wt_token:
recruiter.wt_token = data.wt_token
if data.status:
try:
recruiter.status = RecruiterStatus(data.status.lower())
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid status: {data.status}"
)
updated = service.mapper.save(recruiter)
return _build_recruiter_response(updated)
try:
recruiter = service.get_recruiter(recruiter_id)
if not recruiter:
return BaseResponse.error(msg="招聘者不存在", code=404)
# 更新字段
if data.name:
recruiter.name = data.name
if data.wt_token:
recruiter.wt_token = data.wt_token
if data.status:
try:
recruiter.status = RecruiterStatus(data.status.lower())
except ValueError:
return BaseResponse.error(msg=f"无效的状态: {data.status}", code=400)
updated = service.mapper.save(recruiter)
return BaseResponse.success(data=_build_recruiter_response(updated), msg="更新招聘者成功")
except Exception as e:
return BaseResponse.error(msg=f"更新招聘者失败: {str(e)}")
@router.delete("/{recruiter_id}")
@router.delete("/{recruiter_id}", response_model=BaseResponse[dict])
async def delete_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""删除招聘者账号"""
success = service.delete_recruiter(recruiter_id)
if not success:
raise HTTPException(status_code=404, detail="Recruiter not found")
return APIResponse(success=True, message="Recruiter deleted successfully")
try:
success = service.delete_recruiter(recruiter_id)
if not success:
return BaseResponse.error(msg="招聘者不存在", code=404)
return BaseResponse.success(data={"deleted_id": recruiter_id}, msg="删除招聘者成功")
except Exception as e:
return BaseResponse.error(msg=f"删除招聘者失败: {str(e)}")
@router.post("/{recruiter_id}/activate")
@router.post("/{recruiter_id}/activate", response_model=BaseResponse[dict])
async def activate_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""启用招聘者账号"""
success = service.activate_recruiter(recruiter_id)
if not success:
raise HTTPException(status_code=404, detail="Recruiter not found")
return APIResponse(success=True, message="Recruiter activated")
try:
success = service.activate_recruiter(recruiter_id)
if not success:
return BaseResponse.error(msg="招聘者不存在", code=404)
return BaseResponse.success(data={"recruiter_id": recruiter_id}, msg="启用招聘者成功")
except Exception as e:
return BaseResponse.error(msg=f"启用招聘者失败: {str(e)}")
@router.post("/{recruiter_id}/deactivate")
@router.post("/{recruiter_id}/deactivate", response_model=BaseResponse[dict])
async def deactivate_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
):
"""停用招聘者账号"""
success = service.deactivate_recruiter(recruiter_id)
if not success:
raise HTTPException(status_code=404, detail="Recruiter not found")
return APIResponse(success=True, message="Recruiter deactivated")
try:
success = service.deactivate_recruiter(recruiter_id)
if not success:
return BaseResponse.error(msg="招聘者不存在", code=404)
return BaseResponse.success(data={"recruiter_id": recruiter_id}, msg="停用招聘者成功")
except Exception as e:
return BaseResponse.error(msg=f"停用招聘者失败: {str(e)}")
@router.post("/{recruiter_id}/sync")
@router.post("/{recruiter_id}/sync", response_model=BaseResponse[dict])
async def sync_recruiter(
recruiter_id: str,
service: RecruiterService = Depends(get_recruiter_service)
@@ -336,14 +343,17 @@ async def sync_recruiter(
触发账号状态、权益信息、职位列表的同步
"""
recruiter = service.get_recruiter(recruiter_id)
if not recruiter:
raise HTTPException(status_code=404, detail="Recruiter not found")
# TODO: 触发同步任务
# 可以调用 AccountSyncJob 来执行同步
return APIResponse(
success=True,
message=f"Sync triggered for recruiter: {recruiter.name}"
)
try:
recruiter = service.get_recruiter(recruiter_id)
if not recruiter:
return BaseResponse.error(msg="招聘者不存在", code=404)
# TODO: 触发同步任务
# 可以调用 AccountSyncJob 来执行同步
return BaseResponse.success(
data={"recruiter_id": recruiter_id, "recruiter_name": recruiter.name},
msg=f"已触发账号同步: {recruiter.name}"
)
except Exception as e:
return BaseResponse.error(msg=f"同步账号失败: {str(e)}")

View File

@@ -5,11 +5,11 @@
"""
from typing import List, Optional
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
from fastapi import APIRouter, Depends, BackgroundTasks
from ..schemas import (
JobInfo, JobStatusInfo, JobConfigUpdate,
APIResponse
BaseResponse
)
from ...service.scheduler import get_scheduler
@@ -22,32 +22,68 @@ def get_scheduler_instance():
return get_scheduler()
@router.get("/jobs", response_model=List[JobInfo])
@router.get("/jobs", response_model=BaseResponse[List[JobInfo]])
async def list_jobs(
scheduler=Depends(get_scheduler_instance)
):
"""获取所有定时任务列表"""
jobs = scheduler.get_jobs()
return [
JobInfo(
id=job["id"],
name=job["name"],
next_run_time=job.get("next_run_time"),
trigger=job["trigger"],
type=job.get("type")
)
for job in jobs
]
try:
jobs = scheduler.get_jobs()
items = [
JobInfo(
id=job["id"],
name=job["name"],
next_run_time=job.get("next_run_time"),
trigger=job["trigger"],
type=job.get("type")
)
for job in jobs
]
return BaseResponse.success(data=items, msg="获取任务列表成功")
except Exception as e:
return BaseResponse.error(msg=f"获取任务列表失败: {str(e)}")
@router.get("/jobs/status", response_model=List[JobStatusInfo])
@router.get("/jobs/status", response_model=BaseResponse[List[JobStatusInfo]])
async def get_jobs_status(
scheduler=Depends(get_scheduler_instance)
):
"""获取所有Job任务状态"""
status_list = scheduler.get_job_status()
return [
JobStatusInfo(
try:
status_list = scheduler.get_job_status()
items = [
JobStatusInfo(
job_id=s["job_id"],
name=s["name"],
enabled=s["enabled"],
is_running=s["is_running"],
last_run_time=s.get("last_run_time"),
next_run_time=s.get("next_run_time"),
run_count=s["run_count"],
success_count=s["success_count"],
fail_count=s["fail_count"],
last_error=s.get("last_error")
)
for s in status_list
]
return BaseResponse.success(data=items, msg="获取任务状态成功")
except Exception as e:
return BaseResponse.error(msg=f"获取任务状态失败: {str(e)}")
@router.get("/jobs/{job_id}/status", response_model=BaseResponse[JobStatusInfo])
async def get_job_status(
job_id: str,
scheduler=Depends(get_scheduler_instance)
):
"""获取指定Job任务状态"""
try:
status_list = scheduler.get_job_status(job_id)
if not status_list:
return BaseResponse.error(msg="任务不存在", code=404)
s = status_list[0]
data = JobStatusInfo(
job_id=s["job_id"],
name=s["name"],
enabled=s["enabled"],
@@ -59,36 +95,12 @@ async def get_jobs_status(
fail_count=s["fail_count"],
last_error=s.get("last_error")
)
for s in status_list
]
return BaseResponse.success(data=data, msg="获取任务状态成功")
except Exception as e:
return BaseResponse.error(msg=f"获取任务状态失败: {str(e)}")
@router.get("/jobs/{job_id}/status", response_model=JobStatusInfo)
async def get_job_status(
job_id: str,
scheduler=Depends(get_scheduler_instance)
):
"""获取指定Job任务状态"""
status_list = scheduler.get_job_status(job_id)
if not status_list:
raise HTTPException(status_code=404, detail="Job not found")
s = status_list[0]
return JobStatusInfo(
job_id=s["job_id"],
name=s["name"],
enabled=s["enabled"],
is_running=s["is_running"],
last_run_time=s.get("last_run_time"),
next_run_time=s.get("next_run_time"),
run_count=s["run_count"],
success_count=s["success_count"],
fail_count=s["fail_count"],
last_error=s.get("last_error")
)
@router.post("/jobs/{job_id}/run")
@router.post("/jobs/{job_id}/run", response_model=BaseResponse[dict])
async def run_job_now(
job_id: str,
background_tasks: BackgroundTasks,
@@ -100,21 +112,24 @@ async def run_job_now(
Args:
job_id: 任务ID (account_sync 或 resume_process)
"""
# 验证任务是否存在
valid_jobs = ["account_sync", "resume_process", "crawl_boss", "analyze_pending"]
if job_id not in valid_jobs:
raise HTTPException(status_code=400, detail=f"Invalid job_id. Valid: {valid_jobs}")
# 在后台执行
background_tasks.add_task(scheduler.run_job_now, job_id)
return APIResponse(
success=True,
message=f"Job {job_id} is running in background"
)
try:
# 验证任务是否存在
valid_jobs = ["account_sync", "resume_process", "crawl_boss", "analyze_pending"]
if job_id not in valid_jobs:
return BaseResponse.error(msg=f"无效的任务ID有效值: {valid_jobs}", code=400)
# 在后台执行
background_tasks.add_task(scheduler.run_job_now, job_id)
return BaseResponse.success(
data={"job_id": job_id},
msg=f"任务 {job_id} 已在后台开始执行"
)
except Exception as e:
return BaseResponse.error(msg=f"执行任务失败: {str(e)}")
@router.post("/jobs/{job_id}/pause")
@router.post("/jobs/{job_id}/pause", response_model=BaseResponse[dict])
async def pause_job(
job_id: str,
scheduler=Depends(get_scheduler_instance)
@@ -122,12 +137,12 @@ async def pause_job(
"""暂停指定任务"""
try:
scheduler.pause_job(job_id)
return APIResponse(success=True, message=f"Job {job_id} paused")
return BaseResponse.success(data={"job_id": job_id}, msg=f"任务 {job_id} 已暂停")
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
return BaseResponse.error(msg=f"暂停任务失败: {str(e)}")
@router.post("/jobs/{job_id}/resume")
@router.post("/jobs/{job_id}/resume", response_model=BaseResponse[dict])
async def resume_job(
job_id: str,
scheduler=Depends(get_scheduler_instance)
@@ -135,12 +150,12 @@ async def resume_job(
"""恢复指定任务"""
try:
scheduler.resume_job(job_id)
return APIResponse(success=True, message=f"Job {job_id} resumed")
return BaseResponse.success(data={"job_id": job_id}, msg=f"任务 {job_id} 已恢复")
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
return BaseResponse.error(msg=f"恢复任务失败: {str(e)}")
@router.put("/jobs/{job_id}/config")
@router.put("/jobs/{job_id}/config", response_model=BaseResponse[dict])
async def update_job_config(
job_id: str,
config: JobConfigUpdate,
@@ -153,75 +168,78 @@ async def update_job_config(
job_id: 任务ID
config: 配置更新
"""
# 验证任务ID
valid_job_ids = ["account_sync", "resume_process"]
if job_id not in valid_job_ids:
raise HTTPException(
status_code=400,
detail=f"Can only config Job module tasks: {valid_job_ids}"
)
# 构建更新参数
update_kwargs = {}
if config.enabled is not None:
update_kwargs["enabled"] = config.enabled
if config.interval_minutes is not None:
update_kwargs["interval_minutes"] = config.interval_minutes
if not update_kwargs:
raise HTTPException(status_code=400, detail="No config to update")
try:
# 验证任务ID
valid_job_ids = ["account_sync", "resume_process"]
if job_id not in valid_job_ids:
return BaseResponse.error(
msg=f"只能配置Job模块任务: {valid_job_ids}",
code=400
)
# 构建更新参数
update_kwargs = {}
if config.enabled is not None:
update_kwargs["enabled"] = config.enabled
if config.interval_minutes is not None:
update_kwargs["interval_minutes"] = config.interval_minutes
if not update_kwargs:
return BaseResponse.error(msg="没有要更新的配置", code=400)
scheduler.job_scheduler.update_job_config(job_id, **update_kwargs)
return APIResponse(
success=True,
message=f"Job {job_id} config updated",
data=update_kwargs
return BaseResponse.success(
data={"job_id": job_id, "config": update_kwargs},
msg=f"任务 {job_id} 配置已更新"
)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
return BaseResponse.error(msg=f"更新任务配置失败: {str(e)}")
@router.get("/status")
@router.get("/status", response_model=BaseResponse[dict])
async def get_scheduler_status(
scheduler=Depends(get_scheduler_instance)
):
"""获取调度器整体状态"""
jobs = scheduler.get_jobs()
job_status = scheduler.get_job_status()
return {
"running": scheduler._running if hasattr(scheduler, '_running') else False,
"total_jobs": len(jobs),
"job_scheduler_jobs": len([j for j in jobs if j.get("type") == "job"]),
"legacy_jobs": len([j for j in jobs if j.get("type") == "legacy"]),
"job_status_summary": {
"total": len(job_status),
"running": sum(1 for s in job_status if s["is_running"]),
"enabled": sum(1 for s in job_status if s["enabled"])
try:
jobs = scheduler.get_jobs()
job_status = scheduler.get_job_status()
data = {
"running": scheduler._running if hasattr(scheduler, '_running') else False,
"total_jobs": len(jobs),
"job_scheduler_jobs": len([j for j in jobs if j.get("type") == "job"]),
"legacy_jobs": len([j for j in jobs if j.get("type") == "legacy"]),
"job_status_summary": {
"total": len(job_status),
"running": sum(1 for s in job_status if s["is_running"]),
"enabled": sum(1 for s in job_status if s["enabled"])
}
}
}
return BaseResponse.success(data=data, msg="获取调度器状态成功")
except Exception as e:
return BaseResponse.error(msg=f"获取调度器状态失败: {str(e)}")
@router.post("/start")
@router.post("/start", response_model=BaseResponse[dict])
async def start_scheduler(
scheduler=Depends(get_scheduler_instance)
):
"""启动调度器"""
try:
scheduler.start()
return APIResponse(success=True, message="Scheduler started")
return BaseResponse.success(data={}, msg="调度器已启动")
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
return BaseResponse.error(msg=f"启动调度器失败: {str(e)}")
@router.post("/stop")
@router.post("/stop", response_model=BaseResponse[dict])
async def stop_scheduler(
scheduler=Depends(get_scheduler_instance)
):
"""停止调度器"""
try:
scheduler.stop()
return APIResponse(success=True, message="Scheduler stopped")
return BaseResponse.success(data={}, msg="调度器已停止")
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
return BaseResponse.error(msg=f"停止调度器失败: {str(e)}")

View File

@@ -5,49 +5,53 @@
"""
from fastapi import APIRouter
from ..schemas import APIResponse
from ..schemas import BaseResponse
router = APIRouter(tags=["系统"])
@router.get("/")
@router.get("/", response_model=BaseResponse[dict])
async def root():
"""API根路径"""
return {
data = {
"name": "简历智能体 API",
"version": "0.1.0",
"docs": "/docs",
"endpoints": {
"recruiters": "/api/recruiters",
"jobs": "/api/jobs",
"candidates": "/candidates",
"scheduler": "/api/scheduler",
"health": "/health"
}
}
return BaseResponse.success(data=data, msg="简历智能体 API 运行中")
@router.get("/health")
@router.get("/health", response_model=BaseResponse[dict])
async def health_check():
"""健康检查"""
return {
data = {
"status": "healthy",
"service": "hr-agent-api",
"version": "0.1.0"
}
return BaseResponse.success(data=data, msg="服务运行正常")
@router.get("/api/status")
@router.get("/api/status", response_model=BaseResponse[dict])
async def api_status():
"""获取API状态信息"""
return APIResponse(
success=True,
message="API is running",
data={
"version": "0.1.0",
"features": [
"recruiter_management",
"scheduler_management",
"auto_register"
]
}
)
data = {
"version": "0.1.0",
"features": [
"recruiter_management",
"job_management",
"candidate_management",
"scheduler_management",
"auto_register",
"llm_evaluation"
]
}
return BaseResponse.success(data=data, msg="API运行中")

View File

@@ -228,16 +228,89 @@ class CandidateUpdateScoreRequest(BaseModel):
# ============== 职位(预留) ==============
class JobPositionResponse(BaseModel):
"""职位响应(预留)"""
"""职位响应"""
id: str
title: str
source: str
status: str
recruiter_id: Optional[str] = None
evaluation_schema_id: Optional[str] = Field(None, description="关联的评价方案ID")
department: Optional[str] = None
location: Optional[str] = None
salary_min: Optional[int] = None
salary_max: Optional[int] = None
requirements: Optional[Dict[str, Any]] = None
description: Optional[str] = None
candidate_count: int = Field(default=0, description="候选人数量")
new_candidate_count: int = Field(default=0, description="新候选人数量")
last_sync_at: Optional[datetime] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class JobPositionListResponse(BaseModel):
"""职位列表响应(预留)"""
"""职位列表响应"""
total: int
items: List[JobPositionResponse]
class JobPositionCreateRequest(BaseModel):
"""创建职位请求"""
title: str = Field(..., description="职位标题")
source: str = Field(default="boss", description="平台来源: boss, liepin, etc.")
source_id: str = Field(..., description="来源平台ID")
recruiter_id: Optional[str] = Field(None, description="招聘者账号ID")
evaluation_schema_id: Optional[str] = Field(None, description="关联的评价方案ID")
department: Optional[str] = Field(None, description="部门")
location: Optional[str] = Field(None, description="地点")
salary_min: Optional[int] = Field(None, description="薪资下限(K)")
salary_max: Optional[int] = Field(None, description="薪资上限(K)")
requirements: Optional[Dict[str, Any]] = Field(None, description="职位要求JSON")
description: Optional[str] = Field(None, description="职位描述")
class JobPositionUpdateRequest(BaseModel):
"""更新职位请求"""
title: Optional[str] = Field(None, description="职位标题")
evaluation_schema_id: Optional[str] = Field(None, description="关联的评价方案ID")
department: Optional[str] = Field(None, description="部门")
location: Optional[str] = Field(None, description="地点")
salary_min: Optional[int] = Field(None, description="薪资下限(K)")
salary_max: Optional[int] = Field(None, description="薪资上限(K)")
requirements: Optional[Dict[str, Any]] = Field(None, description="职位要求JSON")
description: Optional[str] = Field(None, description="职位描述")
status: Optional[str] = Field(None, description="状态: ACTIVE, PAUSED, CLOSED, ARCHIVED")
class JobPositionFilterRequest(BaseModel):
"""职位筛选请求"""
source: Optional[str] = Field(None, description="平台来源")
recruiter_id: Optional[str] = Field(None, description="招聘者账号ID")
evaluation_schema_id: Optional[str] = Field(None, description="评价方案ID")
status: Optional[str] = Field(None, description="状态: ACTIVE, PAUSED, CLOSED, ARCHIVED")
keyword: Optional[str] = Field(None, description="关键词搜索(标题/部门)")
page: int = Field(default=1, ge=1, description="页码")
page_size: int = Field(default=20, ge=1, le=100, description="每页数量")
class JobBindSchemaRequest(BaseModel):
"""职位关联评价方案请求"""
evaluation_schema_id: str = Field(..., description="评价方案ID")
class EvaluationSchemaResponse(BaseModel):
"""评价方案响应"""
id: str
name: str
description: Optional[str] = None
dimensions: Optional[List[Dict[str, Any]]] = None
weights: Optional[Dict[str, float]] = None
is_default: bool = False
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class EvaluationSchemaListResponse(BaseModel):
"""评价方案列表响应"""
total: int
items: List[EvaluationSchemaResponse]

View File

@@ -32,6 +32,7 @@ class Job:
source: CandidateSource = CandidateSource.BOSS
source_id: str = ""
recruiter_id: Optional[str] = None # 关联的招聘者账号ID
evaluation_schema_id: Optional[str] = None # 关联的评价方案ID
# 职位信息
title: str = ""

View File

@@ -4,16 +4,18 @@
负责处理简历信息入库操作:
1. 遍历所有活跃账号的职位
2. 获取职位下的候选人列表
3. 获取候选人简历详情
4. 将简历信息入库
3. 获取候选人简历详情同步进行LLM评分
4. 将简历信息和评分结果入库
5. 超过阈值触发打招呼和HR通知
入库更新操作
- 插入/更新 candidates 表
- 插入/更新 resumes 表
架构特点
- 多账号并行处理使用asyncio.gather
- 每个账号内部串行处理候选人确保同一账号下操作的顺序性避免Boss平台检测
"""
import asyncio
from datetime import datetime
from typing import List, Optional, Dict, Any
from dataclasses import dataclass
from dataclasses import dataclass, field
from ..domain.candidate import Candidate, CandidateSource, CandidateStatus
from ..domain.resume import Resume
@@ -21,7 +23,9 @@ from ..domain.job import Job
from ..domain.recruiter import Recruiter, RecruiterStatus
from ..service.recruiter_service import RecruiterService
from ..service.ingestion.unified_ingestion_service import UnifiedIngestionService
from ..service.resume_evaluation_service import ResumeEvaluationService, ProcessResult
from ..service.crawler.base_crawler import BaseCrawler
from ..config.settings import Settings, get_settings
@dataclass
@@ -35,9 +39,13 @@ class ResumeProcessResult:
candidates_new: int = 0
candidates_updated: int = 0
candidates_failed: int = 0
candidates_greeted: int = 0 # 打招呼成功数
candidates_notified: int = 0 # 通知HR成功数
high_score_count: int = 0 # 高分候选人数量
success: bool = True
message: str = ""
timestamp: datetime = None
process_results: List[ProcessResult] = field(default_factory=list) # 详细处理结果
def __post_init__(self):
if self.timestamp is None:
@@ -48,62 +56,88 @@ class ResumeProcessJob:
"""
简历处理定时任务
定时执行简历爬取和入库:
定时执行简历爬取、LLM评分和入库:
- 获取所有活跃账号
- 遍历账号下的职位
- 获取候选人列表和简历详情
- 统一入库处理
- 多账号并行处理asyncio.gather
- 单账号内串行处理候选人(简历获取+LLM分析同步避免Boss检测
- 超阈值触发打招呼和HR通知
"""
def __init__(
self,
recruiter_service: RecruiterService,
ingestion_service: UnifiedIngestionService
ingestion_service: UnifiedIngestionService,
evaluation_service: Optional[ResumeEvaluationService] = None,
settings: Optional[Settings] = None
):
self.recruiter_service = recruiter_service
self.ingestion_service = ingestion_service
# 每个职位最多处理的候选人数量
self.max_candidates_per_job = 20
self.evaluation_service = evaluation_service
self.settings = settings or get_settings()
# 从配置中读取参数
self.max_candidates_per_job = self.settings.resume_max_per_job
self.process_delay = self.settings.resume_process_delay
async def execute(self) -> List[ResumeProcessResult]:
"""
执行简历处理任务
架构:
- 多个账号并行处理使用asyncio.gather
- 每个账号内部串行处理候选人(确保同一账号下操作的顺序性)
Returns:
List[ResumeProcessResult]: 各职位的处理结果
"""
print(f"[{datetime.now()}] 开始执行简历处理任务...")
results = []
print(f"[{datetime.now()}] 配置: 评分阈值={self.settings.score_threshold_greet}, "
f"自动打招呼={'ON' if self.settings.auto_greet_enabled else 'OFF'}, "
f"自动通知={'ON' if self.settings.auto_notify_enabled else 'OFF'}")
# 获取所有活跃且同步成功的账号
recruiters = self._get_eligible_recruiters()
if not recruiters:
print(f"[{datetime.now()}] 没有符合条件的招聘者账号需要ACTIVE状态且同步成功")
return results
return []
print(f"[{datetime.now()}] 找到 {len(recruiters)} 个符合条件的账号")
print(f"[{datetime.now()}] 找到 {len(recruiters)} 个符合条件的账号,开始并行处理...")
for recruiter in recruiters:
try:
job_results = await self._process_recruiter_jobs(recruiter)
results.extend(job_results)
except Exception as e:
print(f"[{datetime.now()}] 处理账号 {recruiter.name} 时发生异常: {e}")
# 多账号并行处理
tasks = [
self._process_recruiter_jobs_with_lock(recruiter)
for recruiter in recruiters
]
results_list = await asyncio.gather(*tasks, return_exceptions=True)
# 汇总结果
all_results = []
for i, result in enumerate(results_list):
if isinstance(result, Exception):
print(f"[{datetime.now()}] 账号 {recruiters[i].name} 处理异常: {result}")
elif isinstance(result, list):
all_results.extend(result)
# 统计结果
total_processed = sum(r.candidates_processed for r in results)
total_new = sum(r.candidates_new for r in results)
total_failed = sum(r.candidates_failed for r in results)
total_processed = sum(r.candidates_processed for r in all_results)
total_new = sum(r.candidates_new for r in all_results)
total_failed = sum(r.candidates_failed for r in all_results)
total_greeted = sum(r.candidates_greeted for r in all_results)
total_notified = sum(r.candidates_notified for r in all_results)
total_high_score = sum(r.high_score_count for r in all_results)
print(f"[{datetime.now()}] 简历处理任务完成:")
print(f" - 处理职位数: {len(results)}")
print(f" - 处理职位数: {len(all_results)}")
print(f" - 处理候选人数: {total_processed}")
print(f" - 新增候选人数: {total_new}")
print(f" - 高分候选人数: {total_high_score}")
print(f" - 打招呼成功数: {total_greeted}")
print(f" - HR通知成功数: {total_notified}")
print(f" - 失败数: {total_failed}")
return results
return all_results
def _get_eligible_recruiters(self) -> List[Recruiter]:
"""
@@ -136,9 +170,15 @@ class ResumeProcessJob:
return eligible_recruiters
async def _process_recruiter_jobs(self, recruiter: Recruiter) -> List[ResumeProcessResult]:
async def _process_recruiter_jobs_with_lock(
self,
recruiter: Recruiter
) -> List[ResumeProcessResult]:
"""
处理单个账号下的所有职位
处理单个账号下的所有职位(内部串行)
关键同一账号下的简历获取和LLM分析必须串行执行
避免Boss平台检测到异常操作模式
Args:
recruiter: 招聘者账号
@@ -165,7 +205,9 @@ class ResumeProcessJob:
for job in jobs:
try:
result = await self._process_single_job(recruiter, crawler, job)
result = await self._process_single_job_with_evaluation(
recruiter, crawler, job
)
results.append(result)
except Exception as e:
error_result = ResumeProcessResult(
@@ -184,6 +226,111 @@ class ResumeProcessJob:
return results
async def _process_single_job_with_evaluation(
self,
recruiter: Recruiter,
crawler: BaseCrawler,
job: Job
) -> ResumeProcessResult:
"""
处理单个职位的候选人集成LLM评分
Args:
recruiter: 招聘者账号
crawler: 爬虫实例
job: 职位
Returns:
ResumeProcessResult: 处理结果
"""
result = ResumeProcessResult(
job_id=job.id or "",
job_title=job.title,
recruiter_id=recruiter.id or "",
recruiter_name=recruiter.name
)
print(f"[{datetime.now()}] 处理职位: {job.title} (ID: {job.source_id}, "
f"评价方案: {job.evaluation_schema_id or 'general'})")
# 获取候选人列表
candidates = crawler.get_candidates(job.source_id, page=1)
if not candidates:
result.message = "该职位下没有候选人"
print(f"[{datetime.now()}] 职位 {job.title} 没有候选人")
return result
print(f"[{datetime.now()}] 职位 {job.title} 找到 {len(candidates)} 个候选人")
# 限制处理数量
candidates_to_process = candidates[:self.max_candidates_per_job]
# 串行处理每个候选人(关键:不能并行!)
for candidate in candidates_to_process:
try:
# 使用评分服务同步处理(获取简历 -> LLM分析 -> 入库 -> 触发动作)
if self.evaluation_service:
process_result = await self.evaluation_service.process_candidate_sync(
crawler=crawler,
candidate=candidate,
job=job,
schema_id=job.evaluation_schema_id
)
result.candidates_processed += 1
result.process_results.append(process_result)
if process_result.success:
result.candidates_new += 1
# 统计高分候选人
if process_result.overall_score >= self.settings.score_threshold_greet:
result.high_score_count += 1
# 统计打招呼和通知
if process_result.greeting_sent:
result.candidates_greeted += 1
if process_result.notification_sent:
result.candidates_notified += 1
else:
result.candidates_failed += 1
else:
# 回退到旧的处理方式(仅入库,不评分)
process_status = await self._process_single_candidate(
recruiter, crawler, job, candidate
)
result.candidates_processed += 1
if process_status == "new":
result.candidates_new += 1
elif process_status == "updated":
result.candidates_updated += 1
elif process_status == "failed":
result.candidates_failed += 1
# 添加延迟避免请求过快被Boss平台拦截
await asyncio.sleep(self.process_delay)
except Exception as e:
result.candidates_processed += 1
result.candidates_failed += 1
print(f"[{datetime.now()}] 处理候选人 {candidate.name} 失败: {e}")
result.success = result.candidates_failed == 0
result.message = (
f"处理完成: 新增{result.candidates_new}, "
f"高分{result.high_score_count}, "
f"打招呼{result.candidates_greeted}, "
f"通知{result.candidates_notified}, "
f"失败{result.candidates_failed}"
)
print(f"[{datetime.now()}] 职位 {job.title} 处理完成: {result.message}")
return result
async def _process_single_job(
self,
recruiter: Recruiter,
@@ -191,7 +338,9 @@ class ResumeProcessJob:
job: Job
) -> ResumeProcessResult:
"""
处理单个职位的候选人
处理单个职位的候选人(旧版本,仅入库)
已废弃,请使用 _process_single_job_with_evaluation
Args:
recruiter: 招聘者账号

View File

@@ -1 +1,15 @@
"""Mapper layer - Data access"""
from .candidate_mapper import CandidateMapper
from .job_mapper import JobMapper
from .recruiter_mapper import RecruiterMapper
from .resume_mapper import ResumeMapper
from .evaluation_mapper import EvaluationMapper
__all__ = [
'CandidateMapper',
'JobMapper',
'RecruiterMapper',
'ResumeMapper',
'EvaluationMapper',
]

View File

@@ -0,0 +1,376 @@
"""Evaluation data mapper using SQLAlchemy"""
from typing import List, Optional
from datetime import datetime
import uuid
import json
from sqlalchemy import select, update, delete
from sqlalchemy.orm import Session
from ..domain.evaluation import Evaluation, DimensionScore, EvaluationSchema
from ..domain.enums import Recommendation
from ..config.database import get_db_manager, EvaluationModel, EvaluationSchemaModel
class EvaluationMapper:
"""评价记录数据访问 - SQLAlchemy实现"""
def __init__(self, db_url: Optional[str] = None):
self.db_manager = get_db_manager(db_url)
# 确保表存在
self.db_manager.create_tables()
def _get_session(self) -> Session:
"""获取数据库会话"""
return self.db_manager.get_session()
def _model_to_entity(self, model: EvaluationModel) -> Evaluation:
"""将模型转换为实体"""
# 解析维度评分
dimension_scores = []
if model.dimension_scores:
ds_list = model.dimension_scores if isinstance(model.dimension_scores, list) else json.loads(model.dimension_scores)
for ds_data in ds_list:
dimension_scores.append(DimensionScore(
dimension_id=ds_data.get('dimension_id', ''),
dimension_name=ds_data.get('dimension_name', ''),
score=float(ds_data.get('score', 0)),
weight=float(ds_data.get('weight', 1.0)),
comment=ds_data.get('comment')
))
# 解析推荐意见
recommendation = None
if model.recommendation:
try:
recommendation = Recommendation(model.recommendation.lower())
except ValueError:
recommendation = None
# 解析tags, strengths, weaknesses
tags = model.tags if isinstance(model.tags, list) else (json.loads(model.tags) if model.tags else [])
strengths = model.strengths if isinstance(model.strengths, list) else (json.loads(model.strengths) if model.strengths else [])
weaknesses = model.weaknesses if isinstance(model.weaknesses, list) else (json.loads(model.weaknesses) if model.weaknesses else [])
return Evaluation(
id=model.id,
candidate_id=model.candidate_id,
schema_id=model.schema_id,
job_id=model.job_id,
overall_score=float(model.overall_score) if model.overall_score else 0.0,
dimension_scores=dimension_scores,
tags=tags,
summary=model.summary,
strengths=strengths,
weaknesses=weaknesses,
recommendation=recommendation,
raw_response=model.raw_response,
created_at=model.created_at
)
def _entity_to_model(self, entity: Evaluation) -> EvaluationModel:
"""将实体转换为模型"""
# 序列化维度评分
dimension_scores_json = [
{
'dimension_id': ds.dimension_id,
'dimension_name': ds.dimension_name,
'score': ds.score,
'weight': ds.weight,
'comment': ds.comment
}
for ds in entity.dimension_scores
] if entity.dimension_scores else []
return EvaluationModel(
id=entity.id or str(uuid.uuid4()),
candidate_id=entity.candidate_id,
schema_id=entity.schema_id,
job_id=entity.job_id,
overall_score=entity.overall_score,
dimension_scores=dimension_scores_json,
tags=entity.tags or [],
summary=entity.summary,
strengths=entity.strengths or [],
weaknesses=entity.weaknesses or [],
recommendation=entity.recommendation.value if entity.recommendation else None,
raw_response=entity.raw_response
)
def save(self, evaluation: Evaluation) -> Evaluation:
"""保存评价记录(插入或更新)"""
session = self._get_session()
try:
if evaluation.id:
# 检查是否已存在
existing = session.execute(
select(EvaluationModel).where(EvaluationModel.id == evaluation.id)
).scalar_one_or_none()
if existing:
# 更新现有记录
existing.overall_score = evaluation.overall_score
existing.dimension_scores = [
{
'dimension_id': ds.dimension_id,
'dimension_name': ds.dimension_name,
'score': ds.score,
'weight': ds.weight,
'comment': ds.comment
}
for ds in evaluation.dimension_scores
] if evaluation.dimension_scores else []
existing.tags = evaluation.tags or []
existing.summary = evaluation.summary
existing.strengths = evaluation.strengths or []
existing.weaknesses = evaluation.weaknesses or []
existing.recommendation = evaluation.recommendation.value if evaluation.recommendation else None
existing.raw_response = evaluation.raw_response
session.commit()
return evaluation
# 插入新记录
evaluation.id = evaluation.id or str(uuid.uuid4())
model = self._entity_to_model(evaluation)
session.add(model)
session.commit()
return evaluation
finally:
session.close()
def find_by_id(self, evaluation_id: str) -> Optional[Evaluation]:
"""根据ID查询评价记录"""
session = self._get_session()
try:
result = session.execute(
select(EvaluationModel).where(EvaluationModel.id == evaluation_id)
)
model = result.scalar_one_or_none()
return self._model_to_entity(model) if model else None
finally:
session.close()
def find_by_candidate_id(self, candidate_id: str) -> List[Evaluation]:
"""根据候选人ID查询评价记录"""
session = self._get_session()
try:
result = session.execute(
select(EvaluationModel)
.where(EvaluationModel.candidate_id == candidate_id)
.order_by(EvaluationModel.created_at.desc())
)
models = result.scalars().all()
return [self._model_to_entity(m) for m in models]
finally:
session.close()
def find_latest_by_candidate_id(self, candidate_id: str) -> Optional[Evaluation]:
"""获取候选人最新的评价记录"""
session = self._get_session()
try:
result = session.execute(
select(EvaluationModel)
.where(EvaluationModel.candidate_id == candidate_id)
.order_by(EvaluationModel.created_at.desc())
.limit(1)
)
model = result.scalar_one_or_none()
return self._model_to_entity(model) if model else None
finally:
session.close()
def find_by_job_id(self, job_id: str) -> List[Evaluation]:
"""根据职位ID查询评价记录"""
session = self._get_session()
try:
result = session.execute(
select(EvaluationModel)
.where(EvaluationModel.job_id == job_id)
.order_by(EvaluationModel.created_at.desc())
)
models = result.scalars().all()
return [self._model_to_entity(m) for m in models]
finally:
session.close()
def find_by_schema_id(self, schema_id: str) -> List[Evaluation]:
"""根据评价方案ID查询评价记录"""
session = self._get_session()
try:
result = session.execute(
select(EvaluationModel)
.where(EvaluationModel.schema_id == schema_id)
.order_by(EvaluationModel.created_at.desc())
)
models = result.scalars().all()
return [self._model_to_entity(m) for m in models]
finally:
session.close()
def find_by_candidate_and_job(self, candidate_id: str, job_id: str) -> Optional[Evaluation]:
"""根据候选人ID和职位ID查询评价记录"""
session = self._get_session()
try:
result = session.execute(
select(EvaluationModel)
.where(EvaluationModel.candidate_id == candidate_id)
.where(EvaluationModel.job_id == job_id)
.order_by(EvaluationModel.created_at.desc())
.limit(1)
)
model = result.scalar_one_or_none()
return self._model_to_entity(model) if model else None
finally:
session.close()
def find_high_score_candidates(
self,
min_score: float,
job_id: Optional[str] = None,
page: int = 1,
page_size: int = 20
) -> tuple:
"""
查询高分候选人
Args:
min_score: 最低评分
job_id: 职位ID可选
page: 页码
page_size: 每页数量
Returns:
tuple: (评价列表, 总记录数)
"""
session = self._get_session()
try:
from sqlalchemy import func
stmt = select(EvaluationModel).where(EvaluationModel.overall_score >= min_score)
count_stmt = select(func.count()).select_from(EvaluationModel).where(
EvaluationModel.overall_score >= min_score
)
if job_id:
stmt = stmt.where(EvaluationModel.job_id == job_id)
count_stmt = count_stmt.where(EvaluationModel.job_id == job_id)
# 获取总数
total = session.execute(count_stmt).scalar()
# 分页查询,按分数降序
stmt = stmt.order_by(EvaluationModel.overall_score.desc())
stmt = stmt.offset((page - 1) * page_size).limit(page_size)
results = session.execute(stmt).scalars().all()
return [self._model_to_entity(r) for r in results], total
finally:
session.close()
def delete(self, evaluation_id: str) -> bool:
"""删除评价记录"""
session = self._get_session()
try:
stmt = delete(EvaluationModel).where(EvaluationModel.id == evaluation_id)
result = session.execute(stmt)
session.commit()
return result.rowcount > 0
finally:
session.close()
def delete_by_candidate_id(self, candidate_id: str) -> int:
"""删除候选人的所有评价记录"""
session = self._get_session()
try:
stmt = delete(EvaluationModel).where(EvaluationModel.candidate_id == candidate_id)
result = session.execute(stmt)
session.commit()
return result.rowcount
finally:
session.close()
# ============== 评价方案查询方法 ==============
def _schema_model_to_entity(self, model: EvaluationSchemaModel) -> EvaluationSchema:
"""将评价方案模型转换为实体"""
dimensions = model.dimensions if isinstance(model.dimensions, list) else (
json.loads(model.dimensions) if model.dimensions else []
)
weights = model.weights if isinstance(model.weights, dict) else (
json.loads(model.weights) if model.weights else {}
)
return EvaluationSchema(
id=model.id,
name=model.name,
description=model.description,
dimensions=dimensions,
weights=weights,
prompt_template=model.prompt_template,
is_default=model.is_default,
created_at=model.created_at,
updated_at=model.updated_at
)
def find_schema_by_id(self, schema_id: str) -> Optional[EvaluationSchema]:
"""根据ID查询评价方案"""
session = self._get_session()
try:
result = session.execute(
select(EvaluationSchemaModel).where(EvaluationSchemaModel.id == schema_id)
)
model = result.scalar_one_or_none()
return self._schema_model_to_entity(model) if model else None
finally:
session.close()
def find_default_schema(self) -> Optional[EvaluationSchema]:
"""查询默认评价方案"""
session = self._get_session()
try:
result = session.execute(
select(EvaluationSchemaModel)
.where(EvaluationSchemaModel.is_default == True)
.limit(1)
)
model = result.scalar_one_or_none()
return self._schema_model_to_entity(model) if model else None
finally:
session.close()
def find_all_schemas(
self,
page: int = 1,
page_size: int = 20
) -> tuple:
"""
查询所有评价方案
Args:
page: 页码
page_size: 每页数量
Returns:
tuple: (评价方案列表, 总记录数)
"""
from sqlalchemy import func
session = self._get_session()
try:
stmt = select(EvaluationSchemaModel)
count_stmt = select(func.count()).select_from(EvaluationSchemaModel)
# 获取总数
total = session.execute(count_stmt).scalar()
# 分页查询,默认方案在前,按创建时间降序
stmt = stmt.order_by(
EvaluationSchemaModel.is_default.desc(),
EvaluationSchemaModel.created_at.desc()
)
stmt = stmt.offset((page - 1) * page_size).limit(page_size)
results = session.execute(stmt).scalars().all()
return [self._schema_model_to_entity(r) for r in results], total
finally:
session.close()

View File

@@ -45,6 +45,7 @@ class JobMapper:
source=CandidateSource(source_value),
source_id=model.source_id,
recruiter_id=model.recruiter_id,
evaluation_schema_id=model.evaluation_schema_id,
title=model.title,
department=model.department,
location=model.location,
@@ -79,6 +80,7 @@ class JobMapper:
source=entity.source.value,
source_id=entity.source_id,
recruiter_id=entity.recruiter_id,
evaluation_schema_id=entity.evaluation_schema_id,
title=entity.title,
department=entity.department,
location=entity.location,
@@ -114,6 +116,7 @@ class JobMapper:
description=job.description,
status=job.status.value,
recruiter_id=job.recruiter_id,
evaluation_schema_id=job.evaluation_schema_id,
candidate_count=job.candidate_count,
new_candidate_count=job.new_candidate_count,
last_sync_at=job.last_sync_at or datetime.now(),
@@ -269,3 +272,69 @@ class JobMapper:
return result.rowcount
finally:
session.close()
def find_filtered_jobs(
self,
source: Optional[str] = None,
recruiter_id: Optional[str] = None,
evaluation_schema_id: Optional[str] = None,
status: Optional[str] = None,
keyword: Optional[str] = None,
page: int = 1,
page_size: int = 20
) -> tuple:
"""
多条件筛选查询职位
Args:
source: 平台来源
recruiter_id: 招聘者账号ID
evaluation_schema_id: 评价方案ID
status: 职位状态
keyword: 关键词搜索(标题/部门)
page: 页码
page_size: 每页数量
Returns:
tuple: (职位列表, 总记录数)
"""
from sqlalchemy import func
session = self._get_session()
try:
stmt = select(JobModel)
count_stmt = select(func.count()).select_from(JobModel)
# 动态添加筛选条件
conditions = []
if source:
conditions.append(JobModel.source == source.lower())
if recruiter_id:
conditions.append(JobModel.recruiter_id == recruiter_id)
if evaluation_schema_id:
conditions.append(JobModel.evaluation_schema_id == evaluation_schema_id)
if status:
conditions.append(JobModel.status == status.lower())
if keyword:
conditions.append(
(JobModel.title.ilike(f"%{keyword}%")) |
(JobModel.department.ilike(f"%{keyword}%"))
)
# 应用条件
if conditions:
for condition in conditions:
stmt = stmt.where(condition)
count_stmt = count_stmt.where(condition)
# 获取总数
total = session.execute(count_stmt).scalar()
# 分页查询,按最后同步时间降序
stmt = stmt.order_by(JobModel.last_sync_at.desc().nullslast())
stmt = stmt.offset((page - 1) * page_size).limit(page_size)
results = session.execute(stmt).scalars().all()
return [self._model_to_entity(r) for r in results], total
finally:
session.close()

View File

@@ -1 +1,10 @@
"""Service layer - Business logic"""
from .resume_evaluation_service import ResumeEvaluationService, ProcessResult
from .recruiter_service import RecruiterService
__all__ = [
'ResumeEvaluationService',
'ProcessResult',
'RecruiterService',
]

View File

@@ -307,3 +307,94 @@ class BossCrawler(BaseCrawler):
if highest:
return getattr(highest, 'schoolName', None)
return None
def send_greeting(self, candidate: Candidate, message: str) -> bool:
"""
向候选人发送打招呼消息
Args:
candidate: 候选人对象(需包含 raw_data
message: 打招呼消息内容
Returns:
bool: 是否发送成功
"""
try:
geek_data = getattr(candidate, 'raw_data', None)
if geek_data is None:
print(f"[BossCrawler] 候选人 {candidate.name} 缺少 raw_data无法发送打招呼")
return False
# 从 geek_data 中提取必要的参数
geek_card = getattr(geek_data, 'geekCard', None)
if not geek_card:
print(f"[BossCrawler] 候选人 {candidate.name} 缺少 geekCard无法发送打招呼")
return False
# 获取发送消息所需的参数
encrypt_geek_id = getattr(geek_card, 'encryptGeekId', '') or getattr(geek_data, 'encryptGeekId', '')
encrypt_job_id = getattr(geek_card, 'encryptJobId', '')
security_id = getattr(geek_card, 'securityId', '')
lid = getattr(geek_card, 'lid', '')
expect_id = getattr(geek_card, 'expectId', 0)
if not encrypt_geek_id:
print(f"[BossCrawler] 候选人 {candidate.name} 缺少 encryptGeekId无法发送打招呼")
return False
# 调用 SDK 的打招呼接口
# 注意:这里假设 SDK 提供了 send_message 或 greet 方法
# 实际方法名需要根据 ylhp-boss-hr SDK 的实际接口调整
if hasattr(self.client, 'send_message'):
result = self.client.send_message(
encryptGeekId=encrypt_geek_id,
encryptJobId=encrypt_job_id,
securityId=security_id,
lid=lid,
expectId=expect_id,
content=message
)
print(f"[BossCrawler] 向候选人 {candidate.name} 发送打招呼成功")
return True
elif hasattr(self.client, 'greet'):
result = self.client.greet(
encryptGeekId=encrypt_geek_id,
encryptJobId=encrypt_job_id,
securityId=security_id,
lid=lid,
expectId=expect_id,
content=message
)
print(f"[BossCrawler] 向候选人 {candidate.name} 发送打招呼成功")
return True
elif hasattr(self.client, 'say_hi'):
result = self.client.say_hi(
encryptGeekId=encrypt_geek_id,
encryptJobId=encrypt_job_id,
securityId=security_id,
lid=lid,
expectId=expect_id,
content=message
)
print(f"[BossCrawler] 向候选人 {candidate.name} 发送打招呼成功")
return True
else:
print(f"[BossCrawler] SDK 不支持打招呼功能,请检查 ylhp-boss-hr SDK 版本")
return False
except Exception as e:
print(f"[BossCrawler] 向候选人 {candidate.name} 发送打招呼失败: {e}")
return False
def can_send_greeting(self) -> bool:
"""
检查当前 SDK 是否支持打招呼功能
Returns:
bool: 是否支持
"""
return (
hasattr(self.client, 'send_message') or
hasattr(self.client, 'greet') or
hasattr(self.client, 'say_hi')
)

View File

@@ -0,0 +1,434 @@
"""
简历评分服务
负责同步执行:获取简历 -> LLM分析 -> 评分入库 -> 触发后续动作
确保在同一账号下串行执行避免Boss平台检测
"""
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from typing import Optional, Dict, Any, List
import json
from .analysis.resume_analyzer import ResumeAnalyzer
from .analysis.evaluation_schema import EvaluationSchemaService
from .notification.notification_service import NotificationService
from .crawler.base_crawler import BaseCrawler
from .crawler.boss_crawler import BossCrawler
from ..mapper.candidate_mapper import CandidateMapper
from ..mapper.evaluation_mapper import EvaluationMapper
from ..domain.candidate import Candidate, CandidateStatus
from ..domain.evaluation import Evaluation
from ..domain.job import Job
from ..domain.resume import Resume
from ..config.settings import Settings, get_settings
@dataclass
class ProcessResult:
"""候选人处理结果"""
candidate_id: str
candidate_name: str
job_id: str
job_title: str
# 处理状态
success: bool = True
status: str = "new" # new, updated, skipped, failed
message: str = ""
# 评分结果
evaluation_id: Optional[str] = None
overall_score: float = 0.0
# 触发动作结果
greeting_sent: bool = False
notification_sent: bool = False
# 时间戳
timestamp: datetime = field(default_factory=datetime.now)
def to_dict(self) -> Dict[str, Any]:
return {
"candidate_id": self.candidate_id,
"candidate_name": self.candidate_name,
"job_id": self.job_id,
"job_title": self.job_title,
"success": self.success,
"status": self.status,
"message": self.message,
"evaluation_id": self.evaluation_id,
"overall_score": self.overall_score,
"greeting_sent": self.greeting_sent,
"notification_sent": self.notification_sent,
"timestamp": self.timestamp.isoformat()
}
class ResumeEvaluationService:
"""
简历评分服务
负责同步执行:获取简历 -> LLM分析 -> 评分入库 -> 触发后续动作
确保在同一账号下串行执行避免Boss平台检测
"""
def __init__(
self,
resume_analyzer: ResumeAnalyzer,
candidate_mapper: CandidateMapper,
evaluation_mapper: EvaluationMapper,
notification_service: Optional[NotificationService] = None,
schema_service: Optional[EvaluationSchemaService] = None,
settings: Optional[Settings] = None
):
"""
初始化简历评分服务
Args:
resume_analyzer: 简历分析器
candidate_mapper: 候选人数据访问
evaluation_mapper: 评价数据访问
notification_service: 通知服务(可选)
schema_service: 评价方案服务(可选)
settings: 配置(可选)
"""
self.resume_analyzer = resume_analyzer
self.candidate_mapper = candidate_mapper
self.evaluation_mapper = evaluation_mapper
self.notification_service = notification_service
self.schema_service = schema_service or EvaluationSchemaService()
self.settings = settings or get_settings()
async def process_candidate_sync(
self,
crawler: BaseCrawler,
candidate: Candidate,
job: Job,
schema_id: Optional[str] = None
) -> ProcessResult:
"""
同步处理单个候选人(获取简历+LLM分析+入库+触发动作)
关键:必须在同一线程/协程中串行完成,不能并行
这是为了确保在查看简历页面时不会进行其他操作避免Boss平台检测
Args:
crawler: 爬虫实例
candidate: 候选人对象
job: 职位对象
schema_id: 评价方案ID可选优先使用job关联的方案
Returns:
ProcessResult: 处理结果
"""
result = ProcessResult(
candidate_id=candidate.id or "",
candidate_name=candidate.name,
job_id=job.id or "",
job_title=job.title
)
print(f"[ResumeEvaluationService] 开始处理候选人: {candidate.name}")
try:
# 1. 获取简历详情 - 同步操作
resume = self._get_resume(crawler, candidate)
if not resume:
result.success = False
result.status = "failed"
result.message = "无法获取简历详情"
print(f"[ResumeEvaluationService] 候选人 {candidate.name} 无法获取简历")
return result
# 2. 确定评价方案ID
effective_schema_id = schema_id or job.evaluation_schema_id or "general"
# 3. 调用LLM进行评分分析 - 同步操作await确保串行
evaluation = await self._analyze_resume(
candidate=candidate,
resume=resume,
job=job,
schema_id=effective_schema_id
)
if not evaluation or evaluation.overall_score == 0:
result.success = False
result.status = "failed"
result.message = "LLM分析失败"
print(f"[ResumeEvaluationService] 候选人 {candidate.name} LLM分析失败")
return result
result.evaluation_id = evaluation.id
result.overall_score = evaluation.overall_score
# 4. 保存评价结果到数据库
saved_evaluation = self.evaluation_mapper.save(evaluation)
print(f"[ResumeEvaluationService] 候选人 {candidate.name} 评价已保存, 分数: {evaluation.overall_score}")
# 5. 更新候选人的LLM评分字段
self._update_candidate_score(candidate, evaluation)
# 6. 检查是否超过阈值,触发后续动作
await self._trigger_actions_if_qualified(
crawler=crawler,
candidate=candidate,
evaluation=evaluation,
job=job,
result=result
)
result.success = True
result.status = "new"
result.message = f"处理成功, 分数: {evaluation.overall_score}"
print(f"[ResumeEvaluationService] 候选人 {candidate.name} 处理完成: {result.message}")
except Exception as e:
result.success = False
result.status = "failed"
result.message = f"处理异常: {str(e)}"
print(f"[ResumeEvaluationService] 候选人 {candidate.name} 处理异常: {e}")
return result
def _get_resume(self, crawler: BaseCrawler, candidate: Candidate) -> Optional[Resume]:
"""
获取简历详情
Args:
crawler: 爬虫实例
candidate: 候选人对象
Returns:
Resume: 简历对象
"""
try:
return crawler.get_resume_detail(candidate)
except Exception as e:
print(f"[ResumeEvaluationService] 获取简历失败: {e}")
return None
async def _analyze_resume(
self,
candidate: Candidate,
resume: Resume,
job: Job,
schema_id: str
) -> Optional[Evaluation]:
"""
调用LLM进行简历分析
Args:
candidate: 候选人
resume: 简历
job: 职位
schema_id: 评价方案ID
Returns:
Evaluation: 评价结果
"""
try:
# 确保candidate有ID
if not candidate.id:
import uuid
candidate.id = str(uuid.uuid4())
evaluation = await self.resume_analyzer.analyze(
candidate_id=candidate.id,
resume=resume,
schema_id=schema_id,
job_id=job.id
)
return evaluation
except Exception as e:
print(f"[ResumeEvaluationService] LLM分析失败: {e}")
return None
def _update_candidate_score(self, candidate: Candidate, evaluation: Evaluation) -> None:
"""
更新候选人的LLM评分字段
Args:
candidate: 候选人
evaluation: 评价结果
"""
try:
candidate.llm_filtered = evaluation.overall_score >= self.settings.score_threshold_greet
candidate.llm_score = Decimal(str(evaluation.overall_score))
candidate.llm_score_details = evaluation.to_dict()
candidate.status = CandidateStatus.ANALYZED
self.candidate_mapper.save(candidate)
except Exception as e:
print(f"[ResumeEvaluationService] 更新候选人评分失败: {e}")
async def _trigger_actions_if_qualified(
self,
crawler: BaseCrawler,
candidate: Candidate,
evaluation: Evaluation,
job: Job,
result: ProcessResult
) -> None:
"""
如果评分超过阈值,触发打招呼和通知
Args:
crawler: 爬虫实例
candidate: 候选人
evaluation: 评价结果
job: 职位
result: 处理结果(用于记录触发状态)
"""
score = evaluation.overall_score
# 6.1 触发打招呼(如果启用且超阈值)
if (self.settings.auto_greet_enabled and
score >= self.settings.score_threshold_greet):
greeting_success = await self._trigger_greeting(
crawler=crawler,
candidate=candidate,
evaluation=evaluation
)
result.greeting_sent = greeting_success
# 6.2 触发HR通知如果启用且超阈值
if (self.settings.auto_notify_enabled and
score >= self.settings.score_threshold_notify):
notify_success = await self._trigger_notification(
candidate=candidate,
evaluation=evaluation,
job=job
)
result.notification_sent = notify_success
async def _trigger_greeting(
self,
crawler: BaseCrawler,
candidate: Candidate,
evaluation: Evaluation
) -> bool:
"""
触发打招呼
Args:
crawler: 爬虫实例
candidate: 候选人
evaluation: 评价结果
Returns:
bool: 是否成功
"""
try:
# 检查爬虫是否支持打招呼
if isinstance(crawler, BossCrawler) and crawler.can_send_greeting():
message = self.settings.greet_message_template
success = crawler.send_greeting(candidate, message)
if success:
# 更新候选人状态为已联系
candidate.status = CandidateStatus.CONTACTED
self.candidate_mapper.save(candidate)
print(f"[ResumeEvaluationService] 向 {candidate.name} 发送打招呼成功")
return success
else:
print(f"[ResumeEvaluationService] 爬虫不支持打招呼功能")
return False
except Exception as e:
print(f"[ResumeEvaluationService] 触发打招呼失败: {e}")
return False
async def _trigger_notification(
self,
candidate: Candidate,
evaluation: Evaluation,
job: Job
) -> bool:
"""
触发HR通知
Args:
candidate: 候选人
evaluation: 评价结果
job: 职位
Returns:
bool: 是否成功
"""
try:
if not self.notification_service:
print(f"[ResumeEvaluationService] 通知服务未配置")
return False
# 发送通知
result = await self.notification_service.notify(
candidate=candidate,
evaluation=evaluation,
title=f"【人才推荐】{candidate.name} - {job.title}",
extra_data={
"job_id": job.id,
"job_title": job.title,
"score": evaluation.overall_score
}
)
if result.success:
# 更新候选人状态为已推送
if candidate.status != CandidateStatus.CONTACTED:
candidate.status = CandidateStatus.PUSHED
self.candidate_mapper.save(candidate)
print(f"[ResumeEvaluationService] 候选人 {candidate.name} 已推送给HR")
return result.success
except Exception as e:
print(f"[ResumeEvaluationService] 触发HR通知失败: {e}")
return False
def should_process_candidate(
self,
candidate: Candidate,
job: Job
) -> bool:
"""
判断是否应该处理该候选人
用于跳过已处理过的候选人
Args:
candidate: 候选人
job: 职位
Returns:
bool: 是否应该处理
"""
# 如果候选人已有评价记录,检查是否需要重新评价
if candidate.id:
existing = self.evaluation_mapper.find_by_candidate_and_job(
candidate.id, job.id
)
if existing:
print(f"[ResumeEvaluationService] 候选人 {candidate.name} 已有评价记录,跳过")
return False
return True
def get_evaluation_summary(self, candidate_id: str) -> Optional[Dict[str, Any]]:
"""
获取候选人评价摘要
Args:
candidate_id: 候选人ID
Returns:
评价摘要字典
"""
evaluation = self.evaluation_mapper.find_latest_by_candidate_id(candidate_id)
if evaluation:
return evaluation.to_dict()
return None