"""Batch-extract family-education profile fields from TXT records via an LLM.

Every ``.txt`` file in ``CHAT_RECORD_DIR`` is sent to an OpenAI-compatible
chat-completions endpoint, and the extracted fields are written to a JSON
file of the same base name in ``JSON_OUTPUT_DIR``.
"""

import json
import os
import re
from typing import Dict, Optional

import pandas as pd
from dotenv import load_dotenv
from openai import OpenAI, OpenAIError

env_path = r"D:\damn\dialogue\.env"
load_dotenv(dotenv_path=env_path)

API_KEY = os.getenv("QWEN_API_KEY")
MODEL = os.getenv("MODEL_NAME")
TEMPERATURE = float(os.getenv("TEMPERATURE", 0.1))

client = OpenAI(
    api_key=API_KEY,
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

CHAT_RECORD_DIR = "./data/txt"
JSON_OUTPUT_DIR = "./data/json"
FILE_ENCODING = "utf-8"

# Files whose stripped content is shorter than this are treated as empty.
MIN_CONTENT_LENGTH = 20

# Keys requested from the model; also the exact key set of every result dict.
EXTRACT_FIELDS = [
    "监护人1姓名", "家庭角色", "文化程度", "职业", "年龄", "性格特征", "联系方式",
    "监护人2姓名", "家庭角色_2", "文化程度_2", "职业_2", "年龄_2", "性格特征_2",
    "联系方式_2",
    "孩子姓名", "性别", "孩子年龄", "年级", "孩子性格特征", "学习成绩",
    "家庭地址", "家庭基本情况", "家庭氛围", "亲子关系", "家长有无教育分歧",
    "是否经常否定孩子", "有无打骂教育", "孩子是否在父母身边长大",
    "还有谁参与孩子的养育", "孩子成长过程中有何重大影响事件", "既往病史",
    "孩子的优点", "孩子的缺点", "孩子目前情况的描述", "参加指导最想解决",
    "问卷评估",
]

# Template for a failed extraction. Always hand out a copy, never this object.
EMPTY_RESULT = {field: None for field in EXTRACT_FIELDS}

# Matches Markdown code-fence markers such as ```json or a bare ```.
_CODE_FENCE_RE = re.compile(r"```(?:json)?", re.IGNORECASE)


def clean_llm_response(llm_text: str) -> str:
    """Reduce a raw model reply to the JSON object text it contains.

    Code-fence markers are removed, then everything from the first ``{`` to
    the last ``}`` is kept, which drops any label or commentary around the
    object while leaving nested objects and string values untouched.

    Args:
        llm_text: Raw text returned by the model.

    Returns:
        The candidate JSON object text, or ``""`` when ``llm_text`` is empty.
        The result is not guaranteed to be valid JSON.
    """
    if not llm_text:
        return ""

    text = _CODE_FENCE_RE.sub("", llm_text).strip()

    start = text.find("{")
    end = text.rfind("}")
    if start != -1 and end > start:
        return text[start:end + 1]

    # No complete object found (bare key/value pairs or a missing brace):
    # wrap the remaining content so input like `"a": 1` can still be parsed.
    return "{" + text.lstrip("{").rstrip("}") + "}"


def extract_chat_info(chat_text: str, file_name: str) -> Dict:
    """Ask the model to extract ``EXTRACT_FIELDS`` from one record.

    Args:
        chat_text: Text of the chat record / registration form. Only the
            first 8000 characters are sent to the model.
        file_name: Name of the source file, used only in failure messages.

    Returns:
        A new dict with exactly the keys in ``EXTRACT_FIELDS``. Keys missing
        from the model's answer are ``None``. If the request fails or the
        reply cannot be parsed as JSON, every value is ``None``.
    """
    prompt = f"""
你是专业的暖洋葱家庭教育档案信息提取分析师,仅从以下文本中提取指定字段信息,严格遵守以下规则,违反规则将判定为任务失败:
1. 提取字段:{', '.join(EXTRACT_FIELDS)},仅提取这些字段,不新增、不删减;
2. 内容规则:如实提取,无相关信息的字段统一填null,绝不猜测、编造内容;
3. 格式规则:仅返回**纯JSON字符串**,无任何多余内容,禁止加```json、```、注释、解释、换行;
4. 特殊提取要求:
- 年龄:仅提取数字/带"岁"的数字,无则null;
- 学习成绩:根据打勾标记提取优秀/良好/一般/差,无则null;
- 性别:仅男/女,无则null;
- 联系方式:仅纯11位手机号码,无则null;
- 问卷评估:提取原始计分内容(题目+分数),用分号分隔;
- 多信息用顿号分隔,无则null。
待提取文本:
{chat_text[:8000]}
"""
    try:
        response = client.chat.completions.create(
            model=MODEL,
            messages=[{"role": "user", "content": prompt}],
            temperature=TEMPERATURE,
            response_format={"type": "json_object"},
            timeout=30,
        )
    except OpenAIError as exc:
        # A timeout or connection error on one file must not abort the batch.
        print(f"请求失败:{file_name} - 调用大模型接口出错({exc}),返回空结果")
        return dict(EMPTY_RESULT)

    # Both `choices` and `message.content` can be empty/None on a bad reply.
    choices = response.choices
    raw_resp = (choices[0].message.content or "") if choices else ""

    try:
        extract_json = json.loads(clean_llm_response(raw_resp))
    except json.JSONDecodeError:
        print(f"解析失败:{file_name} - 大模型返回非标准JSON,返回空结果")
        # Return a copy: the caller adds keys to the result, and mutating the
        # shared EMPTY_RESULT would leak those keys into later results.
        return dict(EMPTY_RESULT)

    # Keep only the requested keys, filling the ones the model omitted.
    return {field: extract_json.get(field) for field in EXTRACT_FIELDS}


def read_chat_txt(file_path: str) -> Optional[str]:
    """Read one TXT record.

    Args:
        file_path: Path of the file to read; decoded with ``FILE_ENCODING``.

    Returns:
        The stripped file content, or ``None`` when the file cannot be read
        or decoded, or when the stripped content is shorter than
        ``MIN_CONTENT_LENGTH`` characters.
    """
    try:
        with open(file_path, "r", encoding=FILE_ENCODING) as f:
            content = f.read().strip()
    except (OSError, UnicodeDecodeError) as exc:
        print(f"读取失败:{file_path} - {exc}")
        return None

    if len(content) < MIN_CONTENT_LENGTH:
        return None
    return content


def batch_txt_to_json():
    """Convert every TXT record in ``CHAT_RECORD_DIR`` to a JSON file.

    Each output file holds the extracted fields plus a ``文件名称`` key with
    the source file name. Files that are unreadable or too short are skipped
    and counted as failures. A summary is printed when the batch finishes.
    """
    os.makedirs(JSON_OUTPUT_DIR, exist_ok=True)

    # Sorted so runs are deterministic; os.listdir order is arbitrary.
    txt_files = sorted(
        f for f in os.listdir(CHAT_RECORD_DIR) if f.lower().endswith(".txt")
    )

    success_count = 0
    fail_count = 0

    for file_name in txt_files:
        txt_file_path = os.path.join(CHAT_RECORD_DIR, file_name)
        chat_content = read_chat_txt(txt_file_path)
        if not chat_content:
            print(f"跳过:{file_name} - 文件为空、内容过短或读取失败")
            fail_count += 1
            continue

        extract_result = extract_chat_info(chat_content, file_name)
        extract_result["文件名称"] = file_name

        json_file_name = os.path.splitext(file_name)[0] + ".json"
        json_file_path = os.path.join(JSON_OUTPUT_DIR, json_file_name)
        with open(json_file_path, "w", encoding="utf-8") as f:
            json.dump(extract_result, f, ensure_ascii=False, indent=4)
        success_count += 1

    print(f"处理完成:成功 {success_count} 个,失败 {fail_count} 个")


if __name__ == "__main__":
    batch_txt_to_json()