#!/usr/bin/env python3 """全面质量检查脚本""" import openpyxl import sqlite3 print("\n" + "="*70) print("🔍 全面质量检查") print("="*70 + "\n") # ============================================================================ # 1. Excel 文件对比 # ============================================================================ print("1️⃣ EXCEL 文件结构和内容对比") print("-"*70 + "\n") wb0 = openpyxl.load_workbook('/Users/inkling/Desktop/dmp/家庭教育档案-天数.xlsx') wb1 = openpyxl.load_workbook('/Users/inkling/Desktop/dmp/清洗1.0.xlsx') wb2 = openpyxl.load_workbook('/Users/inkling/Desktop/dmp/清洗2.0.xlsx') ws0, ws1, ws2 = wb0.active, wb1.active, wb2.active print(f"📊 行列统计:") print(f" 原始(家庭教育档案-天数): {ws0.max_row} rows × {ws0.max_column} cols") print(f" 清洗1.0: {ws1.max_row} rows × {ws1.max_column} cols") print(f" 清洗2.0: {ws2.max_row} rows × {ws2.max_column} cols") # 列结构对比 print(f"\n📋 列结构对比:") print(f" {'列':<3} {'原始':<25} {'清洗1.0':<25} {'清洗2.0':<25} {'状态':<5}") print(f" {'-'*3} {'-'*25} {'-'*25} {'-'*25} {'-'*5}") for col in range(1, 17): h0 = str(ws0.cell(1, col).value or '')[:22] h1 = str(ws1.cell(1, col).value or '')[:22] h2 = str(ws2.cell(1, col).value or '')[:22] match = "✓" if h1 == h2 else "✗" print(f" {col:<3} {h0:<25} {h1:<25} {h2:<25} {match:<5}") # 数据完整性检查 print(f"\n✅ 数据完整性 (前100行检查):") def check_null_rate(ws, start_col=1, end_col=16, rows=100): results = {} for col in range(start_col, min(end_col + 1, ws.max_column + 1)): nulls = 0 total = 0 for row in range(2, min(rows + 2, ws.max_row + 1)): total += 1 if ws.cell(row, col).value is None: nulls += 1 if total > 0: results[col] = (nulls, total, 100 * nulls / total) return results nulls1 = check_null_rate(ws1) nulls2 = check_null_rate(ws2) print(f" 清洗1.0: ", end="") if all(rate == 0 for _, _, rate in nulls1.values()): print("✓ 完全无缺失值") else: for col, (n, t, rate) in sorted(nulls1.items()): if rate > 0: print(f"列{col}({rate:.0f}%) ", end="") print(f"\n 清洗2.0: ", end="") if all(rate == 0 for _, _, rate in nulls2.values()): print("✓ 完全无缺失值") else: for col, (n, t, rate) in sorted(nulls2.items()): if rate > 0: print(f"列{col}({rate:.0f}%) ", end="") print() # ============================================================================ # 2. 数据库内容检查 # ============================================================================ print(f"\n\n2️⃣ 数据库内容检查") print("-"*70 + "\n") conn = sqlite3.connect('/Users/inkling/Desktop/dmp/dmp_onion.db') cursor = conn.cursor() # 用户数据 cursor.execute('SELECT COUNT(*) FROM users') user_count = cursor.fetchone()[0] print(f"👥 用户数: {user_count}") # 标签数据 cursor.execute('SELECT COUNT(*) FROM tags') tag_count = cursor.fetchone()[0] print(f"🏷️ 标签数: {tag_count}") # 分类数据 cursor.execute('SELECT COUNT(*) FROM tag_categories') cat_count = cursor.fetchone()[0] print(f"📂 分类数: {cat_count}") # 关系数据 cursor.execute('SELECT COUNT(*) FROM user_tags') rel_count = cursor.fetchone()[0] print(f"🔗 关系数: {rel_count}") # 分类分布 print(f"\n📊 标签分类分布:") cursor.execute(''' SELECT tc.name, COUNT(DISTINCT t.id) as tag_count, COUNT(DISTINCT ut.user_id) as user_count, COUNT(ut.id) as rel_count FROM tag_categories tc LEFT JOIN tags t ON tc.id = t.category_id LEFT JOIN user_tags ut ON t.id = ut.tag_id GROUP BY tc.id ORDER BY tc.id ''') for row in cursor.fetchall(): name, tags, users, rels = row coverage = f"{(users*100/user_count):.0f}%" if users else "0%" print(f" • {name:<20} {tags:3d} tags, {users:4d} users ({coverage:>3s}), {rels:5d} relations") conn.close() print("\n" + "="*70)