"""Count label values per session and compare deal vs. non-deal groups.

``StatisticData`` loads the labelled sessions found in a folder and counts
how often each value occurs for every label key.  ``Outputer`` takes the
statistics of two such groups (deal / not deal) and renders them as a pie
chart comparison and as an Excel workbook with one sheet per label key.
"""

import json
from collections import Counter
from typing import Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import pandas as pd

from .content_extract import extract_json_files, extract_json_data

# English label keys, in schema order.
valid_keys = [
    "Core_Fear_Source",
    "Pain_Threshold",
    "Time_Window_Pressure",
    "Helplessness_Index",
    "Social_Shame",
    "Payer_Decision_Maker",
    "Hidden_Wealth_Proof",
    "Price_Sensitivity",
    "Sunk_Cost",
    "Compensatory_Spending",
    "Trust_Deficit",
    "Secret_Resistance",
    "Family_Sabotage",
    "Low_Self_Efficacy",
    "Attribution_Barrier",
    "Emotional_Trigger",
    "Ultimatum_Event",
    "Expectation_Bonus",
    "Competitor_Mindset",
    "Cognitive_Stage",
    "Follow_up_Priority",
    "Last_Interaction",
    "Referral_Potential",
]

# Chinese names, positionally aligned with ``valid_keys``.
ch_valid_keys = [
    "核心恐惧源",
    "疼痛阈值",
    "时间窗口压力",
    "无助指数",
    "社会羞耻感",
    "付款决策者",
    "隐藏财富证明",
    "价格敏感度",
    "沉没成本",
    "补偿性消费",
    "信任赤字",
    "秘密抵触情绪",
    "家庭破坏",
    "低自我效能感",
    "归因障碍",
    "情绪触发点",
    "最后通牒事件",
    "期望加成",
    "竞争者心态",
    "认知阶段",
    "跟进优先级",
    "最后互动时间",
    "推荐潜力",
]

all_keys = valid_keys + ["session_id", "label"]

# English key -> Chinese name.
en2ch = dict(zip(valid_keys, ch_valid_keys))

# Consecutive slices of ``valid_keys``.
# NOTE(review): presumably the five analysis dimensions - confirm with callers.
d1_keys = valid_keys[:5]
d2_keys = valid_keys[5:10]
d3_keys = valid_keys[10:15]
d4_keys = valid_keys[15:19]
d5_keys = valid_keys[19:23]

# Excel limits sheet titles to 31 characters and rejects these characters.
_MAX_SHEET_NAME_LEN = 31
_INVALID_SHEET_CHARS = frozenset('[]:*?/\\')


def _hashable_value(value):
    """Return *value* unchanged if hashable, else a canonical JSON string.

    Values decoded from JSON may be lists or dicts, which cannot be used as
    ``Counter`` keys; serialising them keeps equal structures in one bucket.
    """
    try:
        hash(value)
    except TypeError:
        return json.dumps(value, ensure_ascii=False, sort_keys=True, default=str)
    return value


def _safe_sheet_name(key, used: set) -> str:
    """Turn *key* into a valid sheet title that is not yet in *used*.

    Forbidden characters become ``_``, the title is cut to 31 characters and
    a numeric suffix is added on collision.  *used* holds the lower-cased
    titles handed out so far and is updated in place.  A key that is already
    a valid, unique title of at most 31 characters is returned unchanged.
    """
    cleaned = "".join(
        "_" if ch in _INVALID_SHEET_CHARS else ch for ch in str(key)
    )
    base = cleaned[:_MAX_SHEET_NAME_LEN] or "Sheet"
    name = base
    suffix = 1
    # Compare lower-cased: Excel treats sheet titles case-insensitively.
    while name.lower() in used:
        tag = f"_{suffix}"
        name = base[:_MAX_SHEET_NAME_LEN - len(tag)] + tag
        suffix += 1
    used.add(name.lower())
    return name


class StatisticData:
    """Value-frequency statistics over the labelled sessions of one folder."""

    def __init__(self, folder: str):
        """Load every label found under *folder*.

        The result of ``extract_json_data`` is used as a mapping: its keys
        are stored as ``session_ids`` and its values as ``labels``.
        """
        self.data = extract_json_data(extract_json_files(folder))
        self.session_ids = list(self.data.keys())
        self.labels = list(self.data.values())
        # Grade letters accepted as the first character of a priority value.
        self.priorities = ["S", "A", "B", "C"]

    def statistic_priority(self) -> Tuple[Counter, Counter]:
        """Count the ``Follow_up_Priority`` values of all labels.

        Returns:
            ``(full_counter, grade_counter)``: the first counts the raw
            values, the second counts only their upper-cased grade letter.

        Raises:
            KeyError: a label has no ``Follow_up_Priority`` entry.
            ValueError: a value is empty, not a string, or its grade letter
                is not one of ``self.priorities``.
        """
        raw_values = [label["Follow_up_Priority"] for label in self.labels]
        grades = [self._priority_grade(value) for value in raw_values]
        self._check_priority(grades)
        return Counter(raw_values), Counter(grades)

    @staticmethod
    def _priority_grade(value) -> str:
        """Return the upper-cased first letter of a priority value.

        Empty and non-string values are reported as ``ValueError`` so that
        every malformed priority surfaces with the same exception type.
        """
        text = value.strip() if isinstance(value, str) else ""
        if not text:
            raise ValueError(f"Invalid priority {value!r}")
        return text[0].upper()

    def _check_priority(self, priorities: list):
        """Raise ``ValueError`` on the first grade not in ``self.priorities``."""
        for priority in priorities:
            if priority not in self.priorities:
                raise ValueError(f"Invalid priority {priority}")

    def statistic_other_keys(self) -> Dict[str, Counter]:
        """Count, for every key occurring in any label, how often each value occurs.

        Unhashable values (lists, dicts) are counted under their canonical
        JSON representation instead of raising ``TypeError``.
        """
        key2counter: Dict[str, Counter] = {}
        for label in self.labels:
            for key, value in label.items():
                key2counter.setdefault(key, Counter())[_hashable_value(value)] += 1
        return key2counter

    def main(self) -> Tuple[Counter, Counter, Dict[str, Counter]]:
        """Return ``(priority_full_counter, priority_counter, key2counter)``."""
        priority_full_counter, priority_counter = self.statistic_priority()
        key2counter = self.statistic_other_keys()
        return priority_full_counter, priority_counter, key2counter


class Outputer:
    """Render the statistics of a deal group against a not-deal group."""

    # Fixed drawing order and colour per grade so that both pies agree.
    _PRIORITY_ORDER = ("S", "A", "B", "C")
    _PRIORITY_COLORS = {
        "S": '#ff9999',
        "A": '#66b3ff',
        "B": '#99ff99',
        "C": '#ffcc99',
    }
    _FALLBACK_COLOR = '#cccccc'

    def __init__(self, deal_data, not_deal_data):
        """Store both statistic triples.

        Args:
            deal_data: ``(priority_full_counter, priority_counter,
                key2counter)`` as returned by ``StatisticData.main``.
            not_deal_data: the same triple for the not-deal group.
        """
        self.deal_priority_full, self.deal_priority, deal_key2counter = deal_data
        (
            self.not_deal_priority_full,
            self.not_deal_priority,
            not_deal_key2counter,
        ) = not_deal_data
        # Shallow copies: the assignments below must not leak into the
        # dictionaries owned by the caller.
        self.deal_key2counter = dict(deal_key2counter)
        self.not_deal_key2counter = dict(not_deal_key2counter)
        self.deal_key2counter['Follow_up_Priority'] = self.deal_priority_full
        self.not_deal_key2counter['Follow_up_Priority'] = self.not_deal_priority_full

    def visualize_priority(
        self,
        output_path: str = 'priority_comparison.png',
        show: bool = True,
    ) -> None:
        """Draw the two priority distributions as side-by-side pie charts.

        Args:
            output_path: where the figure is saved.
            show: whether to call ``plt.show()`` after saving.
        """
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))
        self._draw_priority_pie(
            ax1, self.deal_priority, 'Priority Distribution (Deal)'
        )
        self._draw_priority_pie(
            ax2, self.not_deal_priority, 'Priority Distribution (Not Deal)'
        )
        fig.suptitle('Priority Distribution Comparison', fontsize=16)
        fig.tight_layout()
        fig.savefig(output_path, bbox_inches='tight')
        print(f"Chart saved to: {output_path}")
        if show:
            plt.show()

    @classmethod
    def _draw_priority_pie(cls, ax, counter, title: str) -> None:
        """Draw one pie on *ax* with a stable slice order and colours."""
        known = [g for g in cls._PRIORITY_ORDER if counter.get(g, 0) > 0]
        # Grades outside S/A/B/C are drawn last, in a deterministic order.
        extra = sorted(
            (
                g for g in counter
                if g not in cls._PRIORITY_COLORS and counter[g] > 0
            ),
            key=str,
        )
        labels = known + extra
        ax.set_title(title)
        if not labels:
            # Avoid drawing an empty pie; show a placeholder instead.
            ax.text(
                0.5, 0.5, 'No data',
                ha='center', va='center', transform=ax.transAxes,
            )
            ax.axis('off')
            return
        sizes = [counter[g] for g in labels]
        colors = [cls._PRIORITY_COLORS.get(g, cls._FALLBACK_COLOR) for g in labels]
        ax.pie(
            sizes, labels=labels, colors=colors,
            autopct='%1.1f%%', startangle=90,
        )
        ax.axis('equal')

    def save_key2counter_excel(
        self, excel_path: str = "key2counter_comparison.xlsx"
    ) -> None:
        """Write one sheet per label key comparing deal and not-deal counts.

        Each sheet has the columns ``value``, ``deal_count``,
        ``not_deal_count`` and ``total``, sorted by ``total`` descending and
        closed by a ``Total Samples`` row.

        Args:
            excel_path: where the workbook is written.
        """
        columns = ['value', 'deal_count', 'not_deal_count', 'total']
        # Sorted so that sheet order does not depend on set iteration order.
        keys = sorted(
            set(self.deal_key2counter) | set(self.not_deal_key2counter),
            key=str,
        )
        used_names: set = set()

        with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
            for key in keys:
                deal_counter = self.deal_key2counter.get(key, Counter())
                not_deal_counter = self.not_deal_key2counter.get(key, Counter())

                values = sorted(
                    set(deal_counter) | set(not_deal_counter), key=str
                )
                rows = []
                for value in values:
                    deal_count = deal_counter.get(value, 0)
                    not_deal_count = not_deal_counter.get(value, 0)
                    rows.append({
                        'value': value,
                        'deal_count': deal_count,
                        'not_deal_count': not_deal_count,
                        'total': deal_count + not_deal_count,
                    })
                # Stable sort: ties keep the alphabetical order from above.
                rows.sort(key=lambda row: row['total'], reverse=True)

                deal_total = sum(deal_counter.values())
                not_deal_total = sum(not_deal_counter.values())
                rows.append({
                    'value': 'Total Samples',
                    'deal_count': deal_total,
                    'not_deal_count': not_deal_total,
                    'total': deal_total + not_deal_total,
                })

                # Explicit columns keep the layout fixed even when a key has
                # no values at all in either group.
                df = pd.DataFrame(rows, columns=columns)
                sheet_name = _safe_sheet_name(key, used_names)
                df.to_excel(writer, sheet_name=sheet_name, index=False)

        print(f"Excel saved to: {excel_path}")