上传文件至「/」

求职者人物画像
This commit is contained in:
2026-04-02 11:40:54 +08:00
commit 131e6295e7
5 changed files with 526 additions and 0 deletions

305
batcg_extract.py Normal file
View File

@@ -0,0 +1,305 @@
import json
import time
import os
import logging
from pathlib import Path
from typing import Dict, Any, List
import requests
from tqdm import tqdm # 进度条库,提升批量处理体验
# ===================== 1. 全局配置(请根据实际情况修改) =====================
class Config:
# API配置替换为你的实际密钥/模型)
API_KEY = "你的通义千问API_KEY" # 若用其他模型需同步修改call_llm_api函数
API_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions"
MODEL = "qwen-turbo" # 轻量模型,性价比最高
TEMPERATURE = 0.0 # 0=确定性输出,降低幻觉
MAX_TOKENS = 2000 # 控制输出长度,降低成本
# 目录配置
INPUT_DIR = "./raw_candidate_files" # 原始评审文件目录(需提前创建)
OUTPUT_DIR = "./extracted_json_results" # 抽取结果保存目录(自动创建)
ERROR_LOG_PATH = "./batch_extract_error.log" # 错误日志文件
# 文件过滤配置
SUPPORTED_EXTENSIONS = [".txt", ".json"] # 仅处理这些后缀的文件
SKIP_EXISTED = True # 跳过已生成JSON的文件避免重复调用API
# 重试配置
MAX_RETRY = 3 # API调用最大重试次数
RETRY_INTERVAL = 1 # 重试间隔(秒)
# ===================== 2. 日志初始化(记录错误) =====================
def init_logger():
"""初始化错误日志,记录处理失败的文件及原因"""
logging.basicConfig(
level=logging.ERROR,
format="%(asctime)s - %(filename)s - %(message)s",
handlers=[
logging.FileHandler(Config.ERROR_LOG_PATH, encoding="utf-8"),
logging.StreamHandler() # 同时输出到控制台
]
)
return logging.getLogger(__name__)
logger = init_logger()
# ===================== 3. 工具函数 =====================
def check_and_create_dirs():
"""检查并创建输入/输出目录"""
Path(Config.INPUT_DIR).mkdir(exist_ok=True)
Path(Config.OUTPUT_DIR).mkdir(exist_ok=True)
logger.info(f"目录检查完成:输入目录={Config.INPUT_DIR},输出目录={Config.OUTPUT_DIR}")
def read_file_content(file_path: str) -> str:
"""读取文件内容,处理编码问题"""
try:
with open(file_path, "r", encoding="utf-8") as f:
return f.read().strip()
except UnicodeDecodeError:
# 兼容GBK编码的文件
with open(file_path, "r", encoding="gbk") as f:
return f.read().strip()
except Exception as e:
raise Exception(f"读取文件失败:{e}")
def build_prompt(raw_text: str) -> str:
"""构建强约束的抽取提示词(核心!保证格式对齐)"""
# 固定模板与你要求的JSON结构100%一致)
template = """
{
"基础信息": {
"最终结果": "Invite/Reject",
"分数": "数字字符串",
"面试岗位": "字符串",
"技术画像": "核心技术特征总结100字内",
"邀请解读": "评审结论总结100字内"
},
"8维度信息": {
"潜力洞察": {
"得分": "数字字符串",
"核心亮点": "字符串",
"短板": "字符串(无则填无)",
"核心依据": "字符串"
},
"叙事逻辑": {
"得分": "数字字符串",
"核心亮点": "字符串",
"短板": "字符串(无则填无)",
"核心依据": "字符串"
},
"软件工艺": {
"得分": "数字字符串",
"核心亮点": "字符串",
"短板": "字符串(无则填无)",
"核心依据": "字符串",
"面试题": "字符串(无则省略该字段)"
},
"编程能力": {
"得分": "数字字符串",
"核心亮点": "字符串",
"短板": "字符串(无则填无)",
"核心依据": "字符串"
},
"综合网络": {
"得分": "数字字符串",
"核心亮点": "字符串",
"短板": "字符串(无则填无)",
"核心依据": "字符串"
},
"电子电路": {
"得分": "数字字符串",
"核心亮点": "字符串",
"短板": "字符串(无则填无)",
"核心依据": "字符串"
},
"操作系统": {
"得分": "数字字符串",
"核心亮点": "字符串",
"短板": "字符串(无则填无)",
"核心依据": "字符串"
},
"算法能力": {
"得分": "数字字符串",
"核心亮点": "字符串",
"短板": "字符串(无则填无)",
"核心依据": "字符串",
"面试题": "字符串(无则省略该字段)"
}
}
}
"""
# 强制约束规则
constraints = """
【强制规则】
1. 输出必须是可直接解析的JSON格式不允许任何多余文字如“以下是抽取结果
2. 字段层级、命名必须与模板100%一致,禁止新增/删减字段、修改字段名;
3. 得分字段必须是数字字符串(如"7.8"),禁止纯数字或其他格式;
4. 核心亮点/短板/核心依据需精简提炼,禁止大段复制原文;
5. 无对应内容的字段填“无”,面试题无则省略该字段。
"""
prompt = f"""
请你严格按照以下模板和规则从下方原始文本中抽取信息并生成JSON
【模板】
{template}
【规则】
{constraints}
【原始文本】
{raw_text}
【输出要求】
仅输出JSON字符串无任何其他内容
"""
return prompt
def call_llm_api(prompt: str) -> str:
"""调用大模型API带重试机制"""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {Config.API_KEY}"
}
data = {
"model": Config.MODEL,
"messages": [{"role": "user", "content": prompt}],
"temperature": Config.TEMPERATURE,
"max_tokens": Config.MAX_TOKENS
}
for retry in range(Config.MAX_RETRY):
try:
response = requests.post(Config.API_URL, headers=headers, json=data, timeout=30)
response.raise_for_status() # 触发HTTP错误
result = response.json()
return result["choices"][0]["message"]["content"].strip()
except Exception as e:
logger.error(f"API调用重试{retry+1}/{Config.MAX_RETRY}失败:{e}")
time.sleep(Config.RETRY_INTERVAL)
raise Exception(f"API调用{Config.MAX_RETRY}次均失败")
def validate_json(raw_json: str) -> Dict[str, Any]:
"""校验JSON结构确保核心字段不缺失"""
try:
data = json.loads(raw_json)
except json.JSONDecodeError as e:
raise ValueError(f"JSON解析失败{e}")
# 校验基础信息核心字段
base_fields = ["最终结果", "分数", "面试岗位", "技术画像", "邀请解读"]
if "基础信息" not in data:
raise ValueError("缺失顶级字段:基础信息")
for field in base_fields:
if field not in data["基础信息"]:
raise ValueError(f"基础信息缺失字段:{field}")
# 校验8维度核心字段
dimensions = ["潜力洞察", "叙事逻辑", "软件工艺", "编程能力", "综合网络", "电子电路", "操作系统", "算法能力"]
dimension_fields = ["得分", "核心亮点", "短板", "核心依据"]
if "8维度信息" not in data:
raise ValueError("缺失顶级字段8维度信息")
for dim in dimensions:
if dim not in data["8维度信息"]:
raise ValueError(f"8维度信息缺失维度{dim}")
for field in dimension_fields:
if field not in data["8维度信息"][dim]:
raise ValueError(f"{dim}缺失字段:{field}")
return data
def process_single_file(file_path: Path) -> bool:
"""处理单个文件读取→调用API→校验→保存"""
# 生成输出文件名(原文件名+json后缀
output_filename = f"{file_path.stem}.json"
output_path = Path(Config.OUTPUT_DIR) / output_filename
# 跳过已处理的文件
if Config.SKIP_EXISTED and output_path.exists():
logger.info(f"跳过已处理文件:{file_path.name}")
return True
try:
# 1. 读取文件内容
raw_text = read_file_content(str(file_path))
if not raw_text:
raise ValueError("文件内容为空")
# 2. 构建提示词
prompt = build_prompt(raw_text)
# 3. 调用API抽取
json_str = call_llm_api(prompt)
# 4. 校验JSON结构
valid_data = validate_json(json_str)
# 5. 保存结果
with open(output_path, "w", encoding="utf-8") as f:
json.dump(valid_data, f, ensure_ascii=False, indent=4)
return True
except Exception as e:
# 记录错误日志
logger.error(f"处理文件{file_path.name}失败:{str(e)}")
return False
# ===================== 4. 批量处理主函数 =====================
def batch_extract():
"""批量处理目录下的所有文件"""
# 1. 检查目录
check_and_create_dirs()
# 2. 获取所有待处理文件
input_files = []
for file in Path(Config.INPUT_DIR).glob("*"):
if file.is_file() and file.suffix in Config.SUPPORTED_EXTENSIONS:
input_files.append(file)
if not input_files:
logger.error(f"输入目录{Config.INPUT_DIR}下无支持的文件(支持后缀:{Config.SUPPORTED_EXTENSIONS}")
return
# 3. 批量处理(带进度条)
success_count = 0
fail_count = 0
with tqdm(total=len(input_files), desc="批量抽取进度") as pbar:
for file in input_files:
if process_single_file(file):
success_count += 1
else:
fail_count += 1
pbar.update(1)
# 4. 输出汇总结果
print("\n" + "="*50)
print(f"批量处理完成!")
print(f"总文件数:{len(input_files)}")
print(f"成功数:{success_count}")
print(f"失败数:{fail_count}")
print(f"失败日志:{Config.ERROR_LOG_PATH}")
print(f"结果目录:{Config.OUTPUT_DIR}")
print("="*50)
# ===================== 5. 适配其他大模型的扩展(可选) =====================
# 若使用OpenAI/GPT
# def call_llm_api(prompt: str) -> str:
# headers = {"Content-Type": "application/json", "Authorization": f"Bearer {Config.API_KEY}"}
# data = {
# "model": "gpt-3.5-turbo",
# "messages": [{"role": "user", "content": prompt}],
# "temperature": 0.0
# }
# response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=data)
# return response.json()["choices"][0]["message"]["content"]
# ===================== 6. 主入口 =====================
if __name__ == "__main__":
# 安装依赖(首次运行需执行)
# pip install requests tqdm
# 执行批量抽取
batch_extract()

View File

@@ -0,0 +1,57 @@
import PyPDF2
import os
from pathlib import Path
def specified_pdf_to_txt(pdf_file_path: str, save_dir: str):
"""
规则文件名与原PDF完全一致 | 已存在则跳过不重复转换
:param pdf_file_path: 待转换的PDF完整路径
:param save_dir: TXT文件保存目录
"""
# 1. 校验PDF文件是否存在
pdf_path = Path(pdf_file_path)
if not pdf_path.exists() or pdf_path.suffix.lower() != ".pdf":
print(f"错误文件不存在或不是PDF格式 → {pdf_file_path}")
return False
# 2. 创建保存目录(不存在则自动创建)
os.makedirs(save_dir, exist_ok=True)
# 3. 生成目标TXT路径**保留原PDF文件名仅改后缀**
txt_filename = pdf_path.stem + ".txt"
txt_save_path = Path(save_dir) / txt_filename
# 4. 关键已存在同名TXT → 跳过,不重复转换
if txt_save_path.exists():
print(f"跳过:{txt_filename} 已存在,无需重复转换")
return True
# 5. 执行PDF转TXT
try:
with open(pdf_path, "rb") as f:
reader = PyPDF2.PdfReader(f)
full_text = ""
for page in reader.pages:
page_text = page.extract_text()
if page_text:
full_text += page_text + "\n\n"
# 写入TXT到指定目录
with open(txt_save_path, "w", encoding="utf-8") as f:
f.write(full_text)
print(f"✅ 转换成功:{pdf_path.name}{txt_save_path}")
return True
except Exception as e:
print(f"❌ 转换失败:{str(e)}")
return False
if __name__ == "__main__":
# 待转换的【指定PDF文件完整路径】
TARGET_PDF = "./前后端/陈盼良简历(2).pdf"
# TXT文件【指定保存目录】
SAVE_DIRECTORY = "./output/前后端"
# 执行转换
specified_pdf_to_txt(TARGET_PDF, SAVE_DIRECTORY)

View File

@@ -0,0 +1,121 @@
import pdfplumber
import os
import re
from pathlib import Path
# ===================== 终极清洗配置(全平台防伪码通杀) =====================
# 终极正则:匹配 任意长度 字母/数字/下划线 组合 + 结尾~~ 的所有无意义字符
FAKE_CODE_PATTERN = re.compile(r'[A-Za-z0-9_]{30,}~~')
# ==========================================================================
def clean_resume_text(raw_text: str) -> str:
"""
终极文本清洗:彻底清除所有平台防伪码、隐形水印、重复乱码
1. 通杀所有 长随机字符+~~ 的防伪码
2. 清理多余空行、空格,保留简历正常格式
"""
# 第一步:全局清除所有匹配的防伪乱码(核心根治)
cleaned_text = FAKE_CODE_PATTERN.sub('', raw_text)
# 第二步清理多余空行≥3个换行→2个换行保留段落
cleaned_text = re.sub(r'\n{3,}', '\n\n', cleaned_text)
# 第三步:清理行首行尾空格、多余空格
cleaned_text = re.sub(r'^[ ]+|[ ]+$', '', cleaned_text, flags=re.MULTILINE)
cleaned_text = re.sub(r' {2,}', ' ', cleaned_text)
return cleaned_text.strip()
def single_pdf_to_txt(pdf_file_path: Path, save_dir: Path) -> int:
"""单文件PDF转TXT + 清洗乱码"""
if not pdf_file_path.exists() or pdf_file_path.suffix.lower() != ".pdf":
print(f"❌ 无效文件:{pdf_file_path.name}")
return -1
# 同名保存,不修改文件名
txt_filename = pdf_file_path.stem + ".txt"
txt_save_path = save_dir / txt_filename
# 已存在则跳过,不重复转换
if txt_save_path.exists():
print(f"⏭️ 已存在,跳过:{txt_filename}")
return 0
try:
full_text = ""
with pdfplumber.open(pdf_file_path) as pdf:
for page in pdf.pages:
# 提取正文,过滤页边隐藏内容
page_text = page.extract_text()
if page_text:
full_text += page_text + "\n\n"
# 终极清洗
cleaned_text = clean_resume_text(full_text)
# 写入干净TXT
with open(txt_save_path, "w", encoding="utf-8") as f:
f.write(cleaned_text)
print(f"✅ 转换+清洗完成:{pdf_file_path.name}")
return 1
except Exception as e:
print(f"❌ 失败:{pdf_file_path.name} | {str(e)}")
return -1
def batch_pdf_to_txt(pdf_input_dir: str, txt_save_dir: str, recursive: bool = True):
"""批量PDF转TXT主函数"""
input_path = Path(pdf_input_dir)
save_path = Path(txt_save_dir)
if not input_path.exists():
print(f"❌ 输入目录不存在:{input_path}")
return
os.makedirs(save_path, exist_ok=True)
# 扫描所有PDF
if recursive:
pdf_files = list(input_path.rglob("*.[pP][dD][fF]"))
else:
pdf_files = list(input_path.glob("*.[pP][dD][fF]"))
if not pdf_files:
print("⚠️ 未找到任何PDF文件")
return
print(f"📄 共找到 {len(pdf_files)} 个PDF开始处理...\n")
# 统计结果
success = 0
skip = 0
fail = 0
for pdf in pdf_files:
res = single_pdf_to_txt(pdf, save_path)
if res == 1:
success +=1
elif res ==0:
skip +=1
else:
fail +=1
# 输出结果
print("\n" + "="*50)
print("🎉 批量处理完成")
print(f"总文件:{len(pdf_files)}")
print(f"✅ 成功:{success}")
print(f"⏭️ 跳过:{skip}")
print(f"❌ 失败:{fail}")
print(f"📂 输出目录:{save_path.absolute()}")
print("="*50)
if __name__ == "__main__":
# ========== 只需改这3个配置 ==========
PDF_INPUT_DIR = "./简历" # 你的PDF文件夹
TXT_SAVE_DIR = "./清晰后的简历" # 清洗后的TXT保存目录
IS_RECURSIVE = True # 是否递归子文件夹
# ====================================
batch_pdf_to_txt(PDF_INPUT_DIR, TXT_SAVE_DIR, IS_RECURSIVE)

16
cluster_wordcloud_plot.py Normal file
View File

@@ -0,0 +1,16 @@
from wordcloud import WordCloud
import pandas as pd
df = pd.read_csv("clustering_result.csv")
# 合并所有技术画像文本
text = " ".join(df["技术画像"].dropna().tolist())
# 生成词云
wc = WordCloud(
font_path="simhei.ttf", # 中文字体
background_color="white",
width=1200, height=600,
max_words=100
).generate(text)
wc.to_file("赛道关键词词云图.png")

View File

@@ -0,0 +1,27 @@
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
# 读取你的聚类结果CSV
df = pd.read_csv("./clustering_result.csv")
# 统计各赛道人数
cluster_count = df["技术赛道标签"].value_counts()
# 绘图设置
plt.rcParams["font.sans-serif"] = ["SimHei"]
plt.figure(figsize=(10,7))
colors = ["#FF6B6B","#4ECDC4","#45B7D1","#96CEB4","#FECA57","#DDA0DD"]
# 绘制饼图
wedges, texts, autotexts = plt.pie(
cluster_count.values,
labels=cluster_count.index,
colors=colors,
autopct="%1.1f%%",
startangle=90,
textprops={"fontsize":10}
)
plt.title("简历聚类-技术赛道人数分布", fontsize=16, pad=20)
plt.savefig("聚类赛道分布饼图.png", dpi=300, bbox_inches="tight")
plt.show()