# ==== batcg_extract.py ====
import json
import time
import os
import logging
from pathlib import Path
from typing import Dict, Any, List
import requests
from tqdm import tqdm  # progress-bar library for a nicer batch-processing experience

# ===================== 1. Global configuration (edit to match your environment) =====================
class Config:
    """Central configuration for the batch extraction pipeline."""

    # API settings (replace with your real key; adapt call_llm_api for other models)
    API_KEY = "你的通义千问API_KEY"
    API_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions"
    MODEL = "qwen-turbo"        # lightweight model, best cost/performance ratio
    TEMPERATURE = 0.0           # 0 = deterministic output, reduces hallucination
    MAX_TOKENS = 2000           # cap output length to control cost

    # Directory settings
    INPUT_DIR = "./raw_candidate_files"        # raw review files (created if missing)
    OUTPUT_DIR = "./extracted_json_results"    # extraction results (created automatically)
    ERROR_LOG_PATH = "./batch_extract_error.log"  # error log file

    # File filtering
    SUPPORTED_EXTENSIONS = [".txt", ".json"]   # only these suffixes are processed
    SKIP_EXISTED = True                        # skip files whose JSON already exists (avoid duplicate API calls)

    # Retry settings
    MAX_RETRY = 3          # maximum API retry attempts
    RETRY_INTERVAL = 1     # seconds between retries

# ===================== 2. Logger initialisation =====================
def init_logger():
    """Initialise logging: errors go to the log file, info and above to the console.

    FIX: the original configured the root level as logging.ERROR, which silently
    discarded every logger.info(...) call made elsewhere in this script (directory
    checks, skip notices).  The logger now accepts INFO records; the error-log file
    is filtered to ERROR and above so its contract is unchanged, while the console
    handler shows INFO and above.
    """
    file_handler = logging.FileHandler(Config.ERROR_LOG_PATH, encoding="utf-8")
    file_handler.setLevel(logging.ERROR)       # error log file: errors only
    console_handler = logging.StreamHandler()  # mirror to the console
    console_handler.setLevel(logging.INFO)
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(filename)s - %(message)s",
        handlers=[file_handler, console_handler],
    )
    return logging.getLogger(__name__)

logger = init_logger()
# ===================== 3. Utility functions =====================
def check_and_create_dirs():
    """Ensure the input and output directories exist, creating them if needed."""
    # parents=True so a missing intermediate directory does not raise FileNotFoundError
    Path(Config.INPUT_DIR).mkdir(parents=True, exist_ok=True)
    Path(Config.OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
    logger.info(f"目录检查完成:输入目录={Config.INPUT_DIR},输出目录={Config.OUTPUT_DIR}")

def read_file_content(file_path: str) -> str:
    """Read a text file, falling back from UTF-8 to GBK for legacy encodings.

    :param file_path: path of the file to read
    :return: file content with surrounding whitespace stripped
    :raises RuntimeError: if the file cannot be read (chained to the root cause)
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read().strip()
    except UnicodeDecodeError:
        # compatibility with GBK-encoded files
        with open(file_path, "r", encoding="gbk") as f:
            return f.read().strip()
    except Exception as e:
        # chain the original exception so the root cause stays visible in logs
        raise RuntimeError(f"读取文件失败:{e}") from e

def build_prompt(raw_text: str) -> str:
    """Build the strongly-constrained extraction prompt (the key to stable formatting).

    The embedded template and rules are runtime strings sent to the model and
    must match the downstream validate_json() expectations exactly.
    """
    # fixed template, 100% aligned with the required JSON structure
    template = """
{
  "基础信息": {
    "最终结果": "Invite/Reject",
    "分数": "数字字符串",
    "面试岗位": "字符串",
    "技术画像": "核心技术特征总结(100字内)",
    "邀请解读": "评审结论总结(100字内)"
  },
  "8维度信息": {
    "潜力洞察": {
      "得分": "数字字符串",
      "核心亮点": "字符串",
      "短板": "字符串(无则填无)",
      "核心依据": "字符串"
    },
    "叙事逻辑": {
      "得分": "数字字符串",
      "核心亮点": "字符串",
      "短板": "字符串(无则填无)",
      "核心依据": "字符串"
    },
    "软件工艺": {
      "得分": "数字字符串",
      "核心亮点": "字符串",
      "短板": "字符串(无则填无)",
      "核心依据": "字符串",
      "面试题": "字符串(无则省略该字段)"
    },
    "编程能力": {
      "得分": "数字字符串",
      "核心亮点": "字符串",
      "短板": "字符串(无则填无)",
      "核心依据": "字符串"
    },
    "综合网络": {
      "得分": "数字字符串",
      "核心亮点": "字符串",
      "短板": "字符串(无则填无)",
      "核心依据": "字符串"
    },
    "电子电路": {
      "得分": "数字字符串",
      "核心亮点": "字符串",
      "短板": "字符串(无则填无)",
      "核心依据": "字符串"
    },
    "操作系统": {
      "得分": "数字字符串",
      "核心亮点": "字符串",
      "短板": "字符串(无则填无)",
      "核心依据": "字符串"
    },
    "算法能力": {
      "得分": "数字字符串",
      "核心亮点": "字符串",
      "短板": "字符串(无则填无)",
      "核心依据": "字符串",
      "面试题": "字符串(无则省略该字段)"
    }
  }
}
    """

    # mandatory output rules
    constraints = """
【强制规则】
1. 输出必须是可直接解析的JSON格式,不允许任何多余文字(如“以下是抽取结果:”);
2. 字段层级、命名必须与模板100%一致,禁止新增/删减字段、修改字段名;
3. 得分字段必须是数字字符串(如"7.8"),禁止纯数字或其他格式;
4. 核心亮点/短板/核心依据需精简提炼,禁止大段复制原文;
5. 无对应内容的字段填“无”,面试题无则省略该字段。
    """

    prompt = f"""
请你严格按照以下模板和规则,从下方原始文本中抽取信息并生成JSON:

【模板】
{template}

【规则】
{constraints}

【原始文本】
{raw_text}

【输出要求】
仅输出JSON字符串,无任何其他内容!
    """
    return prompt

def call_llm_api(prompt: str) -> str:
    """Call the LLM chat-completions API with a retry loop.

    FIX: the original slept even after the final failed attempt; the retry
    sleep is now skipped on the last round, and the final exception is
    chained to the last underlying error.

    :raises Exception: when every attempt fails
    """
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {Config.API_KEY}"
    }
    data = {
        "model": Config.MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": Config.TEMPERATURE,
        "max_tokens": Config.MAX_TOKENS
    }

    last_error = None
    for retry in range(Config.MAX_RETRY):
        try:
            response = requests.post(Config.API_URL, headers=headers, json=data, timeout=30)
            response.raise_for_status()  # turn HTTP 4xx/5xx into exceptions
            result = response.json()
            return result["choices"][0]["message"]["content"].strip()
        except Exception as e:
            last_error = e
            logger.error(f"API调用重试{retry+1}/{Config.MAX_RETRY}失败:{e}")
            if retry < Config.MAX_RETRY - 1:  # no pointless sleep after the last attempt
                time.sleep(Config.RETRY_INTERVAL)
    raise Exception(f"API调用{Config.MAX_RETRY}次均失败") from last_error

def validate_json(raw_json: str) -> Dict[str, Any]:
    """Parse raw_json and verify the extracted structure is complete.

    :param raw_json: JSON string returned by the model
    :return: the parsed dict when all required fields are present
    :raises ValueError: on parse failure or any missing required field
    """
    try:
        data = json.loads(raw_json)
    except json.JSONDecodeError as e:
        raise ValueError(f"JSON解析失败:{e}") from e

    # check the basic-info section
    base_fields = ["最终结果", "分数", "面试岗位", "技术画像", "邀请解读"]
    if "基础信息" not in data:
        raise ValueError("缺失顶级字段:基础信息")
    for field in base_fields:
        if field not in data["基础信息"]:
            raise ValueError(f"基础信息缺失字段:{field}")

    # check the eight-dimension section
    dimensions = ["潜力洞察", "叙事逻辑", "软件工艺", "编程能力", "综合网络", "电子电路", "操作系统", "算法能力"]
    dimension_fields = ["得分", "核心亮点", "短板", "核心依据"]
    if "8维度信息" not in data:
        raise ValueError("缺失顶级字段:8维度信息")
    for dim in dimensions:
        if dim not in data["8维度信息"]:
            raise ValueError(f"8维度信息缺失维度:{dim}")
        for field in dimension_fields:
            if field not in data["8维度信息"][dim]:
                raise ValueError(f"{dim}缺失字段:{field}")

    return data

def process_single_file(file_path: Path) -> bool:
    """Process one file end to end: read → call API → validate → save.

    :return: True on success (or when the file was skipped), False on failure
    """
    # output file keeps the original stem, with a .json suffix
    output_filename = f"{file_path.stem}.json"
    output_path = Path(Config.OUTPUT_DIR) / output_filename

    # skip files that were already processed
    if Config.SKIP_EXISTED and output_path.exists():
        logger.info(f"跳过已处理文件:{file_path.name}")
        return True

    try:
        # 1. read the raw text
        raw_text = read_file_content(str(file_path))
        if not raw_text:
            raise ValueError("文件内容为空")

        # 2. build the prompt
        prompt = build_prompt(raw_text)

        # 3. call the LLM for extraction
        json_str = call_llm_api(prompt)

        # 4. validate the JSON structure
        valid_data = validate_json(json_str)

        # 5. save the result
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(valid_data, f, ensure_ascii=False, indent=4)

        return True

    except Exception as e:
        # log the failure and let the batch continue with the next file
        logger.error(f"处理文件{file_path.name}失败:{str(e)}")
        return False

# ===================== 4. Batch-processing main function =====================
def batch_extract():
    """Process every supported file in the input directory, with a progress bar."""
    # 1. make sure the directories exist
    check_and_create_dirs()

    # 2. collect candidate files
    # FIX: suffix comparison is now case-insensitive, so ".TXT"/".Json" files
    # are no longer silently ignored.
    input_files = [
        file for file in Path(Config.INPUT_DIR).glob("*")
        if file.is_file() and file.suffix.lower() in Config.SUPPORTED_EXTENSIONS
    ]

    if not input_files:
        logger.error(f"输入目录{Config.INPUT_DIR}下无支持的文件(支持后缀:{Config.SUPPORTED_EXTENSIONS})")
        return

    # 3. process every file, tracking success/failure counts
    success_count = 0
    fail_count = 0
    with tqdm(total=len(input_files), desc="批量抽取进度") as pbar:
        for file in input_files:
            if process_single_file(file):
                success_count += 1
            else:
                fail_count += 1
            pbar.update(1)

    # 4. print the summary
    print("\n" + "="*50)
    print(f"批量处理完成!")
    print(f"总文件数:{len(input_files)}")
    print(f"成功数:{success_count}")
    print(f"失败数:{fail_count}")
    print(f"失败日志:{Config.ERROR_LOG_PATH}")
    print(f"结果目录:{Config.OUTPUT_DIR}")
    print("="*50)

# ===================== 5. Adapter for other LLM providers (optional) =====================
# For OpenAI/GPT:
# def call_llm_api(prompt: str) -> str:
#     headers = {"Content-Type": "application/json", "Authorization": f"Bearer {Config.API_KEY}"}
#     data = {
#         "model": "gpt-3.5-turbo",
#         "messages": [{"role": "user", "content": prompt}],
#         "temperature": 0.0
#     }
#     response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=data)
#     return response.json()["choices"][0]["message"]["content"]
# ===================== 6. Main entry point =====================
if __name__ == "__main__":
    # first run: install dependencies with
    # pip install requests tqdm
    batch_extract()


# ==== batch_pdf_resume_to_txt.py ====
import PyPDF2
import os
from pathlib import Path

def specified_pdf_to_txt(pdf_file_path: str, save_dir: str):
    """
    Convert one specific PDF to a TXT file.

    Rules: the TXT keeps exactly the same base name as the PDF; if a TXT with
    that name already exists the conversion is skipped.

    :param pdf_file_path: full path of the PDF to convert
    :param save_dir: directory where the TXT file is saved
    """
    # 1. validate that the file exists and is a PDF
    pdf_path = Path(pdf_file_path)
    if not pdf_path.exists() or pdf_path.suffix.lower() != ".pdf":
        print(f"错误:文件不存在或不是PDF格式 → {pdf_file_path}")
        return False

    # 2. create the target directory if it does not exist
    os.makedirs(save_dir, exist_ok=True)

    # 3. build the TXT path (same stem, only the suffix changes)
    txt_filename = pdf_path.stem + ".txt"
    txt_save_path = Path(save_dir) / txt_filename

    # 4. skip when a TXT with the same name already exists
    if txt_save_path.exists():
        print(f"跳过:{txt_filename} 已存在,无需重复转换")
        return True

    # 5. run the PDF → TXT conversion
    try:
        with open(pdf_path, "rb") as f:
            reader = PyPDF2.PdfReader(f)
            full_text = ""
            for page in reader.pages:
                page_text = page.extract_text()
                if page_text:
                    full_text += page_text + "\n\n"

        # write the TXT into the target directory
        with open(txt_save_path, "w", encoding="utf-8") as f:
            f.write(full_text)

        print(f"✅ 转换成功:{pdf_path.name} → {txt_save_path}")
        return True

    except Exception as e:
        print(f"❌ 转换失败:{str(e)}")
        return False

if __name__ == "__main__":
    # full path of the PDF to convert
    TARGET_PDF = "./前后端/陈盼良简历(2).pdf"
    # directory where the TXT is saved
    SAVE_DIRECTORY = "./output/前后端"

    # run the conversion
    specified_pdf_to_txt(TARGET_PDF, SAVE_DIRECTORY)


# ==== batch_resume_pdf_to_txt_clean_final.py ====
import pdfplumber
import os
import re
from pathlib import Path

# ===================== Final cleaning configuration (covers all platform watermarks) =====================
# Regex: any run of 30+ letters/digits/underscores immediately followed by "~~"
# is treated as a meaningless anti-fake code and removed.
FAKE_CODE_PATTERN = re.compile(r'[A-Za-z0-9_]{30,}~~')
# ==========================================================================

def clean_resume_text(raw_text: str) -> str:
    """
    Final text cleaning: strip platform anti-fake codes, hidden watermarks and
    repeated garbage from extracted resume text.

    1. Removes every "long random string + ~~" fake code.
    2. Collapses extra blank lines and spaces while keeping the resume layout.
    """
    # step 1: globally remove every matching fake code (the core fix)
    cleaned_text = FAKE_CODE_PATTERN.sub('', raw_text)

    # step 2: collapse runs of 3+ newlines into 2 (keeps paragraph breaks)
    cleaned_text = re.sub(r'\n{3,}', '\n\n', cleaned_text)

    # step 3: strip leading/trailing spaces per line, squeeze repeated spaces
    cleaned_text = re.sub(r'^[ ]+|[ ]+$', '', cleaned_text, flags=re.MULTILINE)
    cleaned_text = re.sub(r' {2,}', ' ', cleaned_text)

    return cleaned_text.strip()

def single_pdf_to_txt(pdf_file_path: Path, save_dir: Path) -> int:
    """Convert a single PDF to TXT and clean the result.

    :return: 1 on success, 0 when skipped (TXT already exists), -1 on failure
    """
    if not pdf_file_path.exists() or pdf_file_path.suffix.lower() != ".pdf":
        print(f"❌ 无效文件:{pdf_file_path.name}")
        return -1

    # keep the same base name, do not rename
    txt_filename = pdf_file_path.stem + ".txt"
    txt_save_path = save_dir / txt_filename

    # skip if already converted
    if txt_save_path.exists():
        print(f"⏭️ 已存在,跳过:{txt_filename}")
        return 0

    try:
        full_text = ""
        with pdfplumber.open(pdf_file_path) as pdf:
            for page in pdf.pages:
                # extract the body text, skipping pages with no extractable text
                page_text = page.extract_text()
                if page_text:
                    full_text += page_text + "\n\n"

        # final cleaning pass
        cleaned_text = clean_resume_text(full_text)

        # write the clean TXT
        with open(txt_save_path, "w", encoding="utf-8") as f:
            f.write(cleaned_text)

        print(f"✅ 转换+清洗完成:{pdf_file_path.name}")
        return 1

    except Exception as e:
        print(f"❌ 失败:{pdf_file_path.name} | {str(e)}")
        return -1

def batch_pdf_to_txt(pdf_input_dir: str, txt_save_dir: str, recursive: bool = True):
    """Batch-convert every PDF under pdf_input_dir into cleaned TXT files.

    :param pdf_input_dir: directory containing the PDFs
    :param txt_save_dir: directory for the cleaned TXT output (created if missing)
    :param recursive: also scan subdirectories when True
    """
    input_path = Path(pdf_input_dir)
    save_path = Path(txt_save_dir)

    if not input_path.exists():
        print(f"❌ 输入目录不存在:{input_path}")
        return

    os.makedirs(save_path, exist_ok=True)

    # scan for PDFs (the character-class glob makes the suffix match case-insensitive)
    if recursive:
        pdf_files = list(input_path.rglob("*.[pP][dD][fF]"))
    else:
        pdf_files = list(input_path.glob("*.[pP][dD][fF]"))

    if not pdf_files:
        print("⚠️ 未找到任何PDF文件")
        return

    print(f"📄 共找到 {len(pdf_files)} 个PDF,开始处理...\n")

    # result counters
    success = 0
    skip = 0
    fail = 0

    for pdf in pdf_files:
        res = single_pdf_to_txt(pdf, save_path)
        if res == 1:
            success += 1
        elif res == 0:
            skip += 1
        else:
            fail += 1

    # print the summary
    print("\n" + "="*50)
    print("🎉 批量处理完成")
    print(f"总文件:{len(pdf_files)}")
    print(f"✅ 成功:{success}")
    print(f"⏭️ 跳过:{skip}")
    print(f"❌ 失败:{fail}")
    print(f"📂 输出目录:{save_path.absolute()}")
    print("="*50)

if __name__ == "__main__":
    # ========== only these 3 settings need editing ==========
    PDF_INPUT_DIR = "./简历"          # folder with your PDFs
    TXT_SAVE_DIR = "./清晰后的简历"   # destination for cleaned TXT files
    IS_RECURSIVE = True               # recurse into subfolders?
    # ====================================

    batch_pdf_to_txt(PDF_INPUT_DIR, TXT_SAVE_DIR, IS_RECURSIVE)


# ==== cluster_wordcloud_plot.py ====
# NOTE(review): script body runs at import time; assumes clustering_result.csv
# and simhei.ttf exist in the working directory — confirm before running.
from wordcloud import WordCloud
import pandas as pd

df = pd.read_csv("clustering_result.csv")
# merge every non-null "技术画像" cell into one text blob
text = " ".join(df["技术画像"].dropna().tolist())

# build the word cloud
wc = WordCloud(
    font_path="simhei.ttf",  # Chinese-capable font
    background_color="white",
    width=1200, height=600,
    max_words=100
).generate(text)

wc.to_file("赛道关键词词云图.png")


# ==== clustering_distribution_plot.py ====
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

# read the clustering result CSV
df = pd.read_csv("./clustering_result.csv")
# head-count per technical track
cluster_count = df["技术赛道标签"].value_counts()

# plotting defaults (SimHei so Chinese labels render)
plt.rcParams["font.sans-serif"] = ["SimHei"]
plt.figure(figsize=(10,7))
colors = ["#FF6B6B","#4ECDC4","#45B7D1","#96CEB4","#FECA57","#DDA0DD"]

# draw the pie chart
wedges, texts, autotexts = plt.pie(
    cluster_count.values,
    labels=cluster_count.index,
    colors=colors,
    autopct="%1.1f%%",
    startangle=90,
    textprops={"fontsize":10}
)

plt.title("简历聚类-技术赛道人数分布", fontsize=16, pad=20)
plt.savefig("聚类赛道分布饼图.png", dpi=300, bbox_inches="tight")
plt.show()
end of file