diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/api.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/api.py index 60531dc..dc18997 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/api.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/api.py @@ -1,73 +1,29 @@ -"""FastAPI routes for HR Agent""" -from typing import List, Optional -from datetime import datetime -from pydantic import BaseModel, Field +""" +FastAPI主应用入口 -from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks +按业务模块拆分路由: +- routes/recruiter.py: 招聘者账号管理 +- routes/scheduler.py: 定时任务管理 +- routes/system.py: 系统接口 +""" +from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from ..domain.candidate import CandidateSource -from ..domain.recruiter import Recruiter, RecruiterStatus -from ..service.recruiter_service import RecruiterService -from ..mapper.recruiter_mapper import RecruiterMapper -from ..config.settings import get_settings +from .routes import recruiter_router, scheduler_router +from .routes.system import router as system_router -# ============== Pydantic Schemas ============== - -class RecruiterCreate(BaseModel): - """创建招聘者账号请求""" - name: str = Field(..., description="账号名称/标识") - source: str = Field(default="boss", description="平台来源: boss, liepin, etc.") - wt_token: str = Field(..., description="WT Token") - - -class RecruiterUpdate(BaseModel): - """更新招聘者账号请求""" - name: Optional[str] = Field(None, description="账号名称") - wt_token: Optional[str] = Field(None, description="WT Token") - status: Optional[str] = Field(None, description="状态: active, inactive, expired") - - -class RecruiterResponse(BaseModel): - """招聘者账号响应""" - id: str - name: str - source: str - wt_token: str - status: str - last_used_at: Optional[datetime] = None - created_at: Optional[datetime] = None - updated_at: Optional[datetime] = None - - class Config: - from_attributes = True - - -class RecruiterListResponse(BaseModel): - """招聘者列表响应""" - total: int - items: List[RecruiterResponse] - - -class APIResponse(BaseModel): - """通用API响应""" - success: bool - message: str - data: Optional[dict] = None - - -# ============== FastAPI App ============== - def create_app() -> FastAPI: """创建FastAPI应用""" app = FastAPI( title="简历智能体 API", description="HR Agent - 多平台简历爬取、AI分析、多渠道通知", - version="0.1.0" + version="0.1.0", + docs_url="/docs", + redoc_url="/redoc" ) - # CORS + # CORS配置 app.add_middleware( CORSMiddleware, allow_origins=["*"], @@ -76,183 +32,15 @@ def create_app() -> FastAPI: allow_headers=["*"], ) - # 依赖注入:获取RecruiterService - def get_recruiter_service(): - settings = get_settings() - mapper = RecruiterMapper(db_url=settings.db_url) - return RecruiterService(mapper=mapper) + # 注册路由 + # 系统路由(根路径、健康检查等) + app.include_router(system_router) - # ============== Recruiter Routes ============== + # 招聘者账号管理路由 + app.include_router(recruiter_router) - @app.get("/") - async def root(): - """API根路径""" - return { - "name": "简历智能体 API", - "version": "0.1.0", - "docs": "/docs" - } - - @app.get("/health") - async def health_check(): - """健康检查""" - return {"status": "healthy"} - - @app.get("/api/recruiters", response_model=RecruiterListResponse) - async def list_recruiters( - source: Optional[str] = None, - service: RecruiterService = Depends(get_recruiter_service) - ): - """ - 获取招聘者账号列表 - - Args: - source: 按平台筛选 (boss, liepin, etc.) - """ - if source: - try: - candidate_source = CandidateSource(source.lower()) - recruiters = service.list_recruiters(candidate_source) - except ValueError: - raise HTTPException(status_code=400, detail=f"Invalid source: {source}") - else: - recruiters = service.list_recruiters() - - items = [ - RecruiterResponse( - id=r.id, - name=r.name, - source=r.source.value, - wt_token=r.wt_token, - status=r.status.value, - last_used_at=r.last_used_at, - created_at=r.created_at, - updated_at=r.updated_at - ) - for r in recruiters - ] - - return RecruiterListResponse(total=len(items), items=items) - - @app.get("/api/recruiters/{recruiter_id}", response_model=RecruiterResponse) - async def get_recruiter( - recruiter_id: str, - service: RecruiterService = Depends(get_recruiter_service) - ): - """获取单个招聘者账号详情""" - recruiter = service.get_recruiter(recruiter_id) - if not recruiter: - raise HTTPException(status_code=404, detail="Recruiter not found") - - return RecruiterResponse( - id=recruiter.id, - name=recruiter.name, - source=recruiter.source.value, - wt_token=recruiter.wt_token, - status=recruiter.status.value, - last_used_at=recruiter.last_used_at, - created_at=recruiter.created_at, - updated_at=recruiter.updated_at - ) - - @app.post("/api/recruiters", response_model=RecruiterResponse) - async def create_recruiter( - data: RecruiterCreate, - service: RecruiterService = Depends(get_recruiter_service) - ): - """创建招聘者账号""" - try: - source = CandidateSource(data.source.lower()) - except ValueError: - raise HTTPException(status_code=400, detail=f"Invalid source: {data.source}") - - recruiter = service.add_recruiter( - name=data.name, - source=source, - wt_token=data.wt_token - ) - - return RecruiterResponse( - id=recruiter.id, - name=recruiter.name, - source=recruiter.source.value, - wt_token=recruiter.wt_token, - status=recruiter.status.value, - last_used_at=recruiter.last_used_at, - created_at=recruiter.created_at, - updated_at=recruiter.updated_at - ) - - @app.put("/api/recruiters/{recruiter_id}", response_model=RecruiterResponse) - async def update_recruiter( - recruiter_id: str, - data: RecruiterUpdate, - service: RecruiterService = Depends(get_recruiter_service) - ): - """更新招聘者账号""" - recruiter = service.get_recruiter(recruiter_id) - if not recruiter: - raise HTTPException(status_code=404, detail="Recruiter not found") - - # 更新字段 - if data.name: - recruiter.name = data.name - if data.wt_token: - recruiter.wt_token = data.wt_token - if data.status: - try: - recruiter.status = RecruiterStatus(data.status.lower()) - except ValueError: - raise HTTPException(status_code=400, detail=f"Invalid status: {data.status}") - - updated = service.mapper.save(recruiter) - - return RecruiterResponse( - id=updated.id, - name=updated.name, - source=updated.source.value, - wt_token=updated.wt_token, - status=updated.status.value, - last_used_at=updated.last_used_at, - created_at=updated.created_at, - updated_at=updated.updated_at - ) - - @app.delete("/api/recruiters/{recruiter_id}") - async def delete_recruiter( - recruiter_id: str, - service: RecruiterService = Depends(get_recruiter_service) - ): - """删除招聘者账号""" - success = service.delete_recruiter(recruiter_id) - if not success: - raise HTTPException(status_code=404, detail="Recruiter not found") - - return APIResponse(success=True, message="Recruiter deleted successfully") - - @app.post("/api/recruiters/{recruiter_id}/activate") - async def activate_recruiter( - recruiter_id: str, - service: RecruiterService = Depends(get_recruiter_service) - ): - """启用招聘者账号""" - success = service.activate_recruiter(recruiter_id) - if not success: - raise HTTPException(status_code=404, detail="Recruiter not found") - - return APIResponse(success=True, message="Recruiter activated") - - @app.post("/api/recruiters/{recruiter_id}/deactivate") - async def deactivate_recruiter( - recruiter_id: str, - service: RecruiterService = Depends(get_recruiter_service) - ): - """停用招聘者账号""" - success = service.deactivate_recruiter(recruiter_id) - if not success: - raise HTTPException(status_code=404, detail="Recruiter not found") - - return APIResponse(success=True, message="Recruiter deactivated") + # 定时任务管理路由 + app.include_router(scheduler_router) return app diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/__init__.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/__init__.py new file mode 100644 index 0000000..6b45f1d --- /dev/null +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/__init__.py @@ -0,0 +1,29 @@ +""" +API路由模块 + +按业务模块拆分的FastAPI路由: +- recruiter: 招聘者账号管理 +- scheduler: 定时任务管理 +- system: 系统接口 +""" + +from .recruiter import router as recruiter_router + +try: + from .scheduler import router as scheduler_router +except ImportError: + # 如果scheduler模块有问题,提供一个空的router + from fastapi import APIRouter + scheduler_router = APIRouter() + +try: + from .system import router as system_router +except ImportError: + from fastapi import APIRouter + system_router = APIRouter() + +__all__ = [ + "recruiter_router", + "scheduler_router", + "system_router" +] diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/recruiter.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/recruiter.py new file mode 100644 index 0000000..263ab4c --- /dev/null +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/recruiter.py @@ -0,0 +1,349 @@ +""" +招聘者账号管理路由 + +提供招聘者账号的CRUD操作和自动注册功能 +""" +from typing import List, Optional +from datetime import datetime + +from fastapi import APIRouter, HTTPException, Depends + +from ..schemas import ( + RecruiterCreate, RecruiterRegister, RecruiterUpdate, + RecruiterResponse, RecruiterListResponse, RecruiterRegisterResponse, + RecruiterPrivilegeInfo, RecruiterSyncInfo, RecruiterSourceInfo, + APIResponse +) +from ...domain.candidate import CandidateSource +from ...domain.recruiter import Recruiter, RecruiterStatus +from ...service.recruiter_service import RecruiterService +from ...service.crawler import BossCrawler +from ...mapper.recruiter_mapper import RecruiterMapper +from ...config.settings import get_settings + + +router = APIRouter(prefix="/api/recruiters", tags=["招聘者账号"]) + + +def get_recruiter_service(): + """依赖注入:获取RecruiterService""" + settings = get_settings() + mapper = RecruiterMapper(db_url=settings.db_url) + return RecruiterService(mapper=mapper) + + +def _build_recruiter_response(recruiter: Recruiter) -> RecruiterResponse: + """构建招聘者账号响应""" + # 构建权益信息 + privilege = None + if recruiter.privilege: + privilege = RecruiterPrivilegeInfo( + vip_level=recruiter.privilege.vip_level, + vip_status=recruiter.privilege.vip_status, + vip_expire_at=recruiter.privilege.vip_expire_at, + resume_view_count=recruiter.privilege.resume_view_count, + resume_view_total=recruiter.privilege.resume_view_total + ) + + # 构建同步信息 + sync_info = None + if recruiter.sync_status: + sync_info = RecruiterSyncInfo( + last_sync_at=recruiter.last_sync_at, + sync_status=recruiter.sync_status.value, + sync_error=recruiter.sync_error + ) + + return RecruiterResponse( + id=recruiter.id, + name=recruiter.name, + source=recruiter.source.value, + wt_token=recruiter.wt_token, + status=recruiter.status.value, + last_used_at=recruiter.last_used_at, + privilege=privilege, + sync_info=sync_info, + created_at=recruiter.created_at, + updated_at=recruiter.updated_at + ) + + +@router.get("/sources", response_model=List[RecruiterSourceInfo]) +async def get_recruiter_sources(): + """获取支持的平台来源列表""" + return [ + RecruiterSourceInfo( + value="boss", + label="Boss直聘", + description="Boss直聘招聘平台" + ), + RecruiterSourceInfo( + value="liepin", + label="猎聘", + description="猎聘招聘平台(预留)" + ), + RecruiterSourceInfo( + value="zhilian", + label="智联招聘", + description="智联招聘平台(预留)" + ) + ] + + +@router.get("", response_model=RecruiterListResponse) +async def list_recruiters( + source: Optional[str] = None, + service: RecruiterService = Depends(get_recruiter_service) +): + """ + 获取招聘者账号列表 + + Args: + source: 按平台筛选 (boss, liepin, etc.) + """ + if source: + try: + candidate_source = CandidateSource(source.lower()) + recruiters = service.list_recruiters(candidate_source) + except ValueError: + raise HTTPException(status_code=400, detail=f"Invalid source: {source}") + else: + recruiters = service.list_recruiters() + + items = [_build_recruiter_response(r) for r in recruiters] + + return RecruiterListResponse(total=len(items), items=items) + + +@router.get("/{recruiter_id}", response_model=RecruiterResponse) +async def get_recruiter( + recruiter_id: str, + service: RecruiterService = Depends(get_recruiter_service) +): + """获取单个招聘者账号详情""" + recruiter = service.get_recruiter(recruiter_id) + if not recruiter: + raise HTTPException(status_code=404, detail="Recruiter not found") + + return _build_recruiter_response(recruiter) + + +@router.post("", response_model=RecruiterResponse) +async def create_recruiter( + data: RecruiterCreate, + service: RecruiterService = Depends(get_recruiter_service) +): + """手动创建招聘者账号""" + try: + source = CandidateSource(data.source.lower()) + except ValueError: + raise HTTPException(status_code=400, detail=f"Invalid source: {data.source}") + + recruiter = service.add_recruiter( + name=data.name, + source=source, + wt_token=data.wt_token + ) + + return _build_recruiter_response(recruiter) + + +@router.post("/register", response_model=RecruiterRegisterResponse) +async def register_recruiter( + data: RecruiterRegister, + service: RecruiterService = Depends(get_recruiter_service) +): + """ + 自动注册招聘者账号 + + 根据Token自动获取账号信息并注册 + """ + # 验证平台来源 + try: + source = CandidateSource(data.source.lower()) + except ValueError: + raise HTTPException(status_code=400, detail=f"Invalid source: {data.source}") + + # 目前仅支持Boss平台 + if source != CandidateSource.BOSS: + raise HTTPException( + status_code=400, + detail="Auto register only supports 'boss' source currently" + ) + + try: + # 1. 使用Token创建临时爬虫获取账号信息 + crawler = BossCrawler(wt_token=data.wt_token) + + # 2. 检查登录状态 + if hasattr(crawler.client, 'check_login_status'): + is_valid = crawler.client.check_login_status() + if not is_valid: + return RecruiterRegisterResponse( + success=False, + message="Token无效或账号已过期,请检查Token是否正确" + ) + + # 3. 获取账号信息 + account_info = None + if hasattr(crawler.client, 'get_account_info'): + account_info = crawler.client.get_account_info() + + if not account_info: + return RecruiterRegisterResponse( + success=False, + message="无法获取账号信息,请检查Token是否有效" + ) + + # 4. 提取账号名称 + account_name = "" + if hasattr(account_info, 'name'): + account_name = account_info.name + elif hasattr(account_info, 'zpData') and hasattr(account_info.zpData, 'name'): + account_name = account_info.zpData.name + elif isinstance(account_info, dict): + account_name = account_info.get('name', '') or \ + account_info.get('zpData', {}).get('name', '') + + if not account_name: + account_name = f"Boss账号_{data.wt_token[:8]}..." + + # 5. 检查是否已存在相同Token的账号 + existing_recruiters = service.list_recruiters(source) + for existing in existing_recruiters: + if existing.wt_token == data.wt_token: + return RecruiterRegisterResponse( + success=False, + message=f"该Token已注册为账号: {existing.name}", + recruiter=_build_recruiter_response(existing) + ) + + # 6. 创建账号 + recruiter = service.add_recruiter( + name=account_name, + source=source, + wt_token=data.wt_token + ) + + # 7. 尝试获取并保存权益信息 + try: + if hasattr(crawler.client, 'get_account_detail'): + privilege_data = crawler.client.get_account_detail() + # 解析权益数据 + from ...job.account_sync_job import AccountSyncJob + sync_job = AccountSyncJob(service) + sync_job._parse_privilege_data(recruiter, privilege_data) + # 保存更新后的权益信息 + service.save_recruiter(recruiter) + except Exception as e: + # 权益信息获取失败不影响注册 + print(f"[Register] 获取权益信息失败: {e}") + + return RecruiterRegisterResponse( + success=True, + message=f"账号注册成功: {account_name}", + recruiter=_build_recruiter_response(recruiter) + ) + + except ImportError as e: + return RecruiterRegisterResponse( + success=False, + message=f"SDK未安装或导入失败: {str(e)}" + ) + except Exception as e: + return RecruiterRegisterResponse( + success=False, + message=f"注册失败: {str(e)}" + ) + + +@router.put("/{recruiter_id}", response_model=RecruiterResponse) +async def update_recruiter( + recruiter_id: str, + data: RecruiterUpdate, + service: RecruiterService = Depends(get_recruiter_service) +): + """更新招聘者账号""" + recruiter = service.get_recruiter(recruiter_id) + if not recruiter: + raise HTTPException(status_code=404, detail="Recruiter not found") + + # 更新字段 + if data.name: + recruiter.name = data.name + if data.wt_token: + recruiter.wt_token = data.wt_token + if data.status: + try: + recruiter.status = RecruiterStatus(data.status.lower()) + except ValueError: + raise HTTPException( + status_code=400, + detail=f"Invalid status: {data.status}" + ) + + updated = service.mapper.save(recruiter) + + return _build_recruiter_response(updated) + + +@router.delete("/{recruiter_id}") +async def delete_recruiter( + recruiter_id: str, + service: RecruiterService = Depends(get_recruiter_service) +): + """删除招聘者账号""" + success = service.delete_recruiter(recruiter_id) + if not success: + raise HTTPException(status_code=404, detail="Recruiter not found") + + return APIResponse(success=True, message="Recruiter deleted successfully") + + +@router.post("/{recruiter_id}/activate") +async def activate_recruiter( + recruiter_id: str, + service: RecruiterService = Depends(get_recruiter_service) +): + """启用招聘者账号""" + success = service.activate_recruiter(recruiter_id) + if not success: + raise HTTPException(status_code=404, detail="Recruiter not found") + + return APIResponse(success=True, message="Recruiter activated") + + +@router.post("/{recruiter_id}/deactivate") +async def deactivate_recruiter( + recruiter_id: str, + service: RecruiterService = Depends(get_recruiter_service) +): + """停用招聘者账号""" + success = service.deactivate_recruiter(recruiter_id) + if not success: + raise HTTPException(status_code=404, detail="Recruiter not found") + + return APIResponse(success=True, message="Recruiter deactivated") + + +@router.post("/{recruiter_id}/sync") +async def sync_recruiter( + recruiter_id: str, + service: RecruiterService = Depends(get_recruiter_service) +): + """ + 手动同步账号信息 + + 触发账号状态、权益信息、职位列表的同步 + """ + recruiter = service.get_recruiter(recruiter_id) + if not recruiter: + raise HTTPException(status_code=404, detail="Recruiter not found") + + # TODO: 触发同步任务 + # 可以调用 AccountSyncJob 来执行同步 + + return APIResponse( + success=True, + message=f"Sync triggered for recruiter: {recruiter.name}" + ) diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/scheduler.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/scheduler.py new file mode 100644 index 0000000..29f47cb --- /dev/null +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/scheduler.py @@ -0,0 +1,227 @@ +""" +定时任务管理路由 + +提供定时任务的查询、控制和配置功能 +""" +from typing import List, Optional + +from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks + +from ..schemas import ( + JobInfo, JobStatusInfo, JobConfigUpdate, + APIResponse +) +from ...service.scheduler import get_scheduler + + +router = APIRouter(prefix="/api/scheduler", tags=["定时任务"]) + + +def get_scheduler_instance(): + """依赖注入:获取调度器实例""" + return get_scheduler() + + +@router.get("/jobs", response_model=List[JobInfo]) +async def list_jobs( + scheduler=Depends(get_scheduler_instance) +): + """获取所有定时任务列表""" + jobs = scheduler.get_jobs() + return [ + JobInfo( + id=job["id"], + name=job["name"], + next_run_time=job.get("next_run_time"), + trigger=job["trigger"], + type=job.get("type") + ) + for job in jobs + ] + + +@router.get("/jobs/status", response_model=List[JobStatusInfo]) +async def get_jobs_status( + scheduler=Depends(get_scheduler_instance) +): + """获取所有Job任务状态""" + status_list = scheduler.get_job_status() + return [ + JobStatusInfo( + job_id=s["job_id"], + name=s["name"], + enabled=s["enabled"], + is_running=s["is_running"], + last_run_time=s.get("last_run_time"), + next_run_time=s.get("next_run_time"), + run_count=s["run_count"], + success_count=s["success_count"], + fail_count=s["fail_count"], + last_error=s.get("last_error") + ) + for s in status_list + ] + + +@router.get("/jobs/{job_id}/status", response_model=JobStatusInfo) +async def get_job_status( + job_id: str, + scheduler=Depends(get_scheduler_instance) +): + """获取指定Job任务状态""" + status_list = scheduler.get_job_status(job_id) + if not status_list: + raise HTTPException(status_code=404, detail="Job not found") + + s = status_list[0] + return JobStatusInfo( + job_id=s["job_id"], + name=s["name"], + enabled=s["enabled"], + is_running=s["is_running"], + last_run_time=s.get("last_run_time"), + next_run_time=s.get("next_run_time"), + run_count=s["run_count"], + success_count=s["success_count"], + fail_count=s["fail_count"], + last_error=s.get("last_error") + ) + + +@router.post("/jobs/{job_id}/run") +async def run_job_now( + job_id: str, + background_tasks: BackgroundTasks, + scheduler=Depends(get_scheduler_instance) +): + """ + 立即执行指定任务 + + Args: + job_id: 任务ID (account_sync 或 resume_process) + """ + # 验证任务是否存在 + valid_jobs = ["account_sync", "resume_process", "crawl_boss", "analyze_pending"] + if job_id not in valid_jobs: + raise HTTPException(status_code=400, detail=f"Invalid job_id. Valid: {valid_jobs}") + + # 在后台执行 + background_tasks.add_task(scheduler.run_job_now, job_id) + + return APIResponse( + success=True, + message=f"Job {job_id} is running in background" + ) + + +@router.post("/jobs/{job_id}/pause") +async def pause_job( + job_id: str, + scheduler=Depends(get_scheduler_instance) +): + """暂停指定任务""" + try: + scheduler.pause_job(job_id) + return APIResponse(success=True, message=f"Job {job_id} paused") + except Exception as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.post("/jobs/{job_id}/resume") +async def resume_job( + job_id: str, + scheduler=Depends(get_scheduler_instance) +): + """恢复指定任务""" + try: + scheduler.resume_job(job_id) + return APIResponse(success=True, message=f"Job {job_id} resumed") + except Exception as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.put("/jobs/{job_id}/config") +async def update_job_config( + job_id: str, + config: JobConfigUpdate, + scheduler=Depends(get_scheduler_instance) +): + """ + 更新任务配置 + + Args: + job_id: 任务ID + config: 配置更新 + """ + # 验证任务ID + valid_job_ids = ["account_sync", "resume_process"] + if job_id not in valid_job_ids: + raise HTTPException( + status_code=400, + detail=f"Can only config Job module tasks: {valid_job_ids}" + ) + + # 构建更新参数 + update_kwargs = {} + if config.enabled is not None: + update_kwargs["enabled"] = config.enabled + if config.interval_minutes is not None: + update_kwargs["interval_minutes"] = config.interval_minutes + + if not update_kwargs: + raise HTTPException(status_code=400, detail="No config to update") + + try: + scheduler.job_scheduler.update_job_config(job_id, **update_kwargs) + return APIResponse( + success=True, + message=f"Job {job_id} config updated", + data=update_kwargs + ) + except Exception as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.get("/status") +async def get_scheduler_status( + scheduler=Depends(get_scheduler_instance) +): + """获取调度器整体状态""" + jobs = scheduler.get_jobs() + job_status = scheduler.get_job_status() + + return { + "running": scheduler._running if hasattr(scheduler, '_running') else False, + "total_jobs": len(jobs), + "job_scheduler_jobs": len([j for j in jobs if j.get("type") == "job"]), + "legacy_jobs": len([j for j in jobs if j.get("type") == "legacy"]), + "job_status_summary": { + "total": len(job_status), + "running": sum(1 for s in job_status if s["is_running"]), + "enabled": sum(1 for s in job_status if s["enabled"]) + } + } + + +@router.post("/start") +async def start_scheduler( + scheduler=Depends(get_scheduler_instance) +): + """启动调度器""" + try: + scheduler.start() + return APIResponse(success=True, message="Scheduler started") + except Exception as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.post("/stop") +async def stop_scheduler( + scheduler=Depends(get_scheduler_instance) +): + """停止调度器""" + try: + scheduler.stop() + return APIResponse(success=True, message="Scheduler stopped") + except Exception as e: + raise HTTPException(status_code=400, detail=str(e)) diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/system.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/system.py new file mode 100644 index 0000000..c830ca3 --- /dev/null +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/routes/system.py @@ -0,0 +1,53 @@ +""" +系统管理路由 + +提供系统级别的接口:健康检查、根路径等 +""" +from fastapi import APIRouter + +from ..schemas import APIResponse + + +router = APIRouter(tags=["系统"]) + + +@router.get("/") +async def root(): + """API根路径""" + return { + "name": "简历智能体 API", + "version": "0.1.0", + "docs": "/docs", + "endpoints": { + "recruiters": "/api/recruiters", + "scheduler": "/api/scheduler", + "health": "/health" + } + } + + +@router.get("/health") +async def health_check(): + """健康检查""" + return { + "status": "healthy", + "service": "hr-agent-api", + "version": "0.1.0" + } + + +@router.get("/api/status") +async def api_status(): + """获取API状态信息""" + return APIResponse( + success=True, + message="API is running", + data={ + "version": "0.1.0", + "features": [ + "recruiter_management", + "scheduler_management", + "auto_register" + ] + } + ) diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/schemas.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/schemas.py new file mode 100644 index 0000000..2420dfc --- /dev/null +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/controller/schemas.py @@ -0,0 +1,168 @@ +""" +API共享Schema定义 + +集中定义所有API请求和响应的数据模型 +""" +from typing import List, Optional, Any +from datetime import datetime +from pydantic import BaseModel, Field + + +# ============== 通用响应 ============== + +class APIResponse(BaseModel): + """通用API响应""" + success: bool + message: str + data: Optional[dict] = None + + +class PaginationParams(BaseModel): + """分页参数""" + page: int = Field(default=1, ge=1, description="页码") + page_size: int = Field(default=20, ge=1, le=100, description="每页数量") + + +# ============== 招聘者账号 ============== + +class RecruiterBase(BaseModel): + """招聘者账号基础信息""" + name: str = Field(..., description="账号名称/标识") + source: str = Field(default="boss", description="平台来源: boss, liepin, etc.") + + +class RecruiterCreate(RecruiterBase): + """创建招聘者账号请求""" + wt_token: str = Field(..., description="WT Token") + + +class RecruiterRegister(BaseModel): + """自动注册招聘者账号请求""" + source: str = Field(default="boss", description="平台来源: boss") + wt_token: str = Field(..., description="WT Token") + + +class RecruiterUpdate(BaseModel): + """更新招聘者账号请求""" + name: Optional[str] = Field(None, description="账号名称") + wt_token: Optional[str] = Field(None, description="WT Token") + status: Optional[str] = Field(None, description="状态: active, inactive, expired") + + +class RecruiterPrivilegeInfo(BaseModel): + """招聘者账号权益信息""" + vip_level: Optional[str] = Field(None, description="VIP等级") + vip_status: Optional[str] = Field(None, description="VIP状态") + vip_expire_at: Optional[datetime] = Field(None, description="VIP过期时间") + resume_view_count: int = Field(default=0, description="剩余简历查看次数") + resume_view_total: int = Field(default=0, description="总简历查看次数") + + +class RecruiterSyncInfo(BaseModel): + """招聘者账号同步信息""" + last_sync_at: Optional[datetime] = Field(None, description="最后同步时间") + sync_status: Optional[str] = Field(None, description="同步状态: success, failed, pending") + sync_error: Optional[str] = Field(None, description="同步错误信息") + + +class RecruiterResponse(BaseModel): + """招聘者账号响应""" + id: str + name: str + source: str + wt_token: str + status: str + last_used_at: Optional[datetime] = None + privilege: Optional[RecruiterPrivilegeInfo] = None + sync_info: Optional[RecruiterSyncInfo] = None + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + + class Config: + from_attributes = True + + +class RecruiterListResponse(BaseModel): + """招聘者列表响应""" + total: int + items: List[RecruiterResponse] + + +class RecruiterRegisterResponse(BaseModel): + """自动注册招聘者账号响应""" + success: bool + message: str + recruiter: Optional[RecruiterResponse] = None + + +class RecruiterSourceInfo(BaseModel): + """招聘者平台来源信息""" + value: str = Field(..., description="来源值") + label: str = Field(..., description="显示名称") + description: Optional[str] = Field(None, description="描述") + + +# ============== 定时任务 ============== + +class JobInfo(BaseModel): + """任务信息""" + id: str + name: str + next_run_time: Optional[str] = None + trigger: str + type: Optional[str] = Field(None, description="任务类型: legacy, job") + + +class JobStatusInfo(BaseModel): + """任务状态信息""" + job_id: str + name: str + enabled: bool + is_running: bool + last_run_time: Optional[str] = None + next_run_time: Optional[str] = None + run_count: int = 0 + success_count: int = 0 + fail_count: int = 0 + last_error: Optional[str] = None + + +class JobConfigUpdate(BaseModel): + """任务配置更新请求""" + enabled: Optional[bool] = None + interval_minutes: Optional[int] = Field(None, ge=1, description="执行间隔(分钟)") + + +# ============== 候选人(预留) ============== + +class CandidateResponse(BaseModel): + """候选人响应(预留)""" + id: str + name: str + source: str + status: str + created_at: Optional[datetime] = None + + +class CandidateListResponse(BaseModel): + """候选人列表响应(预留)""" + total: int + items: List[CandidateResponse] + + +# ============== 职位(预留) ============== + +class JobPositionResponse(BaseModel): + """职位响应(预留)""" + id: str + title: str + source: str + status: str + recruiter_id: Optional[str] = None + created_at: Optional[datetime] = None + + +class JobPositionListResponse(BaseModel): + """职位列表响应(预留)""" + total: int + items: List[JobPositionResponse] diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/account_sync_job.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/account_sync_job.py index 6e913f6..bc08891 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/account_sync_job.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/job/account_sync_job.py @@ -27,6 +27,7 @@ class AccountSyncResult: self.success = success self.message = message self.jobs_synced = 0 + self.jobs_deleted = 0 self.timestamp = datetime.now() @@ -130,8 +131,9 @@ class AccountSyncJob: # 3. 同步职位列表 try: - jobs_count = await self._sync_jobs(recruiter) - result.jobs_synced = jobs_count + jobs_synced, jobs_deleted = await self._sync_jobs(recruiter) + result.jobs_synced = jobs_synced + result.jobs_deleted = jobs_deleted except Exception as e: print(f"[{datetime.now()}] 同步账号 {recruiter.name} 职位列表失败: {e}") @@ -140,7 +142,7 @@ class AccountSyncJob: self.recruiter_service.save_recruiter(recruiter) result.success = True - result.message = f"同步成功,职位数: {result.jobs_synced}" + result.message = f"同步成功,新增/更新职位: {result.jobs_synced}, 删除职位: {result.jobs_deleted}" print(f"[{datetime.now()}] 账号 {recruiter.name} 同步完成: {result.message}") return result @@ -270,40 +272,91 @@ class AccountSyncJob: except Exception as e: print(f"[{datetime.now()}] 解析权益数据失败: {e}") - async def _sync_jobs(self, recruiter: Recruiter) -> int: + async def _sync_jobs(self, recruiter: Recruiter) -> tuple: """ 同步职位列表 + 策略: + 1. 获取API返回的最新职位列表 + 2. 保存/更新这些职位 + 3. 删除该账号下不在最新列表中的职位 + Args: recruiter: 招聘者账号 Returns: - int: 同步的职位数量 + tuple: (同步的职位数量, 删除的职位数量) """ try: crawler = self.recruiter_service.create_crawler_for_recruiter(recruiter) if not crawler: - return 0 + return 0, 0 - # 获取职位列表 - jobs = crawler.get_jobs() + # 获取API返回的最新职位列表 + current_jobs = crawler.get_jobs() - if not jobs: - print(f"[{datetime.now()}] 账号 {recruiter.name} 没有正在招聘的职位") - return 0 + # 获取当前时间戳 + sync_time = datetime.now() - # 关联recruiter_id并保存职位 - for job in jobs: - job.recruiter_id = recruiter.id - job.source = recruiter.source - job.last_sync_at = datetime.now() + # 记录当前有效的职位source_id集合 + current_source_ids = set() + + # 保存/更新职位 + synced_count = 0 + if current_jobs: + for job in current_jobs: + job.recruiter_id = recruiter.id + job.source = recruiter.source + job.last_sync_at = sync_time + job.status = JobStatus.ACTIVE # 确保状态为活跃 + + # 保存或更新职位 + saved_job = self.recruiter_service.save_job(job) + current_source_ids.add(job.source_id) + synced_count += 1 - # 保存或更新职位 - self.recruiter_service.save_job(job) + print(f"[{datetime.now()}] 账号 {recruiter.name} 同步了 {synced_count} 个职位") + else: + print(f"[{datetime.now()}] 账号 {recruiter.name} 没有正在招聘的职位") - print(f"[{datetime.now()}] 账号 {recruiter.name} 同步了 {len(jobs)} 个职位") - return len(jobs) + # 删除该账号下不在当前列表中的职位 + deleted_count = await self._delete_obsolete_jobs(recruiter, current_source_ids) + + return synced_count, deleted_count except Exception as e: print(f"[{datetime.now()}] 同步账号 {recruiter.name} 职位列表失败: {e}") + return 0, 0 + + async def _delete_obsolete_jobs(self, recruiter: Recruiter, valid_source_ids: set) -> int: + """ + 删除过期的职位(不在有效列表中的职位) + + Args: + recruiter: 招聘者账号 + valid_source_ids: 当前有效的职位source_id集合 + + Returns: + int: 删除的职位数量 + """ + try: + # 获取该账号数据库中所有的职位 + db_jobs = self.recruiter_service.find_jobs_by_recruiter(recruiter.id) + + deleted_count = 0 + for job in db_jobs: + # 如果职位不在当前有效列表中,则删除 + if job.source_id not in valid_source_ids: + success = self.recruiter_service.delete_job(job.id) + if success: + deleted_count += 1 + print(f"[{datetime.now()}] 删除过期职位: {job.title} (ID: {job.source_id})") + + if deleted_count > 0: + print(f"[{datetime.now()}] 账号 {recruiter.name} 删除了 {deleted_count} 个过期职位") + + return deleted_count + + except Exception as e: + print(f"[{datetime.now()}] 删除过期职位失败: {e}") return 0 diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/job_mapper.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/job_mapper.py new file mode 100644 index 0000000..9b4f103 --- /dev/null +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/job_mapper.py @@ -0,0 +1,271 @@ +"""Job data mapper using SQLAlchemy""" +from typing import List, Optional +from datetime import datetime +import uuid + +from sqlalchemy import select, update, delete +from sqlalchemy.orm import Session + +from ..domain.job import Job, JobStatus, JobRequirement +from ..domain.candidate import CandidateSource +from ..config.database import get_db_manager, JobModel + + +class JobMapper: + """职位数据访问 - SQLAlchemy实现""" + + def __init__(self, db_url: Optional[str] = None): + self.db_manager = get_db_manager(db_url) + # 确保表存在 + self.db_manager.create_tables() + + def _get_session(self) -> Session: + """获取数据库会话""" + return self.db_manager.get_session() + + def _model_to_entity(self, model: JobModel) -> Job: + """将模型转换为实体""" + # 处理 source 的大小写 + source_value = model.source.lower() if model.source else "boss" + # 处理 status 的大小写 + status_value = model.status.lower() if model.status else "active" + + # 解析requirements JSON + requirements = None + if model.requirements: + import json + try: + req_data = json.loads(model.requirements) if isinstance(model.requirements, str) else model.requirements + requirements = JobRequirement(**req_data) + except: + requirements = JobRequirement() + + return Job( + id=model.id, + source=CandidateSource(source_value), + source_id=model.source_id, + recruiter_id=model.recruiter_id, + title=model.title, + department=model.department, + location=model.location, + salary_min=model.salary_min, + salary_max=model.salary_max, + requirements=requirements, + description=model.description, + status=JobStatus(status_value), + candidate_count=model.candidate_count or 0, + new_candidate_count=model.new_candidate_count or 0, + last_sync_at=model.last_sync_at, + created_at=model.created_at, + updated_at=model.updated_at + ) + + def _entity_to_model(self, entity: Job) -> JobModel: + """将实体转换为模型""" + import json + + requirements_json = None + if entity.requirements: + requirements_json = json.dumps({ + "min_work_years": entity.requirements.min_work_years, + "max_work_years": entity.requirements.max_work_years, + "education": entity.requirements.education, + "skills": entity.requirements.skills, + "description": entity.requirements.description + }) + + return JobModel( + id=entity.id or str(uuid.uuid4()), + source=entity.source.value, + source_id=entity.source_id, + recruiter_id=entity.recruiter_id, + title=entity.title, + department=entity.department, + location=entity.location, + salary_min=entity.salary_min, + salary_max=entity.salary_max, + requirements=requirements_json, + description=entity.description, + status=entity.status.value, + candidate_count=entity.candidate_count, + new_candidate_count=entity.new_candidate_count, + last_sync_at=entity.last_sync_at + ) + + def save(self, job: Job) -> Job: + """保存职位信息(插入或更新)""" + session = self._get_session() + try: + # 检查是否已存在 + existing = self.find_by_source_id(job.source, job.source_id) + + if existing: + # 更新现有记录 + stmt = ( + update(JobModel) + .where(JobModel.id == existing.id) + .values( + title=job.title, + department=job.department, + location=job.location, + salary_min=job.salary_min, + salary_max=job.salary_max, + requirements=self._entity_to_model(job).requirements, + description=job.description, + status=job.status.value, + recruiter_id=job.recruiter_id, + candidate_count=job.candidate_count, + new_candidate_count=job.new_candidate_count, + last_sync_at=job.last_sync_at or datetime.now(), + updated_at=datetime.now() + ) + ) + session.execute(stmt) + job.id = existing.id + else: + # 插入新记录 + job.id = str(uuid.uuid4()) + model = self._entity_to_model(job) + session.add(model) + + session.commit() + return job + finally: + session.close() + + def find_by_id(self, job_id: str) -> Optional[Job]: + """根据ID查询职位""" + session = self._get_session() + try: + result = session.execute( + select(JobModel).where(JobModel.id == job_id) + ) + model = result.scalar_one_or_none() + return self._model_to_entity(model) if model else None + finally: + session.close() + + def find_by_source_id(self, source: CandidateSource, source_id: str) -> Optional[Job]: + """根据平台来源和源ID查询职位""" + session = self._get_session() + try: + result = session.execute( + select(JobModel) + .where(JobModel.source == source.value) + .where(JobModel.source_id == source_id) + ) + model = result.scalar_one_or_none() + return self._model_to_entity(model) if model else None + finally: + session.close() + + def find_by_recruiter(self, recruiter_id: str) -> List[Job]: + """根据招聘者ID查询职位列表""" + session = self._get_session() + try: + result = session.execute( + select(JobModel) + .where(JobModel.recruiter_id == recruiter_id) + .order_by(JobModel.last_sync_at.desc()) + ) + models = result.scalars().all() + return [self._model_to_entity(m) for m in models] + finally: + session.close() + + def find_by_source(self, source: CandidateSource) -> List[Job]: + """根据平台来源查询职位列表""" + session = self._get_session() + try: + result = session.execute( + select(JobModel) + .where(JobModel.source == source.value) + .order_by(JobModel.last_sync_at.desc()) + ) + models = result.scalars().all() + return [self._model_to_entity(m) for m in models] + finally: + session.close() + + def find_active_by_recruiter(self, recruiter_id: str) -> List[Job]: + """查询指定招聘者的活跃职位""" + session = self._get_session() + try: + result = session.execute( + select(JobModel) + .where(JobModel.recruiter_id == recruiter_id) + .where(JobModel.status == 'active') + .order_by(JobModel.last_sync_at.desc()) + ) + models = result.scalars().all() + return [self._model_to_entity(m) for m in models] + finally: + session.close() + + def find_all(self) -> List[Job]: + """查询所有职位""" + session = self._get_session() + try: + result = session.execute( + select(JobModel).order_by(JobModel.last_sync_at.desc()) + ) + models = result.scalars().all() + return [self._model_to_entity(m) for m in models] + finally: + session.close() + + def update_status(self, job_id: str, status: JobStatus) -> bool: + """更新职位状态""" + session = self._get_session() + try: + stmt = ( + update(JobModel) + .where(JobModel.id == job_id) + .values(status=status.value, updated_at=datetime.now()) + ) + result = session.execute(stmt) + session.commit() + return result.rowcount > 0 + finally: + session.close() + + def update_candidate_count(self, job_id: str, count: int, new_count: int = 0) -> bool: + """更新职位候选人数量""" + session = self._get_session() + try: + stmt = ( + update(JobModel) + .where(JobModel.id == job_id) + .values( + candidate_count=count, + new_candidate_count=new_count, + updated_at=datetime.now() + ) + ) + result = session.execute(stmt) + session.commit() + return result.rowcount > 0 + finally: + session.close() + + def delete(self, job_id: str) -> bool: + """删除职位""" + session = self._get_session() + try: + stmt = delete(JobModel).where(JobModel.id == job_id) + result = session.execute(stmt) + session.commit() + return result.rowcount > 0 + finally: + session.close() + + def delete_by_recruiter(self, recruiter_id: str) -> int: + """删除指定招聘者的所有职位""" + session = self._get_session() + try: + stmt = delete(JobModel).where(JobModel.recruiter_id == recruiter_id) + result = session.execute(stmt) + session.commit() + return result.rowcount + finally: + session.close() diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/recruiter_service.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/recruiter_service.py index 384d9d2..7a5c011 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/recruiter_service.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/recruiter_service.py @@ -138,23 +138,59 @@ class RecruiterService: """ 获取指定招聘者的职位列表 - TODO: 需要实现JobMapper - 目前返回空列表,后续需要添加Job数据访问层 + Returns: + List[Job]: 职位列表 """ - # 暂时返回空列表,后续需要实现JobMapper - # from ..mapper.job_mapper import JobMapper - # return JobMapper().find_by_recruiter(recruiter_id) - return [] + from ..mapper.job_mapper import JobMapper + return JobMapper(self.mapper.db_manager.db_url).find_by_recruiter(recruiter_id) + + def find_active_jobs_by_recruiter(self, recruiter_id: str) -> List[Job]: + """ + 获取指定招聘者的活跃职位列表 + + Returns: + List[Job]: 活跃职位列表 + """ + from ..mapper.job_mapper import JobMapper + return JobMapper(self.mapper.db_manager.db_url).find_active_by_recruiter(recruiter_id) def save_job(self, job: Job) -> Job: """ 保存职位信息 - TODO: 需要实现JobMapper - 目前仅打印日志,后续需要添加Job数据访问层 + Args: + job: 职位实体 + + Returns: + Job: 保存后的职位 """ - # 暂时仅打印日志,后续需要实现JobMapper - # from ..mapper.job_mapper import JobMapper - # return JobMapper().save(job) - print(f"[Job] 保存职位: {job.title} (ID: {job.source_id})") - return job + from ..mapper.job_mapper import JobMapper + saved = JobMapper(self.mapper.db_manager.db_url).save(job) + print(f"[Job] 职位已保存: {saved.title} (ID: {saved.id})") + return saved + + def delete_job(self, job_id: str) -> bool: + """ + 删除职位 + + Args: + job_id: 职位ID + + Returns: + bool: 是否删除成功 + """ + from ..mapper.job_mapper import JobMapper + return JobMapper(self.mapper.db_manager.db_url).delete(job_id) + + def delete_jobs_by_recruiter(self, recruiter_id: str) -> int: + """ + 删除指定招聘者的所有职位 + + Args: + recruiter_id: 招聘者ID + + Returns: + int: 删除的职位数量 + """ + from ..mapper.job_mapper import JobMapper + return JobMapper(self.mapper.db_manager.db_url).delete_by_recruiter(recruiter_id)