Refactor code structure for improved readability and maintainability

This commit is contained in:
2026-01-30 14:13:45 +08:00
parent 3dd90cc50e
commit 3d78b88d47
4 changed files with 605 additions and 1 deletions

411
feature_extraction.py Normal file
View File

@@ -0,0 +1,411 @@
'''
批量提取特征方法
很花时间
'''
from openai import OpenAI
import json
import os
from dotenv import load_dotenv
import re
# 加载环境变量
load_dotenv(dotenv_path=".env")
# 读取环境变量
API_KEY = os.getenv("QWEN_API_KEY")
MODEL = os.getenv("MODEL_NAME", "qwen3-next-80b-a3b-thinking")
TEMPERATURE = float(os.getenv("TEMPERATURE", 0.1)) # 降低随机性,提升格式稳定性
OUTPUT_DIR = os.getenv("OUTPUT_DIR", "time_12_1/data_ch_1")
client = OpenAI(
api_key=API_KEY,
# base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
base_url="https://sg1.proxy.yinlihupo.cc/proxy/https://openrouter.ai/api/v1"
)
def build_extraction_prompt(dialogue_text):
prompt_template = """
# Role
你是一名拥有15年经验的“家庭教育销售通话审计专家”。你的核心能力是透过家长杂乱的表述精准捕捉深层心理动机、家庭权力结构、隐形财富信号以及高危销售线索。
# Core Protocols (核心审计协议 - 最高优先级)
## 1. 宁缺毋滥原则 (The Principle of Precision)
* **存在即输出,无证即沉默**忽略任何关于“字段数量”的限制。如果原文中有20个维度的有效证据就输出20个如果只有3个就输出3个。
* **严禁凑数**:如果原文未提及某维度,或者证据模糊两可,**绝对不要**输出该 Key。不要为了追求“信息丰富”而强行填空。
## 2. 证据阵列法则 (The Law of Evidence Arrays)
* **数据结构变更**`evidence` 字段必须是 **字符串数组 (List<String>)**,严禁使用单一字符串。
* **颗粒度控制 (Granularity Control)**
* 数组中的元素必须是 **具有独立语义的完整原句** 或 **包含主谓宾的完整意群**。
* **禁止碎片**:严禁提取如 "不合适""太贵""焦虑" 这样缺乏上下文的短语。
* **主体过滤****仅提取家长(客户)表达的原话**,严禁提取销售人员的引导语、复述语或共情语。
* **纯净引用**:每一个元素必须是原文的 100% 完美复制。
* **严禁拼接**:严禁使用“+”、“和”、“以及”将两句不连贯的话拼在同一个字符串里。
* **严禁篡改**:禁止总结、禁止润色、禁止“原文+分析”。你的分析只能体现在 `value` 字段中。
## 3. 结论极简法则 (The Law of Concise Conclusion)
* **强制必输字段**`Follow_up_Priority` 是 **核心必选字段**,无论任何情况都必须输出,不允许缺失。
* **Follow_up_Priority 兜底规则**
- 若文本完全无痛点/无财力/无意识,`value` 填“C级 (无痛点/无意识)”,`evidence` 填 ["文本未提及任何痛点、财力或意向相关内容"]
- 若仅部分信息缺失,按规则评级并在 `evidence` 中列出已有有效原句。
* **Value 约束**`value` 字段必须是 **客观、简练的定性结论**(必须限制在 **20个汉字以内**)。
* *正确示例*: "A级 (高痛点+强财力)"
* *错误示例*: "家长表现出对价格的犹豫,虽然她很有钱,但是因为..." (禁止小作文)
## 4. 身份与财富的高敏嗅觉
* 对于**高价值信号**(职业/多孩/私立学校/房产)和**生命红线**(自杀/不想活了/抑郁症确诊)保持极度敏感,一旦出现必须提取。
# Task
阅读提供的销售通话录音文本,从以下 23 个预设维度中筛选出**有效信息**,生成一份高精度的客户画像 JSON。
# Field Definitions (字段定义与提取逻辑)
### [第一组:心理动力与危机]
1. **Core_Fear_Source** (深层恐惧)
* *逻辑*: 驱动家长寻求帮助的终极噩梦。是怕孩子死(生命安全)?怕孩子阶级跌落?还是怕自己面子挂不住?
* *注意*: 必须提取具体的后果描述。
2. **Pain_Threshold** (痛苦阈值)
* *逻辑*: 家长当前的情绪状态。是“崩溃急救”(无法忍受,必须马上解决),还是“隐隐作痛”(还能凑合)?
3. **Time_Window_Pressure** (时间压力)
* *逻辑*: 客观的截止日期。如距离中高考仅剩X月、休学复课最后期限、学校劝退通牒。
4. **Helplessness_Index** (无助指数)
* *逻辑*: 家长是否已经尝试过多种方法均失败(习得性无助),还是盲目自信觉得还能管。
5. **Social_Shame** (社交耻感)
* *逻辑*: 孩子问题是否影响了家长的社会形象(怕老师找、怕亲戚问、不敢出门)。
6. **Ultimatum_Event** (爆发事件)
* *逻辑*: 迫使家长此时此刻咨询的导火索。如:昨日发生的激烈争吵、离家出走、打架、学校停课通知。
7. **Emotional_Trigger** (情绪扳机)
* *逻辑*: 沟通中家长情绪最激动的点(哭泣、愤怒、颤抖)。
### [第二组:阻力与障碍]
8. **Secret_Resistance** (隐性抗拒)
* *逻辑*: **阻碍成交**的心理障碍。特指:怕被家人知道买课、怕孩子知道家长在咨询、觉得课程是骗局。
* *排除*: 孩子的生活秘密(如抽烟/早恋)不属于此字段。
9. **Trust_Deficit** (信任赤字)
* *逻辑*: 对机构/销售/网课模式的直接质疑。如:“你们正规吗?”“之前被骗过”。
10. **Family_Sabotage** (家庭阻力)
* *逻辑*: 家庭中明确的反对者或捣乱者(拆台的配偶、干涉的长辈、发病的家属)。
* *排除*: 客观的不幸(如家人生病/车祸)不属于此字段,除非该事件直接阻碍了家长听课。
11. **Low_Self_Efficacy** (效能感低)
* *逻辑*: 家长担心**自己**学不会、坚持不下来、没时间听课。
12. **Attribution_Barrier** (归因偏差)
* *逻辑*: 家长认为错在谁?(全是学校的错 / 全是手机的错 / 全是遗传的错 / 承认自己有错)。
### [第三组:资源与决策]
13. **Payer_Decision_Maker** (决策权)
* *逻辑*: 谁掌握财权?谁有一票否决权?是“妈妈独裁”还是“需商量”?
14. **Hidden_Wealth_Proof** (隐形财力)
* *逻辑*: 寻找高消费证据。如:私立学校、出国计划、高昂学费、住别墅、高知职业(教授/医生)。
15. **Price_Sensitivity** (价格敏感度)
* *逻辑*: 对价格的反应。是“只看效果不差钱”,还是“犹豫比价”、“哭穷”。
16. **Sunk_Cost** (沉没成本)
* *逻辑*: 过往已投入的无效成本。如之前报过xx辅导班、做过xx次心理咨询、花了xx万没效果。
17. **Compensatory_Spending** (补偿心理)
* *逻辑*: 是否因亏欠感而通过花钱(买东西/报课)来弥补孩子。
### [第四组:销售价值判断]
18. **Expectation_Bonus** (期望范围)
* *逻辑*: 家长的底线(只要活着/不退学)与理想(考大学/变优秀)。
19. **Competitor_Mindset** (竞品思维)
* *逻辑*: 家长是否在对比其他**解决方案**。如:特训学校、心理医生(针对孩子)、线下辅导班。
* *排除*: 家属的就医经历不属于此字段。
20. **Cognitive_Stage** (认知阶段)
* *逻辑*: 愚昧期(修孩子) -> 觉醒期(修自己/找方法)。
21. **Referral_Potential** (转介绍潜力)
* *逻辑*: 基于身份判断。重点捕捉:多孩家庭、教师/医生/教授/公务员身份KOL潜质、家长委员会成员。
22. **Last_Interaction** (互动状态)
* *逻辑*: 通话结束时的温度。秒回/挂断/索要案例/已读不回。
23. **Follow_up_Priority** (跟进优先级) - [重点监控字段]
* *逻辑*: 综合评级S/A/B/C
* **Extraction Rule (必须使用数组逻辑)**:
* 如果评级为 **S/A**(通常需要痛点+财力/意向双重支撑),必须在 `evidence` 数组中分别列出这两方面(甚至三方面)的原话。
* **S级**: 涉及生命安全 OR (极高痛点 + 强支付能力 + 强意向)。
* **A级**: 有痛点 + 有支付能力。
* **B级**: 有痛点 + 无支付能力/犹豫。
* **C级**: 无痛点/无意识。
# Output Format (输出格式指令)
**强制要求1**JSON 中必须包含 `Follow_up_Priority` 字段,否则视为无效输出。
**强制要求2**JSON 格式必须严格合法逗号分隔、引号成对、括号匹配可直接被JSON解析工具识别。
**强制要求3**:直接输出 JSON 对象,无需 Markdown 代码块、解释性文字或额外内容。
**强制要求4**若文中未提及除了Follow_up_Priority字段的22个字段内容就不要输出。
**强制要求5**:不准将示例模板内容作为证据输出,必须从原文中找证据。
JSON 结构要求:
1. **Key**: 仅使用上述定义中出现的英文 Key。
2. **Value**: 必须是 **<20字** 的短语结论。
3. **Evidence**: 必须是 **List<String>** (字符串数组)。
4. **Strict Validation (自我审查)**:
* 检查 `evidence` 是否包含“家长说”、“意思就是”? -> 若有,**改为纯引用**。
* 检查 `evidence` 是否包含销售说的话? -> 若有,**删除该元素**。
* 检查 JSON 语法是否正确? -> 确保逗号不遗漏、括号成对。
* 不准将示例模板内容作为证据输出,必须从原文中找证据。
**Example Output:**
{
"Follow_up_Priority": {
"value": "A级 (痛点强+财力足)",
"evidence": [
"我是今年才确诊,他是焦虑的", // 证据1完整原句支撑痛点
"孩子现在在西工大附中上学", // 证据2完整原句支撑隐形财力
"留学基金我们已经准备好了" // 证据3完整原句支撑支付能力
]
},
"Pain_Threshold": {
"value": "崩溃急救状态",
"evidence": [
"我不知道怎么来处理",
"一看见难了就崩溃啊就崩溃"
]
},
"Trust_Deficit": {
"value": "质疑课程通用性",
"evidence": [
"你这些课不都是通用的吗?"
]
}
}"""
full_prompt = f"{prompt_template}\n\n### 原始通话文本\n{dialogue_text}\n\n### 请严格按照上述要求输出JSON仅JSON无其他内容"
return full_prompt
def clean_and_fix_json(json_str):
"""清洗JSON格式不补充兜底内容"""
try:
# 移除转义符、控制字符和多余空格
json_str = json_str.replace('\\"', '"').replace("\\'", "'")
json_str = re.sub(r'[\n\r\t\f\v]', '', json_str)
json_str = re.sub(r'\s+', ' ', json_str).strip()
# 修复末尾多余逗号
json_str = re.sub(r",\s*}", "}", json_str)
json_str = re.sub(r",\s*]", "]", json_str)
return json_str
except Exception as e:
raise RuntimeError(f"JSON清洗失败: {str(e)}") from e
def extract_features_with_qwen(dialogue_text, file_name, output_dir="qwen_new_123"):
"""
调用API提取特征并保存为JSON文件
:param dialogue_text: 预处理后的对话文本(字符串)
:param file_name: 原文件名称(用于生成输出文件名)
:param output_dir: 结果保存目录
:return: 提取的特征字典失败则返回None
"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
prompt = build_extraction_prompt(dialogue_text)
try:
response = client.chat.completions.create(
model=MODEL,
messages=[{"role": "user", "content": prompt}],
temperature=TEMPERATURE,
max_tokens=8000
)
feature_json_str = response.choices[0].message.content.strip()
# 提取JSON片段
json_match = re.search(r"\{[\s\S]*\}", feature_json_str)
if json_match:
feature_json_str = json_match.group()
else:
raise ValueError("返回内容中未找到有效JSON数据")
# 移除代码块标记
if feature_json_str.startswith("```json"):
feature_json_str = feature_json_str[7:-3].strip()
elif feature_json_str.startswith("```"):
feature_json_str = feature_json_str[3:-3].strip()
feature_dict = json.loads(feature_json_str)
# 验证核心字段
if "Follow_up_Priority" not in feature_dict:
raise ValueError("返回结果缺失核心必选字段Follow_up_Priority")
# 生成输出文件名
file_base = os.path.splitext(file_name)[0]
json_filename = f"{file_base}.json"
output_path = os.path.join(output_dir, json_filename)
# 保存文件
with open(output_path, "w", encoding="utf-8") as f:
json.dump(feature_dict, f, ensure_ascii=False, indent=2)
print(f"处理完成:{file_name} -> {json_filename}")
return feature_dict
except json.JSONDecodeError as e:
print(f"JSON解析失败 {file_name}{str(e)} | 原始内容:{feature_json_str[:200]}...")
return None
except ValueError as e:
print(f"数据验证失败 {file_name}{str(e)}")
return None
except Exception as e:
print(f"特征提取失败 {file_name}{str(e)}")
return None
# 批量处理函数
def batch_process_first_200_txt(folder_path, output_dir):
"""
仅处理指定文件夹下的前200个txt文件
:param folder_path: 待处理文件夹路径
:param output_dir: 结果输出目录
"""
# 检查文件夹是否存在
if not os.path.isdir(folder_path):
print(f"文件夹不存在:{folder_path}")
return
# 筛选出文件夹中的txt文件并按名称排序保证处理顺序稳定
txt_file_list = [
f for f in os.listdir(folder_path)
if os.path.isfile(os.path.join(folder_path, f)) and f.lower().endswith(".txt")
]
# 按文件名排序(可选,保证每次处理顺序一致)
txt_file_list.sort()
# 取前200个txt文件
target_files = txt_file_list[:200]
if not target_files:
print(f"文件夹 {folder_path} 中无txt文件可处理")
return
print(f"始处理前 {len(target_files)} 个txt文件")
processed_count = 0
failed_count = 0
for file_name in target_files:
file_path = os.path.join(folder_path, file_name)
# 读取文件内容
try:
with open(file_path, "r", encoding="utf-8") as f:
dialogue_content = f.read().strip()
if not dialogue_content:
print(f"文件内容为空,跳过:{file_name}")
failed_count += 1
continue
except Exception as e:
print(f"读取文件失败 {file_name}{str(e)}")
failed_count += 1
continue
# 调用特征提取函数
result = extract_features_with_qwen(dialogue_content, file_name, output_dir)
if result:
processed_count += 1
else:
failed_count += 1
# 输出批量处理统计结果
print("\n批量处理完成")
print(f"成功处理:{processed_count} 个文件")
print(f"处理失败:{failed_count} 个文件")
print(f"结果保存至:{os.path.abspath(output_dir)}")
def process_single_txt(file_path, output_dir=OUTPUT_DIR):
"""
处理单个TXT文件提取特征并保存JSON
:param file_path: 单个TXT文件的完整路径
:param output_dir: JSON结果保存目录
"""
# 1. 验证文件是否存在且是TXT文件
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在:{file_path}")
if not file_path.lower().endswith(".txt"):
raise ValueError(f"不是TXT文件{file_path}")
if not os.path.isfile(file_path):
raise IsADirectoryError(f"这是文件夹,不是文件:{file_path}")
# 2. 读取TXT文件内容
print(f"正在读取文件:{file_path}")
try:
with open(file_path, "r", encoding="utf-8") as f:
dialogue_content = f.read().strip()
if not dialogue_content:
raise ValueError("文件内容为空")
print(f"成功读取文件(字符数:{len(dialogue_content)}")
except Exception as e:
raise RuntimeError(f"读取文件失败:{str(e)}") from e
# 3. 构建提示词并调用API
prompt = build_extraction_prompt(dialogue_content)
try:
print("正在调用API提取特征...")
response = client.chat.completions.create(
model=MODEL,
messages=[{"role": "user", "content": prompt}],
temperature=TEMPERATURE,
max_tokens=8000,
timeout=30
)
except Exception as e:
raise RuntimeError(f"API调用失败{str(e)}") from e
# 4. 提取并清洗JSON
feature_json_str = response.choices[0].message.content.strip()
json_match = re.search(r"\{[\s\S]*\}", feature_json_str)
if not json_match:
raise RuntimeError(f"API返回无有效JSON{feature_json_str[:200]}...")
cleaned_json = clean_and_fix_json(json_match.group())
# 5. 解析并验证JSON
try:
parsed_dict = json.loads(cleaned_json)
except json.JSONDecodeError as e:
raise RuntimeError(f"JSON解析失败{str(e)} | 清洗后内容:{cleaned_json[:500]}") from e
# 验证核心字段
if "Follow_up_Priority" not in parsed_dict:
raise RuntimeError("核心字段Follow_up_Priority缺失")
fu_prio = parsed_dict["Follow_up_Priority"]
if not isinstance(fu_prio, dict) or "value" not in fu_prio or "evidence" not in fu_prio:
raise RuntimeError("Follow_up_Priority格式错误需包含value和evidence")
if not isinstance(fu_prio["evidence"], list):
raise RuntimeError("evidence必须是数组类型")
if len(str(fu_prio["value"])) >= 20:
raise RuntimeError(f"value超20字限制{fu_prio['value']}")
# 6. 保存JSON结果
os.makedirs(output_dir, exist_ok=True)
file_name = os.path.basename(file_path)
json_file_name = f"{os.path.splitext(file_name)[0]}"
json_save_path = os.path.join(output_dir, json_file_name)
try:
with open(json_save_path, "w", encoding="utf-8") as f:
json.dump(parsed_dict, f, ensure_ascii=False, indent=2)
print(f"处理完成JSON保存至{json_save_path}")
return parsed_dict
except Exception as e:
raise RuntimeError(f"保存JSON失败{str(e)}") from e
if __name__ == "__main__":
# 要处理的源文件夹路径
target_folder = "./time_12_1/data_200"
# 执行批量处理仅处理前200个txt文件输出到
batch_process_first_200_txt(
folder_path=target_folder,
output_dir=OUTPUT_DIR
)
# SINGLE_TXT_PATH = "./qwen/cdb7d561-975a-431e-86d4-9b3ddc714f73.txt"
# try:
# # 执行单个文件处理
# process_single_txt(file_path=SINGLE_TXT_PATH)
# except Exception as e:
# print(f"\n处理失败{str(e)}")