from data_process import build_dataloader
from model import TransClassifier
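# NOTE: build_dataloader and TransClassifier are project-local modules. From the usage
# below they are assumed to return a dict of DataLoaders keyed by split (including
# "test") and an nn.Module that maps tokenized inputs to 2-class logits, respectively;
# this is inferred from this script, not verified against data_process.py / model.py.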

import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoTokenizer

import pandas as pd
import numpy as np
import os
import json
from datetime import datetime
import gc
from tqdm import tqdm
import warnings
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

warnings.filterwarnings("ignore")


def test(backbone_dir, deal_folder, not_deal_folder, batch_size, ckpt_path="best_ckpt.pth", device="cuda"):
    """
    Evaluate the model on the test set.

    Args:
        backbone_dir: directory of the pretrained backbone model
        deal_folder: folder containing the "deal" (closed) samples
        not_deal_folder: folder containing the "not deal" samples
        batch_size: batch size
        ckpt_path: path to the trained model checkpoint
        device: device to run on
    """
    # Load the test data
    data_dict = build_dataloader(deal_folder, not_deal_folder, batch_size)
    test_loader = data_dict["test"]
    print(f"Test data loaded successfully. Test samples: {len(test_loader.dataset)}")

    # Load the tokenizer and the model
    tokenizer = AutoTokenizer.from_pretrained(backbone_dir)
    model = TransClassifier(backbone_dir, device)
    model.to(device)

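    # Assumption: best_ckpt.pth holds a bare state_dict saved with
    # torch.save(model.state_dict(), ...) by the training script; if the training
    # script instead wraps the weights in a larger checkpoint dict, the relevant key
    # would have to be extracted before calling load_state_dict.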
    # Load the trained model weights
    if os.path.exists(ckpt_path):
        model.load_state_dict(torch.load(ckpt_path, map_location=device))
        print(f"Model loaded from {ckpt_path}")
    else:
        print(f"Warning: {ckpt_path} not found. Using untrained model.")

    # Evaluate the model
    model.eval()
    all_ids = []
    all_preds = []
    all_labels = []
    test_loss = 0.0

    loss_func = nn.CrossEntropyLoss()

    pbar = tqdm(test_loader, desc="Testing")
    with torch.inference_mode():
        for batch_idx, (ids, texts, labels) in enumerate(pbar):
            all_ids.extend(ids)
            labels = labels.to(device)

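            # Assumption: each element of `texts` is a chat-formatted message list
            # (e.g. [{"role": "user", "content": ...}]) as produced by the dataloader,
            # which is what apply_chat_template expects; enable_thinking=False is a
            # Qwen3 chat-template option. This mirrors the training-time formatting
            # and is not verified here.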
            # Preprocess the input data
            texts = tokenizer.apply_chat_template(texts, tokenize=False, add_generation_prompt=True, enable_thinking=False)
            inputs = tokenizer(texts, padding=True, truncation=True, max_length=2048, return_tensors="pt").to(device)

            with torch.amp.autocast(device_type="cuda", dtype=torch.bfloat16):
                outputs = model(inputs)
                loss = loss_func(outputs, labels)

            test_loss += loss.item()

            # Collect the predictions
            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(labels.cpu().numpy())

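            # Emptying the CUDA cache and forcing garbage collection on every batch is
            # expensive; presumably kept to bound peak GPU memory on small hardware.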
            # Free memory
            del texts, labels, outputs, loss
            torch.cuda.empty_cache()
            gc.collect()

    # Compute the evaluation metrics
    avg_loss = test_loss / len(test_loader)
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average="weighted")
    recall = recall_score(all_labels, all_preds, average="weighted")
    f1 = f1_score(all_labels, all_preds, average="weighted")
    cm = confusion_matrix(all_labels, all_preds)

    # Per-class metrics (index 0 = not deal, index 1 = deal), computed once and reused below
    class_precision = precision_score(all_labels, all_preds, average=None)
    class_recall = recall_score(all_labels, all_preds, average=None)
    class_f1 = f1_score(all_labels, all_preds, average=None)

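    # Note: the "weighted" averages above weight each class's score by its support,
    # so the aggregate numbers remain meaningful if deal / not-deal are imbalanced.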
    # Print the evaluation results
    print("\n=== Test Results ===")
    print(f"Average Loss: {avg_loss:.4f}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print("\nConfusion Matrix:")
    print(cm)
    print("\n=== Class-wise Metrics ===")
    print("Class 0 (Not Deal):")
    print(f" Precision: {class_precision[0]:.4f}")
    print(f" Recall: {class_recall[0]:.4f}")
    print(f" F1 Score: {class_f1[0]:.4f}")
    print("\nClass 1 (Deal):")
    print(f" Precision: {class_precision[1]:.4f}")
    print(f" Recall: {class_recall[1]:.4f}")
    print(f" F1 Score: {class_f1[1]:.4f}")

    # Assemble the test results (cast NumPy scalars to plain floats so they are JSON serializable)
    test_results = {
        "average_loss": avg_loss,
        "accuracy": float(accuracy),
        "precision": float(precision),
        "recall": float(recall),
        "f1_score": float(f1),
        "confusion_matrix": cm.tolist(),
        "class_0_precision": float(class_precision[0]),
        "class_0_recall": float(class_recall[0]),
        "class_0_f1": float(class_f1[0]),
        "class_1_precision": float(class_precision[1]),
        "class_1_recall": float(class_recall[1]),
        "class_1_f1": float(class_f1[1]),
        "test_samples": len(all_labels),
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }

    # Save the per-sample predictions
    pred_results = {
        "ids": all_ids,
        "predictions": all_preds,
        "true_labels": all_labels
    }
    pred_df = pd.DataFrame(pred_results)
    pred_df.to_csv("test_predictions.csv", index=False, encoding="utf-8")

    # Save the aggregate metrics as a JSON file
    with open("test_results.json", "w", encoding="utf-8") as f:
        json.dump(test_results, f, ensure_ascii=False, indent=2)
    print("\nTest results saved to test_results.json")

    return test_results


if __name__ == "__main__":
    # Configuration
    backbone_dir = r"C:\Users\GA\Desktop\models\Qwen3-1.7B"
    deal_folder = "deal"
    not_deal_folder = "not_deal"
    batch_size = 8
    ckpt_path = "best_ckpt.pth"
    device = "cuda"

    # Run the test
    test(backbone_dir, deal_folder, not_deal_folder, batch_size, ckpt_path, device)