diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/main.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/main.py index 4fceb50..fba6969 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/main.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/main.py @@ -122,10 +122,22 @@ class HRAgentApplication: def _init_ingestion_service(self): """初始化入库服务""" + from cn.yinlihupo.ylhp_hr_2_0.mapper.candidate_mapper import CandidateMapper + from cn.yinlihupo.ylhp_hr_2_0.mapper.resume_mapper import ResumeMapper + + # 初始化数据访问层 + candidate_repo = CandidateMapper(db_url=self.settings.db_url) + resume_repo = ResumeMapper(db_url=self.settings.db_url) + + # 去重服务使用数据库 repository + deduplicator = DeduplicationService(candidate_repository=candidate_repo) + self.ingestion_service = UnifiedIngestionService( + candidate_repo=candidate_repo, + resume_repo=resume_repo, normalizer=DataNormalizer(), validator=DataValidator(), - deduplicator=DeduplicationService(), + deduplicator=deduplicator, on_analysis_triggered=self._on_analysis_triggered ) diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/candidate_mapper.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/candidate_mapper.py new file mode 100644 index 0000000..8fee880 --- /dev/null +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/candidate_mapper.py @@ -0,0 +1,165 @@ +"""Candidate data mapper using SQLAlchemy""" +from typing import Optional +from datetime import datetime + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from ..domain.candidate import Candidate, CandidateSource, CandidateStatus +from ..config.database import get_db_manager, CandidateModel + + +class CandidateMapper: + """候选人数据访问 - SQLAlchemy实现""" + + def __init__(self, db_url: Optional[str] = None): + self.db_manager = get_db_manager(db_url) + + def _get_session(self) -> Session: + """获取数据库会话""" + return self.db_manager.get_session() + + def _model_to_entity(self, model: CandidateModel) -> Candidate: + """将模型转换为实体""" + from decimal import Decimal + from ..domain.candidate import SalaryRange + + salary_expectation = None + if model.salary_min is not None or model.salary_max is not None: + salary_expectation = SalaryRange( + min_salary=model.salary_min, + max_salary=model.salary_max + ) + + return Candidate( + id=model.id, + source=CandidateSource(model.source.lower()), + source_id=model.source_id, + name=model.name, + phone=model.phone, + email=model.email, + wechat=model.wechat, + gender=model.gender, + age=model.age, + location=model.location, + current_company=model.current_company, + current_position=model.current_position, + work_years=Decimal(str(model.work_years)) if model.work_years else None, + education=model.education, + school=model.school, + salary_expectation=salary_expectation, + status=CandidateStatus(model.status.lower()) if model.status else CandidateStatus.NEW, + created_at=model.created_at, + updated_at=model.updated_at + ) + + def _entity_to_model(self, entity: Candidate) -> CandidateModel: + """将实体转换为模型""" + return CandidateModel( + id=entity.id, + source=entity.source.value if entity.source else 'boss', + source_id=entity.source_id, + name=entity.name, + phone=entity.phone, + email=entity.email, + wechat=entity.wechat, + gender=entity.gender.value if entity.gender else 0, + age=entity.age, + location=entity.location, + current_company=entity.current_company, + current_position=entity.current_position, + work_years=entity.work_years, + education=entity.education, + school=entity.school, + salary_min=entity.salary_expectation.min_salary if entity.salary_expectation else None, + salary_max=entity.salary_expectation.max_salary if entity.salary_expectation else None, + status=entity.status.value if entity.status else 'new' + ) + + def save(self, candidate: Candidate) -> Candidate: + """保存候选人""" + session = self._get_session() + try: + # 检查是否已存在 + existing = session.execute( + select(CandidateModel).where(CandidateModel.id == candidate.id) + ).scalar_one_or_none() + + if existing: + # 更新现有记录 + existing.name = candidate.name + existing.phone = candidate.phone + existing.email = candidate.email + existing.wechat = candidate.wechat + existing.gender = candidate.gender.value if candidate.gender else 0 + existing.age = candidate.age + existing.location = candidate.location + existing.current_company = candidate.current_company + existing.current_position = candidate.current_position + existing.work_years = candidate.work_years + existing.education = candidate.education + existing.school = candidate.school + existing.salary_min = candidate.salary_expectation.min_salary if candidate.salary_expectation else None + existing.salary_max = candidate.salary_expectation.max_salary if candidate.salary_expectation else None + existing.status = candidate.status.value if candidate.status else 'new' + existing.updated_at = datetime.now() + else: + # 插入新记录 + model = self._entity_to_model(candidate) + session.add(model) + + session.commit() + return candidate + finally: + session.close() + + def find_by_source_and_source_id(self, source: CandidateSource, source_id: str) -> Optional[Candidate]: + """根据来源和来源ID查找""" + session = self._get_session() + try: + result = session.execute( + select(CandidateModel).where( + CandidateModel.source == source.value.upper(), + CandidateModel.source_id == source_id + ) + ).scalar_one_or_none() + + return self._model_to_entity(result) if result else None + finally: + session.close() + + def find_by_phone(self, phone: str) -> Optional[Candidate]: + """根据手机号查找""" + session = self._get_session() + try: + result = session.execute( + select(CandidateModel).where(CandidateModel.phone == phone) + ).scalar_one_or_none() + + return self._model_to_entity(result) if result else None + finally: + session.close() + + def find_by_email(self, email: str) -> Optional[Candidate]: + """根据邮箱查找""" + session = self._get_session() + try: + result = session.execute( + select(CandidateModel).where(CandidateModel.email == email) + ).scalar_one_or_none() + + return self._model_to_entity(result) if result else None + finally: + session.close() + + def find_by_name(self, name: str) -> list: + """根据姓名查找""" + session = self._get_session() + try: + results = session.execute( + select(CandidateModel).where(CandidateModel.name == name) + ).scalars().all() + + return [self._model_to_entity(r) for r in results] + finally: + session.close() diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/resume_mapper.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/resume_mapper.py new file mode 100644 index 0000000..e0adc11 --- /dev/null +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/mapper/resume_mapper.py @@ -0,0 +1,123 @@ +"""Resume data mapper using SQLAlchemy""" +from typing import Optional +from datetime import datetime + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from ..domain.resume import Resume, ResumeParsed +from ..config.database import get_db_manager, ResumeModel + + +class ResumeMapper: + """简历数据访问 - SQLAlchemy实现""" + + def __init__(self, db_url: Optional[str] = None): + self.db_manager = get_db_manager(db_url) + + def _get_session(self) -> Session: + """获取数据库会话""" + return self.db_manager.get_session() + + def _model_to_entity(self, model: ResumeModel) -> Resume: + """将模型转换为实体""" + parsed_content = None + if model.parsed_content: + parsed_content = ResumeParsed(**model.parsed_content) + + return Resume( + id=model.id, + candidate_id=model.candidate_id, + raw_content=model.raw_content, + parsed_content=parsed_content, + attachment_url=model.attachment_url, + attachment_type=model.attachment_type, + version=model.version, + created_at=model.created_at, + updated_at=model.updated_at + ) + + def _entity_to_model(self, entity: Resume) -> ResumeModel: + """将实体转换为模型""" + parsed_dict = None + if entity.parsed_content: + parsed_dict = { + 'name': entity.parsed_content.name, + 'phone': entity.parsed_content.phone, + 'email': entity.parsed_content.email, + 'gender': entity.parsed_content.gender, + 'age': entity.parsed_content.age, + 'location': entity.parsed_content.location, + 'current_company': entity.parsed_content.current_company, + 'current_position': entity.parsed_content.current_position, + 'work_years': entity.parsed_content.work_years, + 'education': entity.parsed_content.education, + 'school': entity.parsed_content.school, + 'skills': entity.parsed_content.skills, + 'self_evaluation': entity.parsed_content.self_evaluation, + 'work_experiences': entity.parsed_content.work_experiences, + 'project_experiences': entity.parsed_content.project_experiences, + 'education_experiences': entity.parsed_content.education_experiences, + 'raw_data': entity.parsed_content.raw_data + } + + return ResumeModel( + id=entity.id, + candidate_id=entity.candidate_id, + raw_content=entity.raw_content, + parsed_content=parsed_dict, + attachment_url=entity.attachment_url, + attachment_type=entity.attachment_type, + version=entity.version or 1 + ) + + def save(self, resume: Resume) -> Resume: + """保存简历""" + session = self._get_session() + try: + # 检查是否已存在 + existing = session.execute( + select(ResumeModel).where(ResumeModel.id == resume.id) + ).scalar_one_or_none() + + if existing: + # 更新现有记录 + existing.raw_content = resume.raw_content + existing.parsed_content = self._entity_to_model(resume).parsed_content + existing.attachment_url = resume.attachment_url + existing.attachment_type = resume.attachment_type + existing.version = resume.version or 1 + existing.updated_at = datetime.now() + else: + # 插入新记录 + model = self._entity_to_model(resume) + session.add(model) + + session.commit() + return resume + finally: + session.close() + + def find_by_candidate_id(self, candidate_id: str) -> Optional[Resume]: + """根据候选人ID查找简历""" + session = self._get_session() + try: + result = session.execute( + select(ResumeModel).where(ResumeModel.candidate_id == candidate_id) + ).scalar_one_or_none() + + return self._model_to_entity(result) if result else None + finally: + session.close() + + def find_by_id(self, resume_id: str) -> Optional[Resume]: + """根据ID查找简历""" + session = self._get_session() + try: + result = session.execute( + select(ResumeModel).where(ResumeModel.id == resume_id) + ).scalar_one_or_none() + + return self._model_to_entity(result) if result else None + finally: + session.close() diff --git a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/ingestion/unified_ingestion_service.py b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/ingestion/unified_ingestion_service.py index 6462e54..cd0e18f 100644 --- a/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/ingestion/unified_ingestion_service.py +++ b/src/main/python/cn/yinlihupo/ylhp_hr_2_0/service/ingestion/unified_ingestion_service.py @@ -151,6 +151,7 @@ class UnifiedIngestionService: # 4. 生成ID candidate_id = self._generate_id() normalized.candidate.id = candidate_id + normalized.resume.id = self._generate_id() normalized.resume.candidate_id = candidate_id # 5. 保存候选人