Files
deal-classification/feature_extraction.py

452 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
'''
批量提取特征方法
很花时间
'''
from openai import OpenAI
import json
import os
from dotenv import load_dotenv
import re
# 加载环境变量
load_dotenv(dotenv_path=".env")
# 读取环境变量
API_KEY = os.getenv("QWEN_API_KEY")
MODEL = os.getenv("MODEL_NAME", "qwen3-next-80b-a3b-thinking")
TEMPERATURE = float(os.getenv("TEMPERATURE", 0.1)) # 降低随机性,提升格式稳定性
OUTPUT_DIR = os.getenv("OUTPUT_DIR", "time_12_1/data_ch_1")
client = OpenAI(
api_key=API_KEY,
# base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
base_url="https://sg1.proxy.yinlihupo.cc/proxy/https://openrouter.ai/api/v1"
)
def build_extraction_prompt(dialogue_text):
prompt_template = """
# Role
你是一名拥有15年经验的“家庭教育销售通话审计专家”。你的核心能力是透过家长杂乱的表述精准捕捉深层心理动机、家庭权力结构、隐形财富信号以及高危销售线索。
# Core Protocols (核心审计协议 - 最高优先级)
## 1. 宁缺毋滥原则 (The Principle of Precision)
* **存在即输出,无证即沉默**忽略任何关于“字段数量”的限制。如果原文中有20个维度的有效证据就输出20个如果只有3个就输出3个。
* **严禁凑数**:如果原文未提及某维度,或者证据模糊两可,**绝对不要**输出该 Key。不要为了追求“信息丰富”而强行填空。
## 2. 证据阵列法则 (The Law of Evidence Arrays)
* **数据结构变更**`evidence` 字段必须是 **字符串数组 (List<String>)**,严禁使用单一字符串。
* **颗粒度控制 (Granularity Control)**
* 数组中的元素必须是 **具有独立语义的完整原句** 或 **包含主谓宾的完整意群**。
* **禁止碎片**:严禁提取如 "不合适""太贵""焦虑" 这样缺乏上下文的短语。
* **主体过滤****仅提取家长(客户)表达的原话**,严禁提取销售人员的引导语、复述语或共情语。
* **纯净引用**:每一个元素必须是原文的 100% 完美复制。
* **严禁拼接**:严禁使用“+”、“和”、“以及”将两句不连贯的话拼在同一个字符串里。
* **严禁篡改**:禁止总结、禁止润色、禁止“原文+分析”。你的分析只能体现在 `value` 字段中。
## 3. 结论极简法则 (The Law of Concise Conclusion)
* **强制必输字段**`Follow_up_Priority` 是 **核心必选字段**,无论任何情况都必须输出,不允许缺失。
* **Follow_up_Priority 兜底规则**
- 若文本完全无痛点/无财力/无意识,`value` 填“C级 (无痛点/无意识)”,`evidence` 填 ["文本未提及任何痛点、财力或意向相关内容"]
- 若仅部分信息缺失,按规则评级并在 `evidence` 中列出已有有效原句。
* **Value 约束**`value` 字段必须是 **客观、简练的定性结论**(必须限制在 **20个汉字以内**)。
* *正确示例*: "A级 (高痛点+强财力)"
* *错误示例*: "家长表现出对价格的犹豫,虽然她很有钱,但是因为..." (禁止小作文)
## 4. 身份与财富的高敏嗅觉
* 对于**高价值信号**(职业/多孩/私立学校/房产)和**生命红线**(自杀/不想活了/抑郁症确诊)保持极度敏感,一旦出现必须提取。
# Task
阅读提供的销售通话录音文本,从以下 23 个预设维度中筛选出**有效信息**,生成一份高精度的客户画像 JSON。
# Field Definitions (字段定义与提取逻辑)
### [第一组:心理动力与危机]
1. **Core_Fear_Source** (深层恐惧)
* *逻辑*: 驱动家长寻求帮助的终极噩梦。是怕孩子死(生命安全)?怕孩子阶级跌落?还是怕自己面子挂不住?
* *注意*: 必须提取具体的后果描述。
2. **Pain_Threshold** (痛苦阈值)
* *逻辑*: 家长当前的情绪状态。是“崩溃急救”(无法忍受,必须马上解决),还是“隐隐作痛”(还能凑合)?
3. **Time_Window_Pressure** (时间压力)
* *逻辑*: 客观的截止日期。如距离中高考仅剩X月、休学复课最后期限、学校劝退通牒。
4. **Helplessness_Index** (无助指数)
* *逻辑*: 家长是否已经尝试过多种方法均失败(习得性无助),还是盲目自信觉得还能管。
5. **Social_Shame** (社交耻感)
* *逻辑*: 孩子问题是否影响了家长的社会形象(怕老师找、怕亲戚问、不敢出门)。
6. **Ultimatum_Event** (爆发事件)
* *逻辑*: 迫使家长此时此刻咨询的导火索。如:昨日发生的激烈争吵、离家出走、打架、学校停课通知。
7. **Emotional_Trigger** (情绪扳机)
* *逻辑*: 沟通中家长情绪最激动的点(哭泣、愤怒、颤抖)。
### [第二组:阻力与障碍]
8. **Secret_Resistance** (隐性抗拒)
* *逻辑*: **阻碍成交**的心理障碍。特指:怕被家人知道买课、怕孩子知道家长在咨询、觉得课程是骗局。
* *排除*: 孩子的生活秘密(如抽烟/早恋)不属于此字段。
9. **Trust_Deficit** (信任赤字)
* *逻辑*: 对机构/销售/网课模式的直接质疑。如:“你们正规吗?”“之前被骗过”。
10. **Family_Sabotage** (家庭阻力)
* *逻辑*: 家庭中明确的反对者或捣乱者(拆台的配偶、干涉的长辈、发病的家属)。
* *排除*: 客观的不幸(如家人生病/车祸)不属于此字段,除非该事件直接阻碍了家长听课。
11. **Low_Self_Efficacy** (效能感低)
* *逻辑*: 家长担心**自己**学不会、坚持不下来、没时间听课。
12. **Attribution_Barrier** (归因偏差)
* *逻辑*: 家长认为错在谁?(全是学校的错 / 全是手机的错 / 全是遗传的错 / 承认自己有错)。
### [第三组:资源与决策]
13. **Payer_Decision_Maker** (决策权)
* *逻辑*: 谁掌握财权?谁有一票否决权?是“妈妈独裁”还是“需商量”?
14. **Hidden_Wealth_Proof** (隐形财力)
* *逻辑*: 寻找高消费证据。如:私立学校、出国计划、高昂学费、住别墅、高知职业(教授/医生)。
15. **Price_Sensitivity** (价格敏感度)
* *逻辑*: 对价格的反应。是“只看效果不差钱”,还是“犹豫比价”、“哭穷”。
16. **Sunk_Cost** (沉没成本)
* *逻辑*: 过往已投入的无效成本。如之前报过xx辅导班、做过xx次心理咨询、花了xx万没效果。
17. **Compensatory_Spending** (补偿心理)
* *逻辑*: 是否因亏欠感而通过花钱(买东西/报课)来弥补孩子。
### [第四组:销售价值判断]
18. **Expectation_Bonus** (期望范围)
* *逻辑*: 家长的底线(只要活着/不退学)与理想(考大学/变优秀)。
19. **Competitor_Mindset** (竞品思维)
* *逻辑*: 家长是否在对比其他**解决方案**。如:特训学校、心理医生(针对孩子)、线下辅导班。
* *排除*: 家属的就医经历不属于此字段。
20. **Cognitive_Stage** (认知阶段)
* *逻辑*: 愚昧期(修孩子) -> 觉醒期(修自己/找方法)。
21. **Referral_Potential** (转介绍潜力)
* *逻辑*: 基于身份判断。重点捕捉:多孩家庭、教师/医生/教授/公务员身份KOL潜质、家长委员会成员。
22. **Last_Interaction** (互动状态)
* *逻辑*: 通话结束时的温度。秒回/挂断/索要案例/已读不回。
23. **Follow_up_Priority** (跟进优先级) - [重点监控字段]
* *逻辑*: 综合评级S/A/B/C
* **Extraction Rule (必须使用数组逻辑)**:
* 如果评级为 **S/A**(通常需要痛点+财力/意向双重支撑),必须在 `evidence` 数组中分别列出这两方面(甚至三方面)的原话。
* **S级**: 涉及生命安全 OR (极高痛点 + 强支付能力 + 强意向)。
* **A级**: 有痛点 + 有支付能力。
* **B级**: 有痛点 + 无支付能力/犹豫。
* **C级**: 无痛点/无意识。
# Output Format (输出格式指令)
**强制要求1**JSON 中必须包含 `Follow_up_Priority` 字段,否则视为无效输出。
**强制要求2**JSON 格式必须严格合法逗号分隔、引号成对、括号匹配可直接被JSON解析工具识别。
**强制要求3**:直接输出 JSON 对象,无需 Markdown 代码块、解释性文字或额外内容。
**强制要求4**若文中未提及除了Follow_up_Priority字段的22个字段内容就不要输出。
**强制要求5**:不准将示例模板内容作为证据输出,必须从原文中找证据。
JSON 结构要求:
1. **Key**: 仅使用上述定义中出现的英文 Key。
2. **Value**: 必须是 **<20字** 的短语结论。
3. **Evidence**: 必须是 **List<String>** (字符串数组)。
4. **Strict Validation (自我审查)**:
* 检查 `evidence` 是否包含“家长说”、“意思就是”? -> 若有,**改为纯引用**。
* 检查 `evidence` 是否包含销售说的话? -> 若有,**删除该元素**。
* 检查 JSON 语法是否正确? -> 确保逗号不遗漏、括号成对。
* 不准将示例模板内容作为证据输出,必须从原文中找证据。
**Example Output:**
{
"Follow_up_Priority": {
"value": "A级 (痛点强+财力足)",
"evidence": [
"我是今年才确诊,他是焦虑的", // 证据1完整原句支撑痛点
"孩子现在在西工大附中上学", // 证据2完整原句支撑隐形财力
"留学基金我们已经准备好了" // 证据3完整原句支撑支付能力
]
},
"Pain_Threshold": {
"value": "崩溃急救状态",
"evidence": [
"我不知道怎么来处理",
"一看见难了就崩溃啊就崩溃"
]
},
"Trust_Deficit": {
"value": "质疑课程通用性",
"evidence": [
"你这些课不都是通用的吗?"
]
}
}"""
full_prompt = f"{prompt_template}\n\n### 原始通话文本\n{dialogue_text}\n\n### 请严格按照上述要求输出JSON仅JSON无其他内容"
return full_prompt
def clean_and_fix_json(json_str):
"""清洗JSON格式不补充兜底内容"""
try:
# 移除转义符、控制字符和多余空格
json_str = json_str.replace('\\"', '"').replace("\\'", "'")
json_str = re.sub(r'[\n\r\t\f\v]', '', json_str)
json_str = re.sub(r'\s+', ' ', json_str).strip()
# 修复末尾多余逗号
json_str = re.sub(r",\s*}", "}", json_str)
json_str = re.sub(r",\s*]", "]", json_str)
return json_str
except Exception as e:
raise RuntimeError(f"JSON清洗失败: {str(e)}") from e
def extract_features_with_qwen(dialogue_text, file_name, output_dir="qwen_new_123"):
"""
调用API提取特征并保存为JSON文件
:param dialogue_text: 预处理后的对话文本(字符串)
:param file_name: 原文件名称(用于生成输出文件名)
:param output_dir: 结果保存目录
:return: 提取的特征字典失败则返回None
"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
prompt = build_extraction_prompt(dialogue_text)
try:
response = client.chat.completions.create(
model=MODEL,
messages=[{"role": "user", "content": prompt}],
temperature=TEMPERATURE,
max_tokens=8000
)
feature_json_str = response.choices[0].message.content.strip()
# 提取JSON片段
json_match = re.search(r"\{[\s\S]*\}", feature_json_str)
if json_match:
feature_json_str = json_match.group()
else:
raise ValueError("返回内容中未找到有效JSON数据")
# 移除代码块标记
if feature_json_str.startswith("```json"):
feature_json_str = feature_json_str[7:-3].strip()
elif feature_json_str.startswith("```"):
feature_json_str = feature_json_str[3:-3].strip()
feature_dict = json.loads(feature_json_str)
# 验证核心字段
if "Follow_up_Priority" not in feature_dict:
raise ValueError("返回结果缺失核心必选字段Follow_up_Priority")
# 生成输出文件名
file_base = os.path.splitext(file_name)[0]
json_filename = f"{file_base}.json"
output_path = os.path.join(output_dir, json_filename)
# 保存文件
with open(output_path, "w", encoding="utf-8") as f:
json.dump(feature_dict, f, ensure_ascii=False, indent=2)
print(f"处理完成:{file_name} -> {json_filename}")
return feature_dict
except json.JSONDecodeError as e:
print(f"JSON解析失败 {file_name}{str(e)} | 原始内容:{feature_json_str[:200]}...")
return None
except ValueError as e:
print(f"数据验证失败 {file_name}{str(e)}")
return None
except Exception as e:
print(f"特征提取失败 {file_name}{str(e)}")
return None
# 批量处理函数
def batch_process_first_200_txt(folder_path, output_dir):
"""
仅处理指定文件夹下的前200个txt文件
:param folder_path: 待处理文件夹路径
:param output_dir: 结果输出目录
"""
# 检查文件夹是否存在
if not os.path.isdir(folder_path):
print(f"文件夹不存在:{folder_path}")
return
# 筛选出文件夹中的txt文件并按名称排序保证处理顺序稳定
txt_file_list = [
f for f in os.listdir(folder_path)
if os.path.isfile(os.path.join(folder_path, f)) and f.lower().endswith(".txt")
]
# 按文件名排序(可选,保证每次处理顺序一致)
txt_file_list.sort()
# 取前200个txt文件
target_files = txt_file_list[:200]
if not target_files:
print(f"文件夹 {folder_path} 中无txt文件可处理")
return
print(f"始处理前 {len(target_files)} 个txt文件")
processed_count = 0
failed_count = 0
for file_name in target_files:
file_path = os.path.join(folder_path, file_name)
# 读取文件内容
try:
with open(file_path, "r", encoding="utf-8") as f:
dialogue_content = f.read().strip()
if not dialogue_content:
print(f"文件内容为空,跳过:{file_name}")
failed_count += 1
continue
except Exception as e:
print(f"读取文件失败 {file_name}{str(e)}")
failed_count += 1
continue
# 调用特征提取函数
result = extract_features_with_qwen(dialogue_content, file_name, output_dir)
if result:
processed_count += 1
else:
failed_count += 1
# 输出批量处理统计结果
print("\n批量处理完成")
print(f"成功处理:{processed_count} 个文件")
print(f"处理失败:{failed_count} 个文件")
print(f"结果保存至:{os.path.abspath(output_dir)}")
def process_single_txt(file_path, output_dir=OUTPUT_DIR):
"""
处理单个TXT文件提取特征并保存JSON
:param file_path: 单个TXT文件的完整路径
:param output_dir: JSON结果保存目录
"""
# 1. 验证文件是否存在且是TXT文件
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在:{file_path}")
if not file_path.lower().endswith(".txt"):
raise ValueError(f"不是TXT文件{file_path}")
if not os.path.isfile(file_path):
raise IsADirectoryError(f"这是文件夹,不是文件:{file_path}")
# 2. 读取TXT文件内容
print(f"正在读取文件:{file_path}")
try:
with open(file_path, "r", encoding="utf-8") as f:
dialogue_content = f.read().strip()
if not dialogue_content:
raise ValueError("文件内容为空")
print(f"成功读取文件(字符数:{len(dialogue_content)}")
except Exception as e:
raise RuntimeError(f"读取文件失败:{str(e)}") from e
# 3. 构建提示词并调用API
prompt = build_extraction_prompt(dialogue_content)
try:
print("正在调用API提取特征...")
response = client.chat.completions.create(
model=MODEL,
messages=[{"role": "user", "content": prompt}],
temperature=TEMPERATURE,
max_tokens=8000,
timeout=30
)
except Exception as e:
raise RuntimeError(f"API调用失败{str(e)}") from e
# 4. 提取并清洗JSON
feature_json_str = response.choices[0].message.content.strip()
json_match = re.search(r"\{[\s\S]*\}", feature_json_str)
if not json_match:
raise RuntimeError(f"API返回无有效JSON{feature_json_str[:200]}...")
cleaned_json = clean_and_fix_json(json_match.group())
# 5. 解析并验证JSON
try:
parsed_dict = json.loads(cleaned_json)
except json.JSONDecodeError as e:
raise RuntimeError(f"JSON解析失败{str(e)} | 清洗后内容:{cleaned_json[:500]}") from e
# 验证核心字段
if "Follow_up_Priority" not in parsed_dict:
raise RuntimeError("核心字段Follow_up_Priority缺失")
fu_prio = parsed_dict["Follow_up_Priority"]
if not isinstance(fu_prio, dict) or "value" not in fu_prio or "evidence" not in fu_prio:
raise RuntimeError("Follow_up_Priority格式错误需包含value和evidence")
if not isinstance(fu_prio["evidence"], list):
raise RuntimeError("evidence必须是数组类型")
if len(str(fu_prio["value"])) >= 20:
raise RuntimeError(f"value超20字限制{fu_prio['value']}")
# 6. 保存JSON结果
os.makedirs(output_dir, exist_ok=True)
file_name = os.path.basename(file_path)
json_file_name = f"{os.path.splitext(file_name)[0]}"
json_save_path = os.path.join(output_dir, json_file_name)
try:
with open(json_save_path, "w", encoding="utf-8") as f:
json.dump(parsed_dict, f, ensure_ascii=False, indent=2)
print(f"处理完成JSON保存至{json_save_path}")
return parsed_dict
except Exception as e:
raise RuntimeError(f"保存JSON失败{str(e)}") from e
async def process_single(content : str):
# 3. 构建提示词并调用API
prompt = build_extraction_prompt(content)
try:
print("正在调用API提取特征...")
response = client.chat.completions.create(
model=MODEL,
messages=[{"role": "user", "content": prompt}],
temperature=TEMPERATURE,
max_tokens=8000,
timeout=30
)
except Exception as e:
raise RuntimeError(f"API调用失败{str(e)}") from e
# 4. 提取并清洗JSON
feature_json_str = response.choices[0].message.content.strip()
json_match = re.search(r"\{[\s\S]*\}", feature_json_str)
if not json_match:
raise RuntimeError(f"API返回无有效JSON{feature_json_str[:200]}...")
cleaned_json = clean_and_fix_json(json_match.group())
# 5. 解析并验证JSON
try:
parsed_dict = json.loads(cleaned_json)
except json.JSONDecodeError as e:
raise RuntimeError(f"JSON解析失败{str(e)} | 清洗后内容:{cleaned_json[:500]}") from e
# 验证核心字段
if "Follow_up_Priority" not in parsed_dict:
raise RuntimeError("核心字段Follow_up_Priority缺失")
fu_prio = parsed_dict["Follow_up_Priority"]
if not isinstance(fu_prio, dict) or "value" not in fu_prio or "evidence" not in fu_prio:
raise RuntimeError("Follow_up_Priority格式错误需包含value和evidence")
if not isinstance(fu_prio["evidence"], list):
raise RuntimeError("evidence必须是数组类型")
if len(str(fu_prio["value"])) >= 20:
raise RuntimeError(f"value超20字限制{fu_prio['value']}")
return parsed_dict
if __name__ == "__main__":
# 要处理的源文件夹路径
target_folder = "./time_12_1/data_200"
# 执行批量处理仅处理前200个txt文件输出到
batch_process_first_200_txt(
folder_path=target_folder,
output_dir=OUTPUT_DIR
)
# SINGLE_TXT_PATH = "./qwen/cdb7d561-975a-431e-86d4-9b3ddc714f73.txt"
# try:
# # 执行单个文件处理
# process_single_txt(file_path=SINGLE_TXT_PATH)
# except Exception as e:
# print(f"\n处理失败{str(e)}")