feat(repository): 添加候选人和简历数据访问层并集成入库服务

- 新增 CandidateMapper 实现候选人数据的增删改查功能,基于 SQLAlchemy
- 新增 ResumeMapper 实现简历数据的增删改查功能,基于 SQLAlchemy
- 在 UnifiedIngestionService 中集成了 candidate_repo 和 resume_repo
- 初始化 DeduplicationService 时注入候选人仓库作为依赖
- 统一入库服务保存流程中为简历生成唯一ID
- 优化数据库会话管理,确保资源正确释放
This commit is contained in:
2026-03-24 16:06:06 +08:00
parent c411b4848f
commit 9be9d338ae
4 changed files with 302 additions and 1 deletions

View File

@@ -122,10 +122,22 @@ class HRAgentApplication:
def _init_ingestion_service(self):
"""初始化入库服务"""
from cn.yinlihupo.ylhp_hr_2_0.mapper.candidate_mapper import CandidateMapper
from cn.yinlihupo.ylhp_hr_2_0.mapper.resume_mapper import ResumeMapper
# 初始化数据访问层
candidate_repo = CandidateMapper(db_url=self.settings.db_url)
resume_repo = ResumeMapper(db_url=self.settings.db_url)
# 去重服务使用数据库 repository
deduplicator = DeduplicationService(candidate_repository=candidate_repo)
self.ingestion_service = UnifiedIngestionService(
candidate_repo=candidate_repo,
resume_repo=resume_repo,
normalizer=DataNormalizer(),
validator=DataValidator(),
deduplicator=DeduplicationService(),
deduplicator=deduplicator,
on_analysis_triggered=self._on_analysis_triggered
)

View File

@@ -0,0 +1,165 @@
"""Candidate data mapper using SQLAlchemy"""
from typing import Optional
from datetime import datetime
from sqlalchemy import select
from sqlalchemy.orm import Session
from ..domain.candidate import Candidate, CandidateSource, CandidateStatus
from ..config.database import get_db_manager, CandidateModel
class CandidateMapper:
"""候选人数据访问 - SQLAlchemy实现"""
def __init__(self, db_url: Optional[str] = None):
self.db_manager = get_db_manager(db_url)
def _get_session(self) -> Session:
"""获取数据库会话"""
return self.db_manager.get_session()
def _model_to_entity(self, model: CandidateModel) -> Candidate:
"""将模型转换为实体"""
from decimal import Decimal
from ..domain.candidate import SalaryRange
salary_expectation = None
if model.salary_min is not None or model.salary_max is not None:
salary_expectation = SalaryRange(
min_salary=model.salary_min,
max_salary=model.salary_max
)
return Candidate(
id=model.id,
source=CandidateSource(model.source.lower()),
source_id=model.source_id,
name=model.name,
phone=model.phone,
email=model.email,
wechat=model.wechat,
gender=model.gender,
age=model.age,
location=model.location,
current_company=model.current_company,
current_position=model.current_position,
work_years=Decimal(str(model.work_years)) if model.work_years else None,
education=model.education,
school=model.school,
salary_expectation=salary_expectation,
status=CandidateStatus(model.status.lower()) if model.status else CandidateStatus.NEW,
created_at=model.created_at,
updated_at=model.updated_at
)
def _entity_to_model(self, entity: Candidate) -> CandidateModel:
"""将实体转换为模型"""
return CandidateModel(
id=entity.id,
source=entity.source.value if entity.source else 'boss',
source_id=entity.source_id,
name=entity.name,
phone=entity.phone,
email=entity.email,
wechat=entity.wechat,
gender=entity.gender.value if entity.gender else 0,
age=entity.age,
location=entity.location,
current_company=entity.current_company,
current_position=entity.current_position,
work_years=entity.work_years,
education=entity.education,
school=entity.school,
salary_min=entity.salary_expectation.min_salary if entity.salary_expectation else None,
salary_max=entity.salary_expectation.max_salary if entity.salary_expectation else None,
status=entity.status.value if entity.status else 'new'
)
def save(self, candidate: Candidate) -> Candidate:
"""保存候选人"""
session = self._get_session()
try:
# 检查是否已存在
existing = session.execute(
select(CandidateModel).where(CandidateModel.id == candidate.id)
).scalar_one_or_none()
if existing:
# 更新现有记录
existing.name = candidate.name
existing.phone = candidate.phone
existing.email = candidate.email
existing.wechat = candidate.wechat
existing.gender = candidate.gender.value if candidate.gender else 0
existing.age = candidate.age
existing.location = candidate.location
existing.current_company = candidate.current_company
existing.current_position = candidate.current_position
existing.work_years = candidate.work_years
existing.education = candidate.education
existing.school = candidate.school
existing.salary_min = candidate.salary_expectation.min_salary if candidate.salary_expectation else None
existing.salary_max = candidate.salary_expectation.max_salary if candidate.salary_expectation else None
existing.status = candidate.status.value if candidate.status else 'new'
existing.updated_at = datetime.now()
else:
# 插入新记录
model = self._entity_to_model(candidate)
session.add(model)
session.commit()
return candidate
finally:
session.close()
def find_by_source_and_source_id(self, source: CandidateSource, source_id: str) -> Optional[Candidate]:
"""根据来源和来源ID查找"""
session = self._get_session()
try:
result = session.execute(
select(CandidateModel).where(
CandidateModel.source == source.value.upper(),
CandidateModel.source_id == source_id
)
).scalar_one_or_none()
return self._model_to_entity(result) if result else None
finally:
session.close()
def find_by_phone(self, phone: str) -> Optional[Candidate]:
"""根据手机号查找"""
session = self._get_session()
try:
result = session.execute(
select(CandidateModel).where(CandidateModel.phone == phone)
).scalar_one_or_none()
return self._model_to_entity(result) if result else None
finally:
session.close()
def find_by_email(self, email: str) -> Optional[Candidate]:
"""根据邮箱查找"""
session = self._get_session()
try:
result = session.execute(
select(CandidateModel).where(CandidateModel.email == email)
).scalar_one_or_none()
return self._model_to_entity(result) if result else None
finally:
session.close()
def find_by_name(self, name: str) -> list:
"""根据姓名查找"""
session = self._get_session()
try:
results = session.execute(
select(CandidateModel).where(CandidateModel.name == name)
).scalars().all()
return [self._model_to_entity(r) for r in results]
finally:
session.close()

View File

@@ -0,0 +1,123 @@
"""Resume data mapper using SQLAlchemy"""
from typing import Optional
from datetime import datetime
from sqlalchemy import select
from sqlalchemy.orm import Session
from ..domain.resume import Resume, ResumeParsed
from ..config.database import get_db_manager, ResumeModel
class ResumeMapper:
"""简历数据访问 - SQLAlchemy实现"""
def __init__(self, db_url: Optional[str] = None):
self.db_manager = get_db_manager(db_url)
def _get_session(self) -> Session:
"""获取数据库会话"""
return self.db_manager.get_session()
def _model_to_entity(self, model: ResumeModel) -> Resume:
"""将模型转换为实体"""
parsed_content = None
if model.parsed_content:
parsed_content = ResumeParsed(**model.parsed_content)
return Resume(
id=model.id,
candidate_id=model.candidate_id,
raw_content=model.raw_content,
parsed_content=parsed_content,
attachment_url=model.attachment_url,
attachment_type=model.attachment_type,
version=model.version,
created_at=model.created_at,
updated_at=model.updated_at
)
def _entity_to_model(self, entity: Resume) -> ResumeModel:
"""将实体转换为模型"""
parsed_dict = None
if entity.parsed_content:
parsed_dict = {
'name': entity.parsed_content.name,
'phone': entity.parsed_content.phone,
'email': entity.parsed_content.email,
'gender': entity.parsed_content.gender,
'age': entity.parsed_content.age,
'location': entity.parsed_content.location,
'current_company': entity.parsed_content.current_company,
'current_position': entity.parsed_content.current_position,
'work_years': entity.parsed_content.work_years,
'education': entity.parsed_content.education,
'school': entity.parsed_content.school,
'skills': entity.parsed_content.skills,
'self_evaluation': entity.parsed_content.self_evaluation,
'work_experiences': entity.parsed_content.work_experiences,
'project_experiences': entity.parsed_content.project_experiences,
'education_experiences': entity.parsed_content.education_experiences,
'raw_data': entity.parsed_content.raw_data
}
return ResumeModel(
id=entity.id,
candidate_id=entity.candidate_id,
raw_content=entity.raw_content,
parsed_content=parsed_dict,
attachment_url=entity.attachment_url,
attachment_type=entity.attachment_type,
version=entity.version or 1
)
def save(self, resume: Resume) -> Resume:
"""保存简历"""
session = self._get_session()
try:
# 检查是否已存在
existing = session.execute(
select(ResumeModel).where(ResumeModel.id == resume.id)
).scalar_one_or_none()
if existing:
# 更新现有记录
existing.raw_content = resume.raw_content
existing.parsed_content = self._entity_to_model(resume).parsed_content
existing.attachment_url = resume.attachment_url
existing.attachment_type = resume.attachment_type
existing.version = resume.version or 1
existing.updated_at = datetime.now()
else:
# 插入新记录
model = self._entity_to_model(resume)
session.add(model)
session.commit()
return resume
finally:
session.close()
def find_by_candidate_id(self, candidate_id: str) -> Optional[Resume]:
"""根据候选人ID查找简历"""
session = self._get_session()
try:
result = session.execute(
select(ResumeModel).where(ResumeModel.candidate_id == candidate_id)
).scalar_one_or_none()
return self._model_to_entity(result) if result else None
finally:
session.close()
def find_by_id(self, resume_id: str) -> Optional[Resume]:
"""根据ID查找简历"""
session = self._get_session()
try:
result = session.execute(
select(ResumeModel).where(ResumeModel.id == resume_id)
).scalar_one_or_none()
return self._model_to_entity(result) if result else None
finally:
session.close()

View File

@@ -151,6 +151,7 @@ class UnifiedIngestionService:
# 4. 生成ID
candidate_id = self._generate_id()
normalized.candidate.id = candidate_id
normalized.resume.id = self._generate_id()
normalized.resume.candidate_id = candidate_id
# 5. 保存候选人