diff --git a/migrations/001_init_schema.sql b/migrations/001_init_schema.sql index 2fbb466..4b2a5b2 100644 --- a/migrations/001_init_schema.sql +++ b/migrations/001_init_schema.sql @@ -95,6 +95,7 @@ CREATE TABLE IF NOT EXISTS jobs ( source VARCHAR(32) NOT NULL, -- BOSS, LIEPIN, etc. source_id VARCHAR(128) NOT NULL, recruiter_id VARCHAR(64), -- 关联的招聘者账号ID + evaluation_schema_id VARCHAR(64), -- 关联的评价方案ID title VARCHAR(256) NOT NULL, department VARCHAR(128), location VARCHAR(128), @@ -114,6 +115,7 @@ CREATE TABLE IF NOT EXISTS jobs ( UNIQUE KEY uk_source_source_id (source, source_id), INDEX idx_status (status), INDEX idx_recruiter_id (recruiter_id), + INDEX idx_evaluation_schema_id (evaluation_schema_id), INDEX idx_last_sync_at (last_sync_at), FOREIGN KEY (recruiter_id) REFERENCES recruiters(id) ON DELETE SET NULL ); @@ -200,6 +202,7 @@ ON DUPLICATE KEY UPDATE dimensions = VALUES(dimensions), weights = VALUES(weights); + INSERT INTO evaluation_schemas (id, name, description, dimensions, weights) VALUES ('java_backend', 'Java后端工程师评价方案', '针对Java后端开发岗位的综合评价方案', '[ diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/config/database.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/config/database.py index e330c1a..a83ad9a 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/config/database.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/config/database.py @@ -86,6 +86,7 @@ class JobModel(Base): source = Column(String(32), nullable=False) source_id = Column(String(128), nullable=False) recruiter_id = Column(String(64), ForeignKey('recruiters.id')) + evaluation_schema_id = Column(String(64), ForeignKey('evaluation_schemas.id')) # 关联的评价方案ID title = Column(String(256), nullable=False) department = Column(String(128)) location = Column(String(128)) diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/config/settings.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/config/settings.py index fe951af..6a26d38 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/config/settings.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/config/settings.py @@ -44,6 +44,20 @@ class Settings(BaseSettings): notify_email_from: Optional[str] = Field(default=None, description="发件人地址") notify_email_to: Optional[str] = Field(default=None, description="收件人地址") + # 评分阈值配置 (前缀: SCORE_) + score_threshold_greet: float = Field(default=70.0, description="触发打招呼的最低分数") + score_threshold_notify: float = Field(default=70.0, description="触发HR通知的最低分数") + auto_greet_enabled: bool = Field(default=True, description="是否启用自动打招呼") + auto_notify_enabled: bool = Field(default=True, description="是否启用自动通知HR") + greet_message_template: str = Field( + default="您好,我们对您的简历很感兴趣,期待与您进一步交流!", + description="打招呼消息模板" + ) + + # 简历处理配置 (前缀: RESUME_) + resume_process_delay: float = Field(default=1.0, description="处理每个候选人后的延迟秒数,避免请求过快") + resume_max_per_job: int = Field(default=20, description="每个职位最多处理的候选人数量") + class Config: env_file = ".env" env_file_encoding = "utf-8" diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/api.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/api.py index 833c285..de26fcc 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/api.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/api.py @@ -10,7 +10,7 @@ FastAPI主应用入口 from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from .routes import recruiter_router, scheduler_router, candidate_router +from .routes import recruiter_router, scheduler_router, candidate_router, job_router from .routes.system import router as system_router @@ -46,6 +46,9 @@ def create_app() -> FastAPI: # 候选人管理路由 app.include_router(candidate_router) + # 职位管理路由 + app.include_router(job_router) + return app diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/__init__.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/__init__.py index 0ad3021..e79e73e 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/__init__.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/__init__.py @@ -29,9 +29,16 @@ except ImportError: from fastapi import APIRouter candidate_router = APIRouter() +try: + from .job import router as job_router +except ImportError: + from fastapi import APIRouter + job_router = APIRouter() + __all__ = [ "recruiter_router", "scheduler_router", "system_router", - "candidate_router" + "candidate_router", + "job_router" ] diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/job.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/job.py new file mode 100644 index 0000000..701f1fa --- /dev/null +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/job.py @@ -0,0 +1,409 @@ +""" +职位管理 API 路由 + +提供职位CRUD、关联评价方案等功能 +""" +from typing import Optional +from fastapi import APIRouter, Query + +from ..schemas import ( + BaseResponse, + JobPositionResponse, JobPositionListResponse, + JobPositionCreateRequest, JobPositionUpdateRequest, + JobPositionFilterRequest, JobBindSchemaRequest, + EvaluationSchemaResponse, EvaluationSchemaListResponse +) +from ...domain.job import Job +from ...domain.enums import CandidateSource +from ...mapper.job_mapper import JobMapper +from ...mapper.evaluation_mapper import EvaluationMapper + + +router = APIRouter(prefix="/api/jobs", tags=["职位管理"]) + + +def _job_to_response(job: Job) -> JobPositionResponse: + """将领域实体转换为响应模型""" + return JobPositionResponse( + id=job.id, + title=job.title, + source=job.source.value if job.source else "", + status=job.status.value if job.status else "", + recruiter_id=job.recruiter_id, + evaluation_schema_id=job.evaluation_schema_id, + department=job.department, + location=job.location, + salary_min=job.salary_min, + salary_max=job.salary_max, + requirements=job.requirements, + description=job.description, + candidate_count=job.candidate_count, + new_candidate_count=job.new_candidate_count, + last_sync_at=job.last_sync_at, + created_at=job.created_at, + updated_at=job.updated_at + ) + + +@router.get("", response_model=BaseResponse[JobPositionListResponse]) +async def list_jobs( + source: Optional[str] = Query(None, description="平台来源"), + recruiter_id: Optional[str] = Query(None, description="招聘者账号ID"), + evaluation_schema_id: Optional[str] = Query(None, description="评价方案ID"), + status: Optional[str] = Query(None, description="状态: ACTIVE, PAUSED, CLOSED, ARCHIVED"), + keyword: Optional[str] = Query(None, description="关键词搜索(标题/部门)"), + page: int = Query(1, ge=1, description="页码"), + page_size: int = Query(20, ge=1, le=100, description="每页数量") +): + """ + 获取职位列表 + + Args: + source: 平台来源 + recruiter_id: 招聘者账号ID + evaluation_schema_id: 评价方案ID + status: 职位状态 + keyword: 关键词搜索 + page: 页码 + page_size: 每页数量 + + Returns: + BaseResponse[JobPositionListResponse]: 统一响应格式的职位列表 + """ + try: + mapper = JobMapper() + jobs, total = mapper.find_filtered_jobs( + source=source, + recruiter_id=recruiter_id, + evaluation_schema_id=evaluation_schema_id, + status=status, + keyword=keyword, + page=page, + page_size=page_size + ) + + response = JobPositionListResponse( + total=total, + items=[_job_to_response(job) for job in jobs] + ) + + return BaseResponse.success(data=response, msg="获取职位列表成功") + except Exception as e: + return BaseResponse.error(msg=f"获取职位列表失败: {str(e)}") + + +@router.post("/filter", response_model=BaseResponse[JobPositionListResponse]) +async def filter_jobs(request: JobPositionFilterRequest): + """ + 筛选查询职位 + + 支持多条件组合筛选 + + Returns: + BaseResponse[JobPositionListResponse]: 统一响应格式的职位列表 + """ + try: + mapper = JobMapper() + jobs, total = mapper.find_filtered_jobs( + source=request.source, + recruiter_id=request.recruiter_id, + evaluation_schema_id=request.evaluation_schema_id, + status=request.status, + keyword=request.keyword, + page=request.page, + page_size=request.page_size + ) + + response = JobPositionListResponse( + total=total, + items=[_job_to_response(job) for job in jobs] + ) + + return BaseResponse.success(data=response, msg="筛选职位成功") + except Exception as e: + return BaseResponse.error(msg=f"筛选职位失败: {str(e)}") + + +@router.get("/{job_id}", response_model=BaseResponse[JobPositionResponse]) +async def get_job_detail(job_id: str): + """ + 获取职位详情 + + Args: + job_id: 职位ID + + Returns: + BaseResponse[JobPositionResponse]: 统一响应格式的职位详情 + """ + try: + mapper = JobMapper() + job = mapper.find_by_id(job_id) + + if not job: + return BaseResponse.error(msg="职位不存在", code=404) + + return BaseResponse.success( + data=_job_to_response(job), + msg="获取职位详情成功" + ) + except Exception as e: + return BaseResponse.error(msg=f"获取职位详情失败: {str(e)}") + + +@router.post("", response_model=BaseResponse[JobPositionResponse]) +async def create_job(request: JobPositionCreateRequest): + """ + 创建职位 + + Args: + request: 创建职位请求 + + Returns: + BaseResponse[JobPositionResponse]: 统一响应格式的新建职位详情 + """ + try: + from ...domain.job import JobStatus + import uuid + + mapper = JobMapper() + + # 验证source + try: + source = CandidateSource(request.source.lower()) + except ValueError: + return BaseResponse.error(msg=f"无效的平台来源: {request.source}") + + # 创建职位实体 + job = Job( + id=str(uuid.uuid4()), + source=source, + source_id=request.source_id, + recruiter_id=request.recruiter_id, + evaluation_schema_id=request.evaluation_schema_id, + title=request.title, + department=request.department, + location=request.location, + salary_min=request.salary_min, + salary_max=request.salary_max, + requirements=request.requirements, + description=request.description, + status=JobStatus.ACTIVE + ) + + # 保存到数据库 + saved_job = mapper.save(job) + + return BaseResponse.success( + data=_job_to_response(saved_job), + msg="职位创建成功" + ) + except Exception as e: + return BaseResponse.error(msg=f"创建职位失败: {str(e)}") + + +@router.put("/{job_id}", response_model=BaseResponse[JobPositionResponse]) +async def update_job(job_id: str, request: JobPositionUpdateRequest): + """ + 更新职位 + + Args: + job_id: 职位ID + request: 更新职位请求 + + Returns: + BaseResponse[JobPositionResponse]: 统一响应格式的更新后职位详情 + """ + try: + from ...domain.job import JobStatus + + mapper = JobMapper() + job = mapper.find_by_id(job_id) + + if not job: + return BaseResponse.error(msg="职位不存在", code=404) + + # 更新字段 + if request.title is not None: + job.title = request.title + if request.evaluation_schema_id is not None: + job.evaluation_schema_id = request.evaluation_schema_id + if request.department is not None: + job.department = request.department + if request.location is not None: + job.location = request.location + if request.salary_min is not None: + job.salary_min = request.salary_min + if request.salary_max is not None: + job.salary_max = request.salary_max + if request.requirements is not None: + job.requirements = request.requirements + if request.description is not None: + job.description = request.description + if request.status is not None: + try: + job.status = JobStatus(request.status.upper()) + except ValueError: + return BaseResponse.error(msg=f"无效的状态: {request.status}") + + # 保存更新 + updated_job = mapper.save(job) + + return BaseResponse.success( + data=_job_to_response(updated_job), + msg="职位更新成功" + ) + except Exception as e: + return BaseResponse.error(msg=f"更新职位失败: {str(e)}") + + +@router.delete("/{job_id}", response_model=BaseResponse[dict]) +async def delete_job(job_id: str): + """ + 删除职位 + + Args: + job_id: 职位ID + + Returns: + BaseResponse[dict]: 统一响应格式 + """ + try: + mapper = JobMapper() + job = mapper.find_by_id(job_id) + + if not job: + return BaseResponse.error(msg="职位不存在", code=404) + + # 软删除:将状态改为ARCHIVED + from ...domain.job import JobStatus + job.status = JobStatus.ARCHIVED + mapper.save(job) + + return BaseResponse.success( + data={"deleted_id": job_id}, + msg="职位删除成功" + ) + except Exception as e: + return BaseResponse.error(msg=f"删除职位失败: {str(e)}") + + +@router.post("/{job_id}/bind-schema", response_model=BaseResponse[JobPositionResponse]) +async def bind_evaluation_schema(job_id: str, request: JobBindSchemaRequest): + """ + 职位关联评价方案 + + Args: + job_id: 职位ID + request: 关联评价方案请求 + + Returns: + BaseResponse[JobPositionResponse]: 统一响应格式的更新后职位详情 + """ + try: + job_mapper = JobMapper() + schema_mapper = EvaluationMapper() + + # 验证职位存在 + job = job_mapper.find_by_id(job_id) + if not job: + return BaseResponse.error(msg="职位不存在", code=404) + + # 验证评价方案存在 + schema = schema_mapper.find_schema_by_id(request.evaluation_schema_id) + if not schema: + return BaseResponse.error(msg="评价方案不存在", code=404) + + # 更新关联 + job.evaluation_schema_id = request.evaluation_schema_id + updated_job = job_mapper.save(job) + + return BaseResponse.success( + data=_job_to_response(updated_job), + msg=f"职位已成功关联评价方案: {schema.name}" + ) + except Exception as e: + return BaseResponse.error(msg=f"关联评价方案失败: {str(e)}") + + +@router.get("/{job_id}/schema", response_model=BaseResponse[EvaluationSchemaResponse]) +async def get_job_evaluation_schema(job_id: str): + """ + 获取职位关联的评价方案 + + Args: + job_id: 职位ID + + Returns: + BaseResponse[EvaluationSchemaResponse]: 统一响应格式的评价方案详情 + """ + try: + job_mapper = JobMapper() + schema_mapper = EvaluationMapper() + + # 验证职位存在 + job = job_mapper.find_by_id(job_id) + if not job: + return BaseResponse.error(msg="职位不存在", code=404) + + # 检查是否有关联的评价方案 + if not job.evaluation_schema_id: + return BaseResponse.error(msg="该职位未关联评价方案", code=404) + + # 获取评价方案 + schema = schema_mapper.find_schema_by_id(job.evaluation_schema_id) + if not schema: + return BaseResponse.error(msg="关联的评价方案不存在", code=404) + + response = EvaluationSchemaResponse( + id=schema.id, + name=schema.name, + description=schema.description, + dimensions=schema.dimensions, + weights=schema.weights, + is_default=schema.is_default, + created_at=schema.created_at, + updated_at=schema.updated_at + ) + + return BaseResponse.success(data=response, msg="获取评价方案成功") + except Exception as e: + return BaseResponse.error(msg=f"获取评价方案失败: {str(e)}") + + +@router.get("/schemas/list", response_model=BaseResponse[EvaluationSchemaListResponse]) +async def list_evaluation_schemas( + page: int = Query(1, ge=1, description="页码"), + page_size: int = Query(20, ge=1, le=100, description="每页数量") +): + """ + 获取评价方案列表 + + Args: + page: 页码 + page_size: 每页数量 + + Returns: + BaseResponse[EvaluationSchemaListResponse]: 统一响应格式的评价方案列表 + """ + try: + schema_mapper = EvaluationMapper() + schemas, total = schema_mapper.find_all_schemas(page=page, page_size=page_size) + + items = [ + EvaluationSchemaResponse( + id=schema.id, + name=schema.name, + description=schema.description, + dimensions=schema.dimensions, + weights=schema.weights, + is_default=schema.is_default, + created_at=schema.created_at, + updated_at=schema.updated_at + ) + for schema in schemas + ] + + response = EvaluationSchemaListResponse(total=total, items=items) + return BaseResponse.success(data=response, msg="获取评价方案列表成功") + except Exception as e: + return BaseResponse.error(msg=f"获取评价方案列表失败: {str(e)}") diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/recruiter.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/recruiter.py index 263ab4c..e483bc9 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/recruiter.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/recruiter.py @@ -6,13 +6,13 @@ from typing import List, Optional from datetime import datetime -from fastapi import APIRouter, HTTPException, Depends +from fastapi import APIRouter, Depends from ..schemas import ( RecruiterCreate, RecruiterRegister, RecruiterUpdate, RecruiterResponse, RecruiterListResponse, RecruiterRegisterResponse, RecruiterPrivilegeInfo, RecruiterSyncInfo, RecruiterSourceInfo, - APIResponse + BaseResponse ) from ...domain.candidate import CandidateSource from ...domain.recruiter import Recruiter, RecruiterStatus @@ -68,10 +68,10 @@ def _build_recruiter_response(recruiter: Recruiter) -> RecruiterResponse: ) -@router.get("/sources", response_model=List[RecruiterSourceInfo]) +@router.get("/sources", response_model=BaseResponse[List[RecruiterSourceInfo]]) async def get_recruiter_sources(): """获取支持的平台来源列表""" - return [ + sources = [ RecruiterSourceInfo( value="boss", label="Boss直聘", @@ -88,9 +88,10 @@ async def get_recruiter_sources(): description="智联招聘平台(预留)" ) ] + return BaseResponse.success(data=sources, msg="获取平台来源列表成功") -@router.get("", response_model=RecruiterListResponse) +@router.get("", response_model=BaseResponse[RecruiterListResponse]) async def list_recruiters( source: Optional[str] = None, service: RecruiterService = Depends(get_recruiter_service) @@ -101,54 +102,64 @@ async def list_recruiters( Args: source: 按平台筛选 (boss, liepin, etc.) """ - if source: - try: - candidate_source = CandidateSource(source.lower()) - recruiters = service.list_recruiters(candidate_source) - except ValueError: - raise HTTPException(status_code=400, detail=f"Invalid source: {source}") - else: - recruiters = service.list_recruiters() - - items = [_build_recruiter_response(r) for r in recruiters] - - return RecruiterListResponse(total=len(items), items=items) + try: + if source: + try: + candidate_source = CandidateSource(source.lower()) + recruiters = service.list_recruiters(candidate_source) + except ValueError: + return BaseResponse.error(msg=f"无效的来源: {source}", code=400) + else: + recruiters = service.list_recruiters() + + items = [_build_recruiter_response(r) for r in recruiters] + + response = RecruiterListResponse(total=len(items), items=items) + return BaseResponse.success(data=response, msg="获取招聘者列表成功") + except Exception as e: + return BaseResponse.error(msg=f"获取招聘者列表失败: {str(e)}") -@router.get("/{recruiter_id}", response_model=RecruiterResponse) +@router.get("/{recruiter_id}", response_model=BaseResponse[RecruiterResponse]) async def get_recruiter( recruiter_id: str, service: RecruiterService = Depends(get_recruiter_service) ): """获取单个招聘者账号详情""" - recruiter = service.get_recruiter(recruiter_id) - if not recruiter: - raise HTTPException(status_code=404, detail="Recruiter not found") - - return _build_recruiter_response(recruiter) + try: + recruiter = service.get_recruiter(recruiter_id) + if not recruiter: + return BaseResponse.error(msg="招聘者不存在", code=404) + + return BaseResponse.success(data=_build_recruiter_response(recruiter), msg="获取招聘者详情成功") + except Exception as e: + return BaseResponse.error(msg=f"获取招聘者详情失败: {str(e)}") -@router.post("", response_model=RecruiterResponse) +@router.post("", response_model=BaseResponse[RecruiterResponse]) async def create_recruiter( data: RecruiterCreate, service: RecruiterService = Depends(get_recruiter_service) ): """手动创建招聘者账号""" try: - source = CandidateSource(data.source.lower()) - except ValueError: - raise HTTPException(status_code=400, detail=f"Invalid source: {data.source}") - - recruiter = service.add_recruiter( - name=data.name, - source=source, - wt_token=data.wt_token - ) - - return _build_recruiter_response(recruiter) + try: + source = CandidateSource(data.source.lower()) + except ValueError: + return BaseResponse.error(msg=f"无效的来源: {data.source}", code=400) + + recruiter = service.add_recruiter( + name=data.name, + source=source, + wt_token=data.wt_token + ) + + return BaseResponse.success(data=_build_recruiter_response(recruiter), msg="创建招聘者成功") + except Exception as e: + return BaseResponse.error(msg=f"创建招聘者失败: {str(e)}") -@router.post("/register", response_model=RecruiterRegisterResponse) +@router.post("/register", response_model=BaseResponse[RecruiterRegisterResponse]) async def register_recruiter( data: RecruiterRegister, service: RecruiterService = Depends(get_recruiter_service) @@ -158,20 +169,17 @@ async def register_recruiter( 根据Token自动获取账号信息并注册 """ - # 验证平台来源 - try: - source = CandidateSource(data.source.lower()) - except ValueError: - raise HTTPException(status_code=400, detail=f"Invalid source: {data.source}") - - # 目前仅支持Boss平台 - if source != CandidateSource.BOSS: - raise HTTPException( - status_code=400, - detail="Auto register only supports 'boss' source currently" - ) - try: + # 验证平台来源 + try: + source = CandidateSource(data.source.lower()) + except ValueError: + return BaseResponse.error(msg=f"无效的来源: {data.source}", code=400) + + # 目前仅支持Boss平台 + if source != CandidateSource.BOSS: + return BaseResponse.error(msg="自动注册仅支持Boss平台", code=400) + # 1. 使用Token创建临时爬虫获取账号信息 crawler = BossCrawler(wt_token=data.wt_token) @@ -179,10 +187,7 @@ async def register_recruiter( if hasattr(crawler.client, 'check_login_status'): is_valid = crawler.client.check_login_status() if not is_valid: - return RecruiterRegisterResponse( - success=False, - message="Token无效或账号已过期,请检查Token是否正确" - ) + return BaseResponse.error(msg="Token无效或账号已过期,请检查Token是否正确") # 3. 获取账号信息 account_info = None @@ -190,10 +195,7 @@ async def register_recruiter( account_info = crawler.client.get_account_info() if not account_info: - return RecruiterRegisterResponse( - success=False, - message="无法获取账号信息,请检查Token是否有效" - ) + return BaseResponse.error(msg="无法获取账号信息,请检查Token是否有效") # 4. 提取账号名称 account_name = "" @@ -212,11 +214,12 @@ async def register_recruiter( existing_recruiters = service.list_recruiters(source) for existing in existing_recruiters: if existing.wt_token == data.wt_token: - return RecruiterRegisterResponse( + response_data = RecruiterRegisterResponse( success=False, message=f"该Token已注册为账号: {existing.name}", recruiter=_build_recruiter_response(existing) ) + return BaseResponse.success(data=response_data, msg="账号已存在") # 6. 创建账号 recruiter = service.add_recruiter( @@ -239,94 +242,98 @@ async def register_recruiter( # 权益信息获取失败不影响注册 print(f"[Register] 获取权益信息失败: {e}") - return RecruiterRegisterResponse( + response_data = RecruiterRegisterResponse( success=True, message=f"账号注册成功: {account_name}", recruiter=_build_recruiter_response(recruiter) ) + return BaseResponse.success(data=response_data, msg="账号注册成功") except ImportError as e: - return RecruiterRegisterResponse( - success=False, - message=f"SDK未安装或导入失败: {str(e)}" - ) + return BaseResponse.error(msg=f"SDK未安装或导入失败: {str(e)}") except Exception as e: - return RecruiterRegisterResponse( - success=False, - message=f"注册失败: {str(e)}" - ) + return BaseResponse.error(msg=f"注册失败: {str(e)}") -@router.put("/{recruiter_id}", response_model=RecruiterResponse) +@router.put("/{recruiter_id}", response_model=BaseResponse[RecruiterResponse]) async def update_recruiter( recruiter_id: str, data: RecruiterUpdate, service: RecruiterService = Depends(get_recruiter_service) ): """更新招聘者账号""" - recruiter = service.get_recruiter(recruiter_id) - if not recruiter: - raise HTTPException(status_code=404, detail="Recruiter not found") - - # 更新字段 - if data.name: - recruiter.name = data.name - if data.wt_token: - recruiter.wt_token = data.wt_token - if data.status: - try: - recruiter.status = RecruiterStatus(data.status.lower()) - except ValueError: - raise HTTPException( - status_code=400, - detail=f"Invalid status: {data.status}" - ) - - updated = service.mapper.save(recruiter) - - return _build_recruiter_response(updated) + try: + recruiter = service.get_recruiter(recruiter_id) + if not recruiter: + return BaseResponse.error(msg="招聘者不存在", code=404) + + # 更新字段 + if data.name: + recruiter.name = data.name + if data.wt_token: + recruiter.wt_token = data.wt_token + if data.status: + try: + recruiter.status = RecruiterStatus(data.status.lower()) + except ValueError: + return BaseResponse.error(msg=f"无效的状态: {data.status}", code=400) + + updated = service.mapper.save(recruiter) + + return BaseResponse.success(data=_build_recruiter_response(updated), msg="更新招聘者成功") + except Exception as e: + return BaseResponse.error(msg=f"更新招聘者失败: {str(e)}") -@router.delete("/{recruiter_id}") +@router.delete("/{recruiter_id}", response_model=BaseResponse[dict]) async def delete_recruiter( recruiter_id: str, service: RecruiterService = Depends(get_recruiter_service) ): """删除招聘者账号""" - success = service.delete_recruiter(recruiter_id) - if not success: - raise HTTPException(status_code=404, detail="Recruiter not found") - - return APIResponse(success=True, message="Recruiter deleted successfully") + try: + success = service.delete_recruiter(recruiter_id) + if not success: + return BaseResponse.error(msg="招聘者不存在", code=404) + + return BaseResponse.success(data={"deleted_id": recruiter_id}, msg="删除招聘者成功") + except Exception as e: + return BaseResponse.error(msg=f"删除招聘者失败: {str(e)}") -@router.post("/{recruiter_id}/activate") +@router.post("/{recruiter_id}/activate", response_model=BaseResponse[dict]) async def activate_recruiter( recruiter_id: str, service: RecruiterService = Depends(get_recruiter_service) ): """启用招聘者账号""" - success = service.activate_recruiter(recruiter_id) - if not success: - raise HTTPException(status_code=404, detail="Recruiter not found") - - return APIResponse(success=True, message="Recruiter activated") + try: + success = service.activate_recruiter(recruiter_id) + if not success: + return BaseResponse.error(msg="招聘者不存在", code=404) + + return BaseResponse.success(data={"recruiter_id": recruiter_id}, msg="启用招聘者成功") + except Exception as e: + return BaseResponse.error(msg=f"启用招聘者失败: {str(e)}") -@router.post("/{recruiter_id}/deactivate") +@router.post("/{recruiter_id}/deactivate", response_model=BaseResponse[dict]) async def deactivate_recruiter( recruiter_id: str, service: RecruiterService = Depends(get_recruiter_service) ): """停用招聘者账号""" - success = service.deactivate_recruiter(recruiter_id) - if not success: - raise HTTPException(status_code=404, detail="Recruiter not found") - - return APIResponse(success=True, message="Recruiter deactivated") + try: + success = service.deactivate_recruiter(recruiter_id) + if not success: + return BaseResponse.error(msg="招聘者不存在", code=404) + + return BaseResponse.success(data={"recruiter_id": recruiter_id}, msg="停用招聘者成功") + except Exception as e: + return BaseResponse.error(msg=f"停用招聘者失败: {str(e)}") -@router.post("/{recruiter_id}/sync") +@router.post("/{recruiter_id}/sync", response_model=BaseResponse[dict]) async def sync_recruiter( recruiter_id: str, service: RecruiterService = Depends(get_recruiter_service) @@ -336,14 +343,17 @@ async def sync_recruiter( 触发账号状态、权益信息、职位列表的同步 """ - recruiter = service.get_recruiter(recruiter_id) - if not recruiter: - raise HTTPException(status_code=404, detail="Recruiter not found") - - # TODO: 触发同步任务 - # 可以调用 AccountSyncJob 来执行同步 - - return APIResponse( - success=True, - message=f"Sync triggered for recruiter: {recruiter.name}" - ) + try: + recruiter = service.get_recruiter(recruiter_id) + if not recruiter: + return BaseResponse.error(msg="招聘者不存在", code=404) + + # TODO: 触发同步任务 + # 可以调用 AccountSyncJob 来执行同步 + + return BaseResponse.success( + data={"recruiter_id": recruiter_id, "recruiter_name": recruiter.name}, + msg=f"已触发账号同步: {recruiter.name}" + ) + except Exception as e: + return BaseResponse.error(msg=f"同步账号失败: {str(e)}") diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/scheduler.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/scheduler.py index 29f47cb..e620d24 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/scheduler.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/scheduler.py @@ -5,11 +5,11 @@ """ from typing import List, Optional -from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks +from fastapi import APIRouter, Depends, BackgroundTasks from ..schemas import ( JobInfo, JobStatusInfo, JobConfigUpdate, - APIResponse + BaseResponse ) from ...service.scheduler import get_scheduler @@ -22,32 +22,68 @@ def get_scheduler_instance(): return get_scheduler() -@router.get("/jobs", response_model=List[JobInfo]) +@router.get("/jobs", response_model=BaseResponse[List[JobInfo]]) async def list_jobs( scheduler=Depends(get_scheduler_instance) ): """获取所有定时任务列表""" - jobs = scheduler.get_jobs() - return [ - JobInfo( - id=job["id"], - name=job["name"], - next_run_time=job.get("next_run_time"), - trigger=job["trigger"], - type=job.get("type") - ) - for job in jobs - ] + try: + jobs = scheduler.get_jobs() + items = [ + JobInfo( + id=job["id"], + name=job["name"], + next_run_time=job.get("next_run_time"), + trigger=job["trigger"], + type=job.get("type") + ) + for job in jobs + ] + return BaseResponse.success(data=items, msg="获取任务列表成功") + except Exception as e: + return BaseResponse.error(msg=f"获取任务列表失败: {str(e)}") -@router.get("/jobs/status", response_model=List[JobStatusInfo]) +@router.get("/jobs/status", response_model=BaseResponse[List[JobStatusInfo]]) async def get_jobs_status( scheduler=Depends(get_scheduler_instance) ): """获取所有Job任务状态""" - status_list = scheduler.get_job_status() - return [ - JobStatusInfo( + try: + status_list = scheduler.get_job_status() + items = [ + JobStatusInfo( + job_id=s["job_id"], + name=s["name"], + enabled=s["enabled"], + is_running=s["is_running"], + last_run_time=s.get("last_run_time"), + next_run_time=s.get("next_run_time"), + run_count=s["run_count"], + success_count=s["success_count"], + fail_count=s["fail_count"], + last_error=s.get("last_error") + ) + for s in status_list + ] + return BaseResponse.success(data=items, msg="获取任务状态成功") + except Exception as e: + return BaseResponse.error(msg=f"获取任务状态失败: {str(e)}") + + +@router.get("/jobs/{job_id}/status", response_model=BaseResponse[JobStatusInfo]) +async def get_job_status( + job_id: str, + scheduler=Depends(get_scheduler_instance) +): + """获取指定Job任务状态""" + try: + status_list = scheduler.get_job_status(job_id) + if not status_list: + return BaseResponse.error(msg="任务不存在", code=404) + + s = status_list[0] + data = JobStatusInfo( job_id=s["job_id"], name=s["name"], enabled=s["enabled"], @@ -59,36 +95,12 @@ async def get_jobs_status( fail_count=s["fail_count"], last_error=s.get("last_error") ) - for s in status_list - ] + return BaseResponse.success(data=data, msg="获取任务状态成功") + except Exception as e: + return BaseResponse.error(msg=f"获取任务状态失败: {str(e)}") -@router.get("/jobs/{job_id}/status", response_model=JobStatusInfo) -async def get_job_status( - job_id: str, - scheduler=Depends(get_scheduler_instance) -): - """获取指定Job任务状态""" - status_list = scheduler.get_job_status(job_id) - if not status_list: - raise HTTPException(status_code=404, detail="Job not found") - - s = status_list[0] - return JobStatusInfo( - job_id=s["job_id"], - name=s["name"], - enabled=s["enabled"], - is_running=s["is_running"], - last_run_time=s.get("last_run_time"), - next_run_time=s.get("next_run_time"), - run_count=s["run_count"], - success_count=s["success_count"], - fail_count=s["fail_count"], - last_error=s.get("last_error") - ) - - -@router.post("/jobs/{job_id}/run") +@router.post("/jobs/{job_id}/run", response_model=BaseResponse[dict]) async def run_job_now( job_id: str, background_tasks: BackgroundTasks, @@ -100,21 +112,24 @@ async def run_job_now( Args: job_id: 任务ID (account_sync 或 resume_process) """ - # 验证任务是否存在 - valid_jobs = ["account_sync", "resume_process", "crawl_boss", "analyze_pending"] - if job_id not in valid_jobs: - raise HTTPException(status_code=400, detail=f"Invalid job_id. Valid: {valid_jobs}") - - # 在后台执行 - background_tasks.add_task(scheduler.run_job_now, job_id) - - return APIResponse( - success=True, - message=f"Job {job_id} is running in background" - ) + try: + # 验证任务是否存在 + valid_jobs = ["account_sync", "resume_process", "crawl_boss", "analyze_pending"] + if job_id not in valid_jobs: + return BaseResponse.error(msg=f"无效的任务ID,有效值: {valid_jobs}", code=400) + + # 在后台执行 + background_tasks.add_task(scheduler.run_job_now, job_id) + + return BaseResponse.success( + data={"job_id": job_id}, + msg=f"任务 {job_id} 已在后台开始执行" + ) + except Exception as e: + return BaseResponse.error(msg=f"执行任务失败: {str(e)}") -@router.post("/jobs/{job_id}/pause") +@router.post("/jobs/{job_id}/pause", response_model=BaseResponse[dict]) async def pause_job( job_id: str, scheduler=Depends(get_scheduler_instance) @@ -122,12 +137,12 @@ async def pause_job( """暂停指定任务""" try: scheduler.pause_job(job_id) - return APIResponse(success=True, message=f"Job {job_id} paused") + return BaseResponse.success(data={"job_id": job_id}, msg=f"任务 {job_id} 已暂停") except Exception as e: - raise HTTPException(status_code=400, detail=str(e)) + return BaseResponse.error(msg=f"暂停任务失败: {str(e)}") -@router.post("/jobs/{job_id}/resume") +@router.post("/jobs/{job_id}/resume", response_model=BaseResponse[dict]) async def resume_job( job_id: str, scheduler=Depends(get_scheduler_instance) @@ -135,12 +150,12 @@ async def resume_job( """恢复指定任务""" try: scheduler.resume_job(job_id) - return APIResponse(success=True, message=f"Job {job_id} resumed") + return BaseResponse.success(data={"job_id": job_id}, msg=f"任务 {job_id} 已恢复") except Exception as e: - raise HTTPException(status_code=400, detail=str(e)) + return BaseResponse.error(msg=f"恢复任务失败: {str(e)}") -@router.put("/jobs/{job_id}/config") +@router.put("/jobs/{job_id}/config", response_model=BaseResponse[dict]) async def update_job_config( job_id: str, config: JobConfigUpdate, @@ -153,75 +168,78 @@ async def update_job_config( job_id: 任务ID config: 配置更新 """ - # 验证任务ID - valid_job_ids = ["account_sync", "resume_process"] - if job_id not in valid_job_ids: - raise HTTPException( - status_code=400, - detail=f"Can only config Job module tasks: {valid_job_ids}" - ) - - # 构建更新参数 - update_kwargs = {} - if config.enabled is not None: - update_kwargs["enabled"] = config.enabled - if config.interval_minutes is not None: - update_kwargs["interval_minutes"] = config.interval_minutes - - if not update_kwargs: - raise HTTPException(status_code=400, detail="No config to update") - try: + # 验证任务ID + valid_job_ids = ["account_sync", "resume_process"] + if job_id not in valid_job_ids: + return BaseResponse.error( + msg=f"只能配置Job模块任务: {valid_job_ids}", + code=400 + ) + + # 构建更新参数 + update_kwargs = {} + if config.enabled is not None: + update_kwargs["enabled"] = config.enabled + if config.interval_minutes is not None: + update_kwargs["interval_minutes"] = config.interval_minutes + + if not update_kwargs: + return BaseResponse.error(msg="没有要更新的配置", code=400) + scheduler.job_scheduler.update_job_config(job_id, **update_kwargs) - return APIResponse( - success=True, - message=f"Job {job_id} config updated", - data=update_kwargs + return BaseResponse.success( + data={"job_id": job_id, "config": update_kwargs}, + msg=f"任务 {job_id} 配置已更新" ) except Exception as e: - raise HTTPException(status_code=400, detail=str(e)) + return BaseResponse.error(msg=f"更新任务配置失败: {str(e)}") -@router.get("/status") +@router.get("/status", response_model=BaseResponse[dict]) async def get_scheduler_status( scheduler=Depends(get_scheduler_instance) ): """获取调度器整体状态""" - jobs = scheduler.get_jobs() - job_status = scheduler.get_job_status() - - return { - "running": scheduler._running if hasattr(scheduler, '_running') else False, - "total_jobs": len(jobs), - "job_scheduler_jobs": len([j for j in jobs if j.get("type") == "job"]), - "legacy_jobs": len([j for j in jobs if j.get("type") == "legacy"]), - "job_status_summary": { - "total": len(job_status), - "running": sum(1 for s in job_status if s["is_running"]), - "enabled": sum(1 for s in job_status if s["enabled"]) + try: + jobs = scheduler.get_jobs() + job_status = scheduler.get_job_status() + + data = { + "running": scheduler._running if hasattr(scheduler, '_running') else False, + "total_jobs": len(jobs), + "job_scheduler_jobs": len([j for j in jobs if j.get("type") == "job"]), + "legacy_jobs": len([j for j in jobs if j.get("type") == "legacy"]), + "job_status_summary": { + "total": len(job_status), + "running": sum(1 for s in job_status if s["is_running"]), + "enabled": sum(1 for s in job_status if s["enabled"]) + } } - } + return BaseResponse.success(data=data, msg="获取调度器状态成功") + except Exception as e: + return BaseResponse.error(msg=f"获取调度器状态失败: {str(e)}") -@router.post("/start") +@router.post("/start", response_model=BaseResponse[dict]) async def start_scheduler( scheduler=Depends(get_scheduler_instance) ): """启动调度器""" try: scheduler.start() - return APIResponse(success=True, message="Scheduler started") + return BaseResponse.success(data={}, msg="调度器已启动") except Exception as e: - raise HTTPException(status_code=400, detail=str(e)) + return BaseResponse.error(msg=f"启动调度器失败: {str(e)}") -@router.post("/stop") +@router.post("/stop", response_model=BaseResponse[dict]) async def stop_scheduler( scheduler=Depends(get_scheduler_instance) ): """停止调度器""" try: scheduler.stop() - return APIResponse(success=True, message="Scheduler stopped") + return BaseResponse.success(data={}, msg="调度器已停止") except Exception as e: - raise HTTPException(status_code=400, detail=str(e)) + return BaseResponse.error(msg=f"停止调度器失败: {str(e)}") diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/system.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/system.py index c830ca3..f7c4ff2 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/system.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/system.py @@ -5,49 +5,53 @@ """ from fastapi import APIRouter -from ..schemas import APIResponse +from ..schemas import BaseResponse router = APIRouter(tags=["系统"]) -@router.get("/") +@router.get("/", response_model=BaseResponse[dict]) async def root(): """API根路径""" - return { + data = { "name": "简历智能体 API", "version": "0.1.0", "docs": "/docs", "endpoints": { "recruiters": "/api/recruiters", + "jobs": "/api/jobs", + "candidates": "/candidates", "scheduler": "/api/scheduler", "health": "/health" } } + return BaseResponse.success(data=data, msg="简历智能体 API 运行中") -@router.get("/health") +@router.get("/health", response_model=BaseResponse[dict]) async def health_check(): """健康检查""" - return { + data = { "status": "healthy", "service": "hr-agent-api", "version": "0.1.0" } + return BaseResponse.success(data=data, msg="服务运行正常") -@router.get("/api/status") +@router.get("/api/status", response_model=BaseResponse[dict]) async def api_status(): """获取API状态信息""" - return APIResponse( - success=True, - message="API is running", - data={ - "version": "0.1.0", - "features": [ - "recruiter_management", - "scheduler_management", - "auto_register" - ] - } - ) + data = { + "version": "0.1.0", + "features": [ + "recruiter_management", + "job_management", + "candidate_management", + "scheduler_management", + "auto_register", + "llm_evaluation" + ] + } + return BaseResponse.success(data=data, msg="API运行中") diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/schemas.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/schemas.py index 7f3a8b8..56fc75b 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/schemas.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/schemas.py @@ -228,16 +228,89 @@ class CandidateUpdateScoreRequest(BaseModel): # ============== 职位(预留) ============== class JobPositionResponse(BaseModel): - """职位响应(预留)""" + """职位响应""" id: str title: str source: str status: str recruiter_id: Optional[str] = None + evaluation_schema_id: Optional[str] = Field(None, description="关联的评价方案ID") + department: Optional[str] = None + location: Optional[str] = None + salary_min: Optional[int] = None + salary_max: Optional[int] = None + requirements: Optional[Dict[str, Any]] = None + description: Optional[str] = None + candidate_count: int = Field(default=0, description="候选人数量") + new_candidate_count: int = Field(default=0, description="新候选人数量") + last_sync_at: Optional[datetime] = None created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None class JobPositionListResponse(BaseModel): - """职位列表响应(预留)""" + """职位列表响应""" total: int items: List[JobPositionResponse] + + +class JobPositionCreateRequest(BaseModel): + """创建职位请求""" + title: str = Field(..., description="职位标题") + source: str = Field(default="boss", description="平台来源: boss, liepin, etc.") + source_id: str = Field(..., description="来源平台ID") + recruiter_id: Optional[str] = Field(None, description="招聘者账号ID") + evaluation_schema_id: Optional[str] = Field(None, description="关联的评价方案ID") + department: Optional[str] = Field(None, description="部门") + location: Optional[str] = Field(None, description="地点") + salary_min: Optional[int] = Field(None, description="薪资下限(K)") + salary_max: Optional[int] = Field(None, description="薪资上限(K)") + requirements: Optional[Dict[str, Any]] = Field(None, description="职位要求JSON") + description: Optional[str] = Field(None, description="职位描述") + + +class JobPositionUpdateRequest(BaseModel): + """更新职位请求""" + title: Optional[str] = Field(None, description="职位标题") + evaluation_schema_id: Optional[str] = Field(None, description="关联的评价方案ID") + department: Optional[str] = Field(None, description="部门") + location: Optional[str] = Field(None, description="地点") + salary_min: Optional[int] = Field(None, description="薪资下限(K)") + salary_max: Optional[int] = Field(None, description="薪资上限(K)") + requirements: Optional[Dict[str, Any]] = Field(None, description="职位要求JSON") + description: Optional[str] = Field(None, description="职位描述") + status: Optional[str] = Field(None, description="状态: ACTIVE, PAUSED, CLOSED, ARCHIVED") + + +class JobPositionFilterRequest(BaseModel): + """职位筛选请求""" + source: Optional[str] = Field(None, description="平台来源") + recruiter_id: Optional[str] = Field(None, description="招聘者账号ID") + evaluation_schema_id: Optional[str] = Field(None, description="评价方案ID") + status: Optional[str] = Field(None, description="状态: ACTIVE, PAUSED, CLOSED, ARCHIVED") + keyword: Optional[str] = Field(None, description="关键词搜索(标题/部门)") + page: int = Field(default=1, ge=1, description="页码") + page_size: int = Field(default=20, ge=1, le=100, description="每页数量") + + +class JobBindSchemaRequest(BaseModel): + """职位关联评价方案请求""" + evaluation_schema_id: str = Field(..., description="评价方案ID") + + +class EvaluationSchemaResponse(BaseModel): + """评价方案响应""" + id: str + name: str + description: Optional[str] = None + dimensions: Optional[List[Dict[str, Any]]] = None + weights: Optional[Dict[str, float]] = None + is_default: bool = False + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + + +class EvaluationSchemaListResponse(BaseModel): + """评价方案列表响应""" + total: int + items: List[EvaluationSchemaResponse] diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/domain/job.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/domain/job.py index 70fa3ec..b75e838 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/domain/job.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/domain/job.py @@ -32,6 +32,7 @@ class Job: source: CandidateSource = CandidateSource.BOSS source_id: str = "" recruiter_id: Optional[str] = None # 关联的招聘者账号ID + evaluation_schema_id: Optional[str] = None # 关联的评价方案ID # 职位信息 title: str = "" diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/resume_process_job.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/resume_process_job.py index 92ce306..a4b1718 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/resume_process_job.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/resume_process_job.py @@ -4,16 +4,18 @@ 负责处理简历信息入库操作: 1. 遍历所有活跃账号的职位 2. 获取职位下的候选人列表 -3. 获取候选人简历详情 -4. 将简历信息入库 +3. 获取候选人简历详情,同步进行LLM评分 +4. 将简历信息和评分结果入库 +5. 超过阈值触发打招呼和HR通知 -入库更新操作: -- 插入/更新 candidates 表 -- 插入/更新 resumes 表 +架构特点: +- 多账号并行处理(使用asyncio.gather) +- 每个账号内部串行处理候选人(确保同一账号下操作的顺序性,避免Boss平台检测) """ +import asyncio from datetime import datetime from typing import List, Optional, Dict, Any -from dataclasses import dataclass +from dataclasses import dataclass, field from ..domain.candidate import Candidate, CandidateSource, CandidateStatus from ..domain.resume import Resume @@ -21,7 +23,9 @@ from ..domain.job import Job from ..domain.recruiter import Recruiter, RecruiterStatus from ..service.recruiter_service import RecruiterService from ..service.ingestion.unified_ingestion_service import UnifiedIngestionService +from ..service.resume_evaluation_service import ResumeEvaluationService, ProcessResult from ..service.crawler.base_crawler import BaseCrawler +from ..config.settings import Settings, get_settings @dataclass @@ -35,9 +39,13 @@ class ResumeProcessResult: candidates_new: int = 0 candidates_updated: int = 0 candidates_failed: int = 0 + candidates_greeted: int = 0 # 打招呼成功数 + candidates_notified: int = 0 # 通知HR成功数 + high_score_count: int = 0 # 高分候选人数量 success: bool = True message: str = "" timestamp: datetime = None + process_results: List[ProcessResult] = field(default_factory=list) # 详细处理结果 def __post_init__(self): if self.timestamp is None: @@ -48,62 +56,88 @@ class ResumeProcessJob: """ 简历处理定时任务 - 定时执行简历爬取和入库: + 定时执行简历爬取、LLM评分和入库: - 获取所有活跃账号 - - 遍历账号下的职位 - - 获取候选人列表和简历详情 - - 统一入库处理 + - 多账号并行处理(asyncio.gather) + - 单账号内串行处理候选人(简历获取+LLM分析同步,避免Boss检测) + - 超阈值触发打招呼和HR通知 """ def __init__( self, recruiter_service: RecruiterService, - ingestion_service: UnifiedIngestionService + ingestion_service: UnifiedIngestionService, + evaluation_service: Optional[ResumeEvaluationService] = None, + settings: Optional[Settings] = None ): self.recruiter_service = recruiter_service self.ingestion_service = ingestion_service - # 每个职位最多处理的候选人数量 - self.max_candidates_per_job = 20 + self.evaluation_service = evaluation_service + self.settings = settings or get_settings() + + # 从配置中读取参数 + self.max_candidates_per_job = self.settings.resume_max_per_job + self.process_delay = self.settings.resume_process_delay async def execute(self) -> List[ResumeProcessResult]: """ 执行简历处理任务 + 架构: + - 多个账号并行处理(使用asyncio.gather) + - 每个账号内部串行处理候选人(确保同一账号下操作的顺序性) + Returns: List[ResumeProcessResult]: 各职位的处理结果 """ print(f"[{datetime.now()}] 开始执行简历处理任务...") - - results = [] + print(f"[{datetime.now()}] 配置: 评分阈值={self.settings.score_threshold_greet}, " + f"自动打招呼={'ON' if self.settings.auto_greet_enabled else 'OFF'}, " + f"自动通知={'ON' if self.settings.auto_notify_enabled else 'OFF'}") # 获取所有活跃且同步成功的账号 recruiters = self._get_eligible_recruiters() if not recruiters: print(f"[{datetime.now()}] 没有符合条件的招聘者账号(需要ACTIVE状态且同步成功)") - return results + return [] - print(f"[{datetime.now()}] 找到 {len(recruiters)} 个符合条件的账号") + print(f"[{datetime.now()}] 找到 {len(recruiters)} 个符合条件的账号,开始并行处理...") - for recruiter in recruiters: - try: - job_results = await self._process_recruiter_jobs(recruiter) - results.extend(job_results) - except Exception as e: - print(f"[{datetime.now()}] 处理账号 {recruiter.name} 时发生异常: {e}") + # 多账号并行处理 + tasks = [ + self._process_recruiter_jobs_with_lock(recruiter) + for recruiter in recruiters + ] + + results_list = await asyncio.gather(*tasks, return_exceptions=True) + + # 汇总结果 + all_results = [] + for i, result in enumerate(results_list): + if isinstance(result, Exception): + print(f"[{datetime.now()}] 账号 {recruiters[i].name} 处理异常: {result}") + elif isinstance(result, list): + all_results.extend(result) # 统计结果 - total_processed = sum(r.candidates_processed for r in results) - total_new = sum(r.candidates_new for r in results) - total_failed = sum(r.candidates_failed for r in results) + total_processed = sum(r.candidates_processed for r in all_results) + total_new = sum(r.candidates_new for r in all_results) + total_failed = sum(r.candidates_failed for r in all_results) + total_greeted = sum(r.candidates_greeted for r in all_results) + total_notified = sum(r.candidates_notified for r in all_results) + total_high_score = sum(r.high_score_count for r in all_results) print(f"[{datetime.now()}] 简历处理任务完成:") - print(f" - 处理职位数: {len(results)}") + print(f" - 处理职位数: {len(all_results)}") print(f" - 处理候选人数: {total_processed}") print(f" - 新增候选人数: {total_new}") + print(f" - 高分候选人数: {total_high_score}") + print(f" - 打招呼成功数: {total_greeted}") + print(f" - HR通知成功数: {total_notified}") print(f" - 失败数: {total_failed}") - return results + return all_results def _get_eligible_recruiters(self) -> List[Recruiter]: """ @@ -136,9 +170,15 @@ class ResumeProcessJob: return eligible_recruiters - async def _process_recruiter_jobs(self, recruiter: Recruiter) -> List[ResumeProcessResult]: + async def _process_recruiter_jobs_with_lock( + self, + recruiter: Recruiter + ) -> List[ResumeProcessResult]: """ - 处理单个账号下的所有职位 + 处理单个账号下的所有职位(内部串行) + + 关键:同一账号下的简历获取和LLM分析必须串行执行, + 避免Boss平台检测到异常操作模式 Args: recruiter: 招聘者账号 @@ -165,7 +205,9 @@ class ResumeProcessJob: for job in jobs: try: - result = await self._process_single_job(recruiter, crawler, job) + result = await self._process_single_job_with_evaluation( + recruiter, crawler, job + ) results.append(result) except Exception as e: error_result = ResumeProcessResult( @@ -184,6 +226,111 @@ class ResumeProcessJob: return results + async def _process_single_job_with_evaluation( + self, + recruiter: Recruiter, + crawler: BaseCrawler, + job: Job + ) -> ResumeProcessResult: + """ + 处理单个职位的候选人(集成LLM评分) + + Args: + recruiter: 招聘者账号 + crawler: 爬虫实例 + job: 职位 + + Returns: + ResumeProcessResult: 处理结果 + """ + result = ResumeProcessResult( + job_id=job.id or "", + job_title=job.title, + recruiter_id=recruiter.id or "", + recruiter_name=recruiter.name + ) + + print(f"[{datetime.now()}] 处理职位: {job.title} (ID: {job.source_id}, " + f"评价方案: {job.evaluation_schema_id or 'general'})") + + # 获取候选人列表 + candidates = crawler.get_candidates(job.source_id, page=1) + + if not candidates: + result.message = "该职位下没有候选人" + print(f"[{datetime.now()}] 职位 {job.title} 没有候选人") + return result + + print(f"[{datetime.now()}] 职位 {job.title} 找到 {len(candidates)} 个候选人") + + # 限制处理数量 + candidates_to_process = candidates[:self.max_candidates_per_job] + + # 串行处理每个候选人(关键:不能并行!) + for candidate in candidates_to_process: + try: + # 使用评分服务同步处理(获取简历 -> LLM分析 -> 入库 -> 触发动作) + if self.evaluation_service: + process_result = await self.evaluation_service.process_candidate_sync( + crawler=crawler, + candidate=candidate, + job=job, + schema_id=job.evaluation_schema_id + ) + + result.candidates_processed += 1 + result.process_results.append(process_result) + + if process_result.success: + result.candidates_new += 1 + + # 统计高分候选人 + if process_result.overall_score >= self.settings.score_threshold_greet: + result.high_score_count += 1 + + # 统计打招呼和通知 + if process_result.greeting_sent: + result.candidates_greeted += 1 + if process_result.notification_sent: + result.candidates_notified += 1 + else: + result.candidates_failed += 1 + else: + # 回退到旧的处理方式(仅入库,不评分) + process_status = await self._process_single_candidate( + recruiter, crawler, job, candidate + ) + + result.candidates_processed += 1 + + if process_status == "new": + result.candidates_new += 1 + elif process_status == "updated": + result.candidates_updated += 1 + elif process_status == "failed": + result.candidates_failed += 1 + + # 添加延迟,避免请求过快被Boss平台拦截 + await asyncio.sleep(self.process_delay) + + except Exception as e: + result.candidates_processed += 1 + result.candidates_failed += 1 + print(f"[{datetime.now()}] 处理候选人 {candidate.name} 失败: {e}") + + result.success = result.candidates_failed == 0 + result.message = ( + f"处理完成: 新增{result.candidates_new}, " + f"高分{result.high_score_count}, " + f"打招呼{result.candidates_greeted}, " + f"通知{result.candidates_notified}, " + f"失败{result.candidates_failed}" + ) + + print(f"[{datetime.now()}] 职位 {job.title} 处理完成: {result.message}") + + return result + async def _process_single_job( self, recruiter: Recruiter, @@ -191,7 +338,9 @@ class ResumeProcessJob: job: Job ) -> ResumeProcessResult: """ - 处理单个职位的候选人 + 处理单个职位的候选人(旧版本,仅入库) + + 已废弃,请使用 _process_single_job_with_evaluation Args: recruiter: 招聘者账号 diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/__init__.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/__init__.py index 80dba04..268fee2 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/__init__.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/__init__.py @@ -1 +1,15 @@ """Mapper layer - Data access""" + +from .candidate_mapper import CandidateMapper +from .job_mapper import JobMapper +from .recruiter_mapper import RecruiterMapper +from .resume_mapper import ResumeMapper +from .evaluation_mapper import EvaluationMapper + +__all__ = [ + 'CandidateMapper', + 'JobMapper', + 'RecruiterMapper', + 'ResumeMapper', + 'EvaluationMapper', +] diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/evaluation_mapper.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/evaluation_mapper.py new file mode 100644 index 0000000..7889f3f --- /dev/null +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/evaluation_mapper.py @@ -0,0 +1,376 @@ +"""Evaluation data mapper using SQLAlchemy""" +from typing import List, Optional +from datetime import datetime +import uuid +import json + +from sqlalchemy import select, update, delete +from sqlalchemy.orm import Session + +from ..domain.evaluation import Evaluation, DimensionScore, EvaluationSchema +from ..domain.enums import Recommendation +from ..config.database import get_db_manager, EvaluationModel, EvaluationSchemaModel + + +class EvaluationMapper: + """评价记录数据访问 - SQLAlchemy实现""" + + def __init__(self, db_url: Optional[str] = None): + self.db_manager = get_db_manager(db_url) + # 确保表存在 + self.db_manager.create_tables() + + def _get_session(self) -> Session: + """获取数据库会话""" + return self.db_manager.get_session() + + def _model_to_entity(self, model: EvaluationModel) -> Evaluation: + """将模型转换为实体""" + # 解析维度评分 + dimension_scores = [] + if model.dimension_scores: + ds_list = model.dimension_scores if isinstance(model.dimension_scores, list) else json.loads(model.dimension_scores) + for ds_data in ds_list: + dimension_scores.append(DimensionScore( + dimension_id=ds_data.get('dimension_id', ''), + dimension_name=ds_data.get('dimension_name', ''), + score=float(ds_data.get('score', 0)), + weight=float(ds_data.get('weight', 1.0)), + comment=ds_data.get('comment') + )) + + # 解析推荐意见 + recommendation = None + if model.recommendation: + try: + recommendation = Recommendation(model.recommendation.lower()) + except ValueError: + recommendation = None + + # 解析tags, strengths, weaknesses + tags = model.tags if isinstance(model.tags, list) else (json.loads(model.tags) if model.tags else []) + strengths = model.strengths if isinstance(model.strengths, list) else (json.loads(model.strengths) if model.strengths else []) + weaknesses = model.weaknesses if isinstance(model.weaknesses, list) else (json.loads(model.weaknesses) if model.weaknesses else []) + + return Evaluation( + id=model.id, + candidate_id=model.candidate_id, + schema_id=model.schema_id, + job_id=model.job_id, + overall_score=float(model.overall_score) if model.overall_score else 0.0, + dimension_scores=dimension_scores, + tags=tags, + summary=model.summary, + strengths=strengths, + weaknesses=weaknesses, + recommendation=recommendation, + raw_response=model.raw_response, + created_at=model.created_at + ) + + def _entity_to_model(self, entity: Evaluation) -> EvaluationModel: + """将实体转换为模型""" + # 序列化维度评分 + dimension_scores_json = [ + { + 'dimension_id': ds.dimension_id, + 'dimension_name': ds.dimension_name, + 'score': ds.score, + 'weight': ds.weight, + 'comment': ds.comment + } + for ds in entity.dimension_scores + ] if entity.dimension_scores else [] + + return EvaluationModel( + id=entity.id or str(uuid.uuid4()), + candidate_id=entity.candidate_id, + schema_id=entity.schema_id, + job_id=entity.job_id, + overall_score=entity.overall_score, + dimension_scores=dimension_scores_json, + tags=entity.tags or [], + summary=entity.summary, + strengths=entity.strengths or [], + weaknesses=entity.weaknesses or [], + recommendation=entity.recommendation.value if entity.recommendation else None, + raw_response=entity.raw_response + ) + + def save(self, evaluation: Evaluation) -> Evaluation: + """保存评价记录(插入或更新)""" + session = self._get_session() + try: + if evaluation.id: + # 检查是否已存在 + existing = session.execute( + select(EvaluationModel).where(EvaluationModel.id == evaluation.id) + ).scalar_one_or_none() + + if existing: + # 更新现有记录 + existing.overall_score = evaluation.overall_score + existing.dimension_scores = [ + { + 'dimension_id': ds.dimension_id, + 'dimension_name': ds.dimension_name, + 'score': ds.score, + 'weight': ds.weight, + 'comment': ds.comment + } + for ds in evaluation.dimension_scores + ] if evaluation.dimension_scores else [] + existing.tags = evaluation.tags or [] + existing.summary = evaluation.summary + existing.strengths = evaluation.strengths or [] + existing.weaknesses = evaluation.weaknesses or [] + existing.recommendation = evaluation.recommendation.value if evaluation.recommendation else None + existing.raw_response = evaluation.raw_response + session.commit() + return evaluation + + # 插入新记录 + evaluation.id = evaluation.id or str(uuid.uuid4()) + model = self._entity_to_model(evaluation) + session.add(model) + session.commit() + return evaluation + finally: + session.close() + + def find_by_id(self, evaluation_id: str) -> Optional[Evaluation]: + """根据ID查询评价记录""" + session = self._get_session() + try: + result = session.execute( + select(EvaluationModel).where(EvaluationModel.id == evaluation_id) + ) + model = result.scalar_one_or_none() + return self._model_to_entity(model) if model else None + finally: + session.close() + + def find_by_candidate_id(self, candidate_id: str) -> List[Evaluation]: + """根据候选人ID查询评价记录""" + session = self._get_session() + try: + result = session.execute( + select(EvaluationModel) + .where(EvaluationModel.candidate_id == candidate_id) + .order_by(EvaluationModel.created_at.desc()) + ) + models = result.scalars().all() + return [self._model_to_entity(m) for m in models] + finally: + session.close() + + def find_latest_by_candidate_id(self, candidate_id: str) -> Optional[Evaluation]: + """获取候选人最新的评价记录""" + session = self._get_session() + try: + result = session.execute( + select(EvaluationModel) + .where(EvaluationModel.candidate_id == candidate_id) + .order_by(EvaluationModel.created_at.desc()) + .limit(1) + ) + model = result.scalar_one_or_none() + return self._model_to_entity(model) if model else None + finally: + session.close() + + def find_by_job_id(self, job_id: str) -> List[Evaluation]: + """根据职位ID查询评价记录""" + session = self._get_session() + try: + result = session.execute( + select(EvaluationModel) + .where(EvaluationModel.job_id == job_id) + .order_by(EvaluationModel.created_at.desc()) + ) + models = result.scalars().all() + return [self._model_to_entity(m) for m in models] + finally: + session.close() + + def find_by_schema_id(self, schema_id: str) -> List[Evaluation]: + """根据评价方案ID查询评价记录""" + session = self._get_session() + try: + result = session.execute( + select(EvaluationModel) + .where(EvaluationModel.schema_id == schema_id) + .order_by(EvaluationModel.created_at.desc()) + ) + models = result.scalars().all() + return [self._model_to_entity(m) for m in models] + finally: + session.close() + + def find_by_candidate_and_job(self, candidate_id: str, job_id: str) -> Optional[Evaluation]: + """根据候选人ID和职位ID查询评价记录""" + session = self._get_session() + try: + result = session.execute( + select(EvaluationModel) + .where(EvaluationModel.candidate_id == candidate_id) + .where(EvaluationModel.job_id == job_id) + .order_by(EvaluationModel.created_at.desc()) + .limit(1) + ) + model = result.scalar_one_or_none() + return self._model_to_entity(model) if model else None + finally: + session.close() + + def find_high_score_candidates( + self, + min_score: float, + job_id: Optional[str] = None, + page: int = 1, + page_size: int = 20 + ) -> tuple: + """ + 查询高分候选人 + + Args: + min_score: 最低评分 + job_id: 职位ID(可选) + page: 页码 + page_size: 每页数量 + + Returns: + tuple: (评价列表, 总记录数) + """ + session = self._get_session() + try: + from sqlalchemy import func + + stmt = select(EvaluationModel).where(EvaluationModel.overall_score >= min_score) + count_stmt = select(func.count()).select_from(EvaluationModel).where( + EvaluationModel.overall_score >= min_score + ) + + if job_id: + stmt = stmt.where(EvaluationModel.job_id == job_id) + count_stmt = count_stmt.where(EvaluationModel.job_id == job_id) + + # 获取总数 + total = session.execute(count_stmt).scalar() + + # 分页查询,按分数降序 + stmt = stmt.order_by(EvaluationModel.overall_score.desc()) + stmt = stmt.offset((page - 1) * page_size).limit(page_size) + + results = session.execute(stmt).scalars().all() + return [self._model_to_entity(r) for r in results], total + finally: + session.close() + + def delete(self, evaluation_id: str) -> bool: + """删除评价记录""" + session = self._get_session() + try: + stmt = delete(EvaluationModel).where(EvaluationModel.id == evaluation_id) + result = session.execute(stmt) + session.commit() + return result.rowcount > 0 + finally: + session.close() + + def delete_by_candidate_id(self, candidate_id: str) -> int: + """删除候选人的所有评价记录""" + session = self._get_session() + try: + stmt = delete(EvaluationModel).where(EvaluationModel.candidate_id == candidate_id) + result = session.execute(stmt) + session.commit() + return result.rowcount + finally: + session.close() + + # ============== 评价方案查询方法 ============== + + def _schema_model_to_entity(self, model: EvaluationSchemaModel) -> EvaluationSchema: + """将评价方案模型转换为实体""" + dimensions = model.dimensions if isinstance(model.dimensions, list) else ( + json.loads(model.dimensions) if model.dimensions else [] + ) + weights = model.weights if isinstance(model.weights, dict) else ( + json.loads(model.weights) if model.weights else {} + ) + + return EvaluationSchema( + id=model.id, + name=model.name, + description=model.description, + dimensions=dimensions, + weights=weights, + prompt_template=model.prompt_template, + is_default=model.is_default, + created_at=model.created_at, + updated_at=model.updated_at + ) + + def find_schema_by_id(self, schema_id: str) -> Optional[EvaluationSchema]: + """根据ID查询评价方案""" + session = self._get_session() + try: + result = session.execute( + select(EvaluationSchemaModel).where(EvaluationSchemaModel.id == schema_id) + ) + model = result.scalar_one_or_none() + return self._schema_model_to_entity(model) if model else None + finally: + session.close() + + def find_default_schema(self) -> Optional[EvaluationSchema]: + """查询默认评价方案""" + session = self._get_session() + try: + result = session.execute( + select(EvaluationSchemaModel) + .where(EvaluationSchemaModel.is_default == True) + .limit(1) + ) + model = result.scalar_one_or_none() + return self._schema_model_to_entity(model) if model else None + finally: + session.close() + + def find_all_schemas( + self, + page: int = 1, + page_size: int = 20 + ) -> tuple: + """ + 查询所有评价方案 + + Args: + page: 页码 + page_size: 每页数量 + + Returns: + tuple: (评价方案列表, 总记录数) + """ + from sqlalchemy import func + + session = self._get_session() + try: + stmt = select(EvaluationSchemaModel) + count_stmt = select(func.count()).select_from(EvaluationSchemaModel) + + # 获取总数 + total = session.execute(count_stmt).scalar() + + # 分页查询,默认方案在前,按创建时间降序 + stmt = stmt.order_by( + EvaluationSchemaModel.is_default.desc(), + EvaluationSchemaModel.created_at.desc() + ) + stmt = stmt.offset((page - 1) * page_size).limit(page_size) + + results = session.execute(stmt).scalars().all() + return [self._schema_model_to_entity(r) for r in results], total + finally: + session.close() diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/job_mapper.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/job_mapper.py index 9b4f103..d1e767f 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/job_mapper.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/job_mapper.py @@ -45,6 +45,7 @@ class JobMapper: source=CandidateSource(source_value), source_id=model.source_id, recruiter_id=model.recruiter_id, + evaluation_schema_id=model.evaluation_schema_id, title=model.title, department=model.department, location=model.location, @@ -79,6 +80,7 @@ class JobMapper: source=entity.source.value, source_id=entity.source_id, recruiter_id=entity.recruiter_id, + evaluation_schema_id=entity.evaluation_schema_id, title=entity.title, department=entity.department, location=entity.location, @@ -114,6 +116,7 @@ class JobMapper: description=job.description, status=job.status.value, recruiter_id=job.recruiter_id, + evaluation_schema_id=job.evaluation_schema_id, candidate_count=job.candidate_count, new_candidate_count=job.new_candidate_count, last_sync_at=job.last_sync_at or datetime.now(), @@ -269,3 +272,69 @@ class JobMapper: return result.rowcount finally: session.close() + + def find_filtered_jobs( + self, + source: Optional[str] = None, + recruiter_id: Optional[str] = None, + evaluation_schema_id: Optional[str] = None, + status: Optional[str] = None, + keyword: Optional[str] = None, + page: int = 1, + page_size: int = 20 + ) -> tuple: + """ + 多条件筛选查询职位 + + Args: + source: 平台来源 + recruiter_id: 招聘者账号ID + evaluation_schema_id: 评价方案ID + status: 职位状态 + keyword: 关键词搜索(标题/部门) + page: 页码 + page_size: 每页数量 + + Returns: + tuple: (职位列表, 总记录数) + """ + from sqlalchemy import func + + session = self._get_session() + try: + stmt = select(JobModel) + count_stmt = select(func.count()).select_from(JobModel) + + # 动态添加筛选条件 + conditions = [] + if source: + conditions.append(JobModel.source == source.lower()) + if recruiter_id: + conditions.append(JobModel.recruiter_id == recruiter_id) + if evaluation_schema_id: + conditions.append(JobModel.evaluation_schema_id == evaluation_schema_id) + if status: + conditions.append(JobModel.status == status.lower()) + if keyword: + conditions.append( + (JobModel.title.ilike(f"%{keyword}%")) | + (JobModel.department.ilike(f"%{keyword}%")) + ) + + # 应用条件 + if conditions: + for condition in conditions: + stmt = stmt.where(condition) + count_stmt = count_stmt.where(condition) + + # 获取总数 + total = session.execute(count_stmt).scalar() + + # 分页查询,按最后同步时间降序 + stmt = stmt.order_by(JobModel.last_sync_at.desc().nullslast()) + stmt = stmt.offset((page - 1) * page_size).limit(page_size) + + results = session.execute(stmt).scalars().all() + return [self._model_to_entity(r) for r in results], total + finally: + session.close() diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/__init__.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/__init__.py index 30c71ab..1c5c024 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/__init__.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/__init__.py @@ -1 +1,10 @@ """Service layer - Business logic""" + +from .resume_evaluation_service import ResumeEvaluationService, ProcessResult +from .recruiter_service import RecruiterService + +__all__ = [ + 'ResumeEvaluationService', + 'ProcessResult', + 'RecruiterService', +] diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/crawler/boss_crawler.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/crawler/boss_crawler.py index 52ab225..c520b8d 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/crawler/boss_crawler.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/crawler/boss_crawler.py @@ -307,3 +307,94 @@ class BossCrawler(BaseCrawler): if highest: return getattr(highest, 'schoolName', None) return None + + def send_greeting(self, candidate: Candidate, message: str) -> bool: + """ + 向候选人发送打招呼消息 + + Args: + candidate: 候选人对象(需包含 raw_data) + message: 打招呼消息内容 + + Returns: + bool: 是否发送成功 + """ + try: + geek_data = getattr(candidate, 'raw_data', None) + if geek_data is None: + print(f"[BossCrawler] 候选人 {candidate.name} 缺少 raw_data,无法发送打招呼") + return False + + # 从 geek_data 中提取必要的参数 + geek_card = getattr(geek_data, 'geekCard', None) + if not geek_card: + print(f"[BossCrawler] 候选人 {candidate.name} 缺少 geekCard,无法发送打招呼") + return False + + # 获取发送消息所需的参数 + encrypt_geek_id = getattr(geek_card, 'encryptGeekId', '') or getattr(geek_data, 'encryptGeekId', '') + encrypt_job_id = getattr(geek_card, 'encryptJobId', '') + security_id = getattr(geek_card, 'securityId', '') + lid = getattr(geek_card, 'lid', '') + expect_id = getattr(geek_card, 'expectId', 0) + + if not encrypt_geek_id: + print(f"[BossCrawler] 候选人 {candidate.name} 缺少 encryptGeekId,无法发送打招呼") + return False + + # 调用 SDK 的打招呼接口 + # 注意:这里假设 SDK 提供了 send_message 或 greet 方法 + # 实际方法名需要根据 ylhp-boss-hr SDK 的实际接口调整 + if hasattr(self.client, 'send_message'): + result = self.client.send_message( + encryptGeekId=encrypt_geek_id, + encryptJobId=encrypt_job_id, + securityId=security_id, + lid=lid, + expectId=expect_id, + content=message + ) + print(f"[BossCrawler] 向候选人 {candidate.name} 发送打招呼成功") + return True + elif hasattr(self.client, 'greet'): + result = self.client.greet( + encryptGeekId=encrypt_geek_id, + encryptJobId=encrypt_job_id, + securityId=security_id, + lid=lid, + expectId=expect_id, + content=message + ) + print(f"[BossCrawler] 向候选人 {candidate.name} 发送打招呼成功") + return True + elif hasattr(self.client, 'say_hi'): + result = self.client.say_hi( + encryptGeekId=encrypt_geek_id, + encryptJobId=encrypt_job_id, + securityId=security_id, + lid=lid, + expectId=expect_id, + content=message + ) + print(f"[BossCrawler] 向候选人 {candidate.name} 发送打招呼成功") + return True + else: + print(f"[BossCrawler] SDK 不支持打招呼功能,请检查 ylhp-boss-hr SDK 版本") + return False + + except Exception as e: + print(f"[BossCrawler] 向候选人 {candidate.name} 发送打招呼失败: {e}") + return False + + def can_send_greeting(self) -> bool: + """ + 检查当前 SDK 是否支持打招呼功能 + + Returns: + bool: 是否支持 + """ + return ( + hasattr(self.client, 'send_message') or + hasattr(self.client, 'greet') or + hasattr(self.client, 'say_hi') + ) diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/resume_evaluation_service.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/resume_evaluation_service.py new file mode 100644 index 0000000..0bc2607 --- /dev/null +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/resume_evaluation_service.py @@ -0,0 +1,434 @@ +""" +简历评分服务 + +负责同步执行:获取简历 -> LLM分析 -> 评分入库 -> 触发后续动作 +确保在同一账号下串行执行,避免Boss平台检测 +""" +from dataclasses import dataclass, field +from datetime import datetime +from decimal import Decimal +from typing import Optional, Dict, Any, List +import json + +from .analysis.resume_analyzer import ResumeAnalyzer +from .analysis.evaluation_schema import EvaluationSchemaService +from .notification.notification_service import NotificationService +from .crawler.base_crawler import BaseCrawler +from .crawler.boss_crawler import BossCrawler +from ..mapper.candidate_mapper import CandidateMapper +from ..mapper.evaluation_mapper import EvaluationMapper +from ..domain.candidate import Candidate, CandidateStatus +from ..domain.evaluation import Evaluation +from ..domain.job import Job +from ..domain.resume import Resume +from ..config.settings import Settings, get_settings + + +@dataclass +class ProcessResult: + """候选人处理结果""" + candidate_id: str + candidate_name: str + job_id: str + job_title: str + + # 处理状态 + success: bool = True + status: str = "new" # new, updated, skipped, failed + message: str = "" + + # 评分结果 + evaluation_id: Optional[str] = None + overall_score: float = 0.0 + + # 触发动作结果 + greeting_sent: bool = False + notification_sent: bool = False + + # 时间戳 + timestamp: datetime = field(default_factory=datetime.now) + + def to_dict(self) -> Dict[str, Any]: + return { + "candidate_id": self.candidate_id, + "candidate_name": self.candidate_name, + "job_id": self.job_id, + "job_title": self.job_title, + "success": self.success, + "status": self.status, + "message": self.message, + "evaluation_id": self.evaluation_id, + "overall_score": self.overall_score, + "greeting_sent": self.greeting_sent, + "notification_sent": self.notification_sent, + "timestamp": self.timestamp.isoformat() + } + + +class ResumeEvaluationService: + """ + 简历评分服务 + + 负责同步执行:获取简历 -> LLM分析 -> 评分入库 -> 触发后续动作 + 确保在同一账号下串行执行,避免Boss平台检测 + """ + + def __init__( + self, + resume_analyzer: ResumeAnalyzer, + candidate_mapper: CandidateMapper, + evaluation_mapper: EvaluationMapper, + notification_service: Optional[NotificationService] = None, + schema_service: Optional[EvaluationSchemaService] = None, + settings: Optional[Settings] = None + ): + """ + 初始化简历评分服务 + + Args: + resume_analyzer: 简历分析器 + candidate_mapper: 候选人数据访问 + evaluation_mapper: 评价数据访问 + notification_service: 通知服务(可选) + schema_service: 评价方案服务(可选) + settings: 配置(可选) + """ + self.resume_analyzer = resume_analyzer + self.candidate_mapper = candidate_mapper + self.evaluation_mapper = evaluation_mapper + self.notification_service = notification_service + self.schema_service = schema_service or EvaluationSchemaService() + self.settings = settings or get_settings() + + async def process_candidate_sync( + self, + crawler: BaseCrawler, + candidate: Candidate, + job: Job, + schema_id: Optional[str] = None + ) -> ProcessResult: + """ + 同步处理单个候选人(获取简历+LLM分析+入库+触发动作) + + 关键:必须在同一线程/协程中串行完成,不能并行 + 这是为了确保在查看简历页面时不会进行其他操作,避免Boss平台检测 + + Args: + crawler: 爬虫实例 + candidate: 候选人对象 + job: 职位对象 + schema_id: 评价方案ID(可选,优先使用job关联的方案) + + Returns: + ProcessResult: 处理结果 + """ + result = ProcessResult( + candidate_id=candidate.id or "", + candidate_name=candidate.name, + job_id=job.id or "", + job_title=job.title + ) + + print(f"[ResumeEvaluationService] 开始处理候选人: {candidate.name}") + + try: + # 1. 获取简历详情 - 同步操作 + resume = self._get_resume(crawler, candidate) + if not resume: + result.success = False + result.status = "failed" + result.message = "无法获取简历详情" + print(f"[ResumeEvaluationService] 候选人 {candidate.name} 无法获取简历") + return result + + # 2. 确定评价方案ID + effective_schema_id = schema_id or job.evaluation_schema_id or "general" + + # 3. 调用LLM进行评分分析 - 同步操作(await确保串行) + evaluation = await self._analyze_resume( + candidate=candidate, + resume=resume, + job=job, + schema_id=effective_schema_id + ) + + if not evaluation or evaluation.overall_score == 0: + result.success = False + result.status = "failed" + result.message = "LLM分析失败" + print(f"[ResumeEvaluationService] 候选人 {candidate.name} LLM分析失败") + return result + + result.evaluation_id = evaluation.id + result.overall_score = evaluation.overall_score + + # 4. 保存评价结果到数据库 + saved_evaluation = self.evaluation_mapper.save(evaluation) + print(f"[ResumeEvaluationService] 候选人 {candidate.name} 评价已保存, 分数: {evaluation.overall_score}") + + # 5. 更新候选人的LLM评分字段 + self._update_candidate_score(candidate, evaluation) + + # 6. 检查是否超过阈值,触发后续动作 + await self._trigger_actions_if_qualified( + crawler=crawler, + candidate=candidate, + evaluation=evaluation, + job=job, + result=result + ) + + result.success = True + result.status = "new" + result.message = f"处理成功, 分数: {evaluation.overall_score}" + + print(f"[ResumeEvaluationService] 候选人 {candidate.name} 处理完成: {result.message}") + + except Exception as e: + result.success = False + result.status = "failed" + result.message = f"处理异常: {str(e)}" + print(f"[ResumeEvaluationService] 候选人 {candidate.name} 处理异常: {e}") + + return result + + def _get_resume(self, crawler: BaseCrawler, candidate: Candidate) -> Optional[Resume]: + """ + 获取简历详情 + + Args: + crawler: 爬虫实例 + candidate: 候选人对象 + + Returns: + Resume: 简历对象 + """ + try: + return crawler.get_resume_detail(candidate) + except Exception as e: + print(f"[ResumeEvaluationService] 获取简历失败: {e}") + return None + + async def _analyze_resume( + self, + candidate: Candidate, + resume: Resume, + job: Job, + schema_id: str + ) -> Optional[Evaluation]: + """ + 调用LLM进行简历分析 + + Args: + candidate: 候选人 + resume: 简历 + job: 职位 + schema_id: 评价方案ID + + Returns: + Evaluation: 评价结果 + """ + try: + # 确保candidate有ID + if not candidate.id: + import uuid + candidate.id = str(uuid.uuid4()) + + evaluation = await self.resume_analyzer.analyze( + candidate_id=candidate.id, + resume=resume, + schema_id=schema_id, + job_id=job.id + ) + + return evaluation + except Exception as e: + print(f"[ResumeEvaluationService] LLM分析失败: {e}") + return None + + def _update_candidate_score(self, candidate: Candidate, evaluation: Evaluation) -> None: + """ + 更新候选人的LLM评分字段 + + Args: + candidate: 候选人 + evaluation: 评价结果 + """ + try: + candidate.llm_filtered = evaluation.overall_score >= self.settings.score_threshold_greet + candidate.llm_score = Decimal(str(evaluation.overall_score)) + candidate.llm_score_details = evaluation.to_dict() + candidate.status = CandidateStatus.ANALYZED + + self.candidate_mapper.save(candidate) + + except Exception as e: + print(f"[ResumeEvaluationService] 更新候选人评分失败: {e}") + + async def _trigger_actions_if_qualified( + self, + crawler: BaseCrawler, + candidate: Candidate, + evaluation: Evaluation, + job: Job, + result: ProcessResult + ) -> None: + """ + 如果评分超过阈值,触发打招呼和通知 + + Args: + crawler: 爬虫实例 + candidate: 候选人 + evaluation: 评价结果 + job: 职位 + result: 处理结果(用于记录触发状态) + """ + score = evaluation.overall_score + + # 6.1 触发打招呼(如果启用且超阈值) + if (self.settings.auto_greet_enabled and + score >= self.settings.score_threshold_greet): + greeting_success = await self._trigger_greeting( + crawler=crawler, + candidate=candidate, + evaluation=evaluation + ) + result.greeting_sent = greeting_success + + # 6.2 触发HR通知(如果启用且超阈值) + if (self.settings.auto_notify_enabled and + score >= self.settings.score_threshold_notify): + notify_success = await self._trigger_notification( + candidate=candidate, + evaluation=evaluation, + job=job + ) + result.notification_sent = notify_success + + async def _trigger_greeting( + self, + crawler: BaseCrawler, + candidate: Candidate, + evaluation: Evaluation + ) -> bool: + """ + 触发打招呼 + + Args: + crawler: 爬虫实例 + candidate: 候选人 + evaluation: 评价结果 + + Returns: + bool: 是否成功 + """ + try: + # 检查爬虫是否支持打招呼 + if isinstance(crawler, BossCrawler) and crawler.can_send_greeting(): + message = self.settings.greet_message_template + success = crawler.send_greeting(candidate, message) + + if success: + # 更新候选人状态为已联系 + candidate.status = CandidateStatus.CONTACTED + self.candidate_mapper.save(candidate) + print(f"[ResumeEvaluationService] 向 {candidate.name} 发送打招呼成功") + + return success + else: + print(f"[ResumeEvaluationService] 爬虫不支持打招呼功能") + return False + + except Exception as e: + print(f"[ResumeEvaluationService] 触发打招呼失败: {e}") + return False + + async def _trigger_notification( + self, + candidate: Candidate, + evaluation: Evaluation, + job: Job + ) -> bool: + """ + 触发HR通知 + + Args: + candidate: 候选人 + evaluation: 评价结果 + job: 职位 + + Returns: + bool: 是否成功 + """ + try: + if not self.notification_service: + print(f"[ResumeEvaluationService] 通知服务未配置") + return False + + # 发送通知 + result = await self.notification_service.notify( + candidate=candidate, + evaluation=evaluation, + title=f"【人才推荐】{candidate.name} - {job.title}", + extra_data={ + "job_id": job.id, + "job_title": job.title, + "score": evaluation.overall_score + } + ) + + if result.success: + # 更新候选人状态为已推送 + if candidate.status != CandidateStatus.CONTACTED: + candidate.status = CandidateStatus.PUSHED + self.candidate_mapper.save(candidate) + print(f"[ResumeEvaluationService] 候选人 {candidate.name} 已推送给HR") + + return result.success + + except Exception as e: + print(f"[ResumeEvaluationService] 触发HR通知失败: {e}") + return False + + def should_process_candidate( + self, + candidate: Candidate, + job: Job + ) -> bool: + """ + 判断是否应该处理该候选人 + + 用于跳过已处理过的候选人 + + Args: + candidate: 候选人 + job: 职位 + + Returns: + bool: 是否应该处理 + """ + # 如果候选人已有评价记录,检查是否需要重新评价 + if candidate.id: + existing = self.evaluation_mapper.find_by_candidate_and_job( + candidate.id, job.id + ) + if existing: + print(f"[ResumeEvaluationService] 候选人 {candidate.name} 已有评价记录,跳过") + return False + + return True + + def get_evaluation_summary(self, candidate_id: str) -> Optional[Dict[str, Any]]: + """ + 获取候选人评价摘要 + + Args: + candidate_id: 候选人ID + + Returns: + 评价摘要字典 + """ + evaluation = self.evaluation_mapper.find_latest_by_candidate_id(candidate_id) + if evaluation: + return evaluation.to_dict() + return None