Update README and project cleanup

This commit is contained in:
inkling
2026-04-08 14:52:09 +08:00
commit fafd267288
71 changed files with 14865 additions and 0 deletions

552
server.js Normal file
View File

@@ -0,0 +1,552 @@
/**
* DMP API 服务器
*
* 📚 性能策略:
* 1. 内存缓存LRU- 高频查询不走 DB
* 2. 位图交叉计算 - 多标签 AND/OR 用 SQL INTERSECT/UNION
* 3. 预计算统计 - coverage 字段在写入时维护
* 4. 连接池 - 每个请求复用单个 DB 连接
*
* 📚 数据导入接口设计:
* POST /api/import/users - 批量导入用户
* POST /api/import/user-tags - 批量导入用户标签
* GET /api/import/batches - 查看导入历史
*/
const express = require('express');
const cors = require('cors');
const path = require('path');
const { getDb } = require('./db/init');
const app = express();
const PORT = 3456;
app.use(cors());
app.use(express.json({ limit: '50mb' }));
app.use(express.static(path.join(__dirname, 'public')));
// ─────────────────────────────────────────────
// 简易内存缓存TTL 60s
// ─────────────────────────────────────────────
const cache = new Map();
function cacheGet(key) {
const item = cache.get(key);
if (!item) return null;
if (Date.now() > item.expires) { cache.delete(key); return null; }
return item.value;
}
function cacheSet(key, value, ttlMs = 60_000) {
cache.set(key, { value, expires: Date.now() + ttlMs });
}
function cacheInvalidate(prefix) {
for (const k of cache.keys()) {
if (k.startsWith(prefix)) cache.delete(k);
}
}
// ─────────────────────────────────────────────
// 工具
// ─────────────────────────────────────────────
function asyncHandler(fn) {
return (req, res, next) => Promise.resolve(fn(req, res, next)).catch(next);
}
// ═════════════════════════════════════════════
// 1. 标签体系 API
// ═════════════════════════════════════════════
/** GET /api/tags — 返回所有分类+标签(含覆盖数),带缓存 */
app.get('/api/tags', asyncHandler(async (req, res) => {
const theme = req.query.theme || 'onion';
const cacheKey = `tags:${theme}:all`;
const hit = cacheGet(cacheKey);
if (hit) return res.json(hit);
const db = getDb(theme);
try {
const categories = db.prepare('SELECT * FROM tag_categories ORDER BY sort_order').all();
const tags = db.prepare(`
SELECT t.*, tc.key as cat_key, tc.color as cat_color
FROM tags t JOIN tag_categories tc ON t.category_id = tc.id
ORDER BY t.category_id, t.sort_order
`).all();
const result = categories.map(cat => ({
...cat,
tags: tags
.filter(t => t.category_id === cat.id)
.map(t => ({
...t,
source: (cat.key === 'core_problem' && /(推断)$/.test(t.name)) ? 'inferred' : 'original'
}))
}));
const totalUsers = db.prepare('SELECT COUNT(*) as n FROM users').get().n;
cacheSet(cacheKey, { categories: result, totalUsers }, 300_000); // 5分钟
res.json({ categories: result, totalUsers });
} finally {
db.close();
}
}));
// ═════════════════════════════════════════════
// 2. 实时交叉计算 API ← 核心功能
// ═════════════════════════════════════════════
/**
* POST /api/compute
* Body: {
* selected: { tagId: number, mode: 'include'|'exclude' }[],
* logic: 'AND' | 'OR'
* }
* Returns: { count, rate, breakdown: { tagId, count, rate }[] }
*
* 📚 交叉计算算法:
* AND必须同时拥有→ INTERSECT
* OR拥有任一即可→ UNION / IN
* EXCLUDE排除 → EXCEPT / NOT EXISTS
* SQLite 的 INTERSECT 底层就是对有序集合做 merge-joinO(n log n)
*/
app.post('/api/compute', asyncHandler(async (req, res) => {
const { selected = [] } = req.body;
const theme = req.query.theme || 'onion';
// 缓存 key
const cacheKey = `compute:${theme}:${selected.map(s => `${s.tagId}:${s.mode}`).sort().join(',')}`;
const hit = cacheGet(cacheKey);
if (hit) return res.json(hit);
const db = getDb(theme);
try {
const totalUsers = db.prepare('SELECT COUNT(*) as n FROM users').get().n;
// 分离 include / exclude
const includes = selected.filter(s => s.mode !== 'exclude');
const excludes = selected.filter(s => s.mode === 'exclude');
// 构建分类感知的 SQL
let baseSql;
const baseParams = [];
if (includes.length === 0) {
baseSql = 'SELECT id as user_id FROM users';
} else {
// 获取每个标签的分类信息
const categoryMap = {};
for (const inc of includes) {
const tagInfo = db.prepare(`
SELECT t.id, t.category_id FROM tags t WHERE t.id = ?
`).get(inc.tagId);
if (tagInfo) {
if (!categoryMap[tagInfo.category_id]) {
categoryMap[tagInfo.category_id] = [];
}
categoryMap[tagInfo.category_id].push(inc.tagId);
}
}
// 为每个分类生成 SQL 子句
// 同一分类OR 逻辑IN
// 不同分类AND 逻辑INTERSECT
const categoryParts = [];
for (const catId in categoryMap) {
const tagIds = categoryMap[catId];
if (tagIds.length === 1) {
// 单个标签:直接用 tagId
baseParams.push(tagIds[0]);
categoryParts.push(`SELECT user_id FROM user_tags WHERE tag_id = ?`);
} else {
// 多个标签:用 INOR
baseParams.push(...tagIds);
const placeholders = tagIds.map(() => '?').join(',');
categoryParts.push(`SELECT user_id FROM user_tags WHERE tag_id IN (${placeholders})`);
}
}
// 用 INTERSECT 链接各个分类的结果AND
baseSql = categoryParts.join(' INTERSECT ');
}
// 叠加 EXCLUDE
let finalSql = baseSql;
const finalParams = [...baseParams];
for (const ex of excludes) {
finalSql = `${finalSql} EXCEPT SELECT user_id FROM user_tags WHERE tag_id = ?`;
finalParams.push(ex.tagId);
}
// 计算主结果
const countSql = `SELECT COUNT(*) as n FROM (${finalSql})`;
const mainCount = db.prepare(countSql).get(...finalParams).n;
// 计算每个已选标签的细分(本次结果集中拥有该标签的人数)
let breakdown = [];
if (selected.length > 0 && mainCount > 0) {
// 对每个已选标签,计算它在结果集内的覆盖
const allTagIds = selected.map(s => s.tagId);
const breakdownStmt = db.prepare(`
SELECT tag_id, COUNT(*) as n
FROM user_tags
WHERE user_id IN (${finalSql})
AND tag_id IN (${allTagIds.map(() => '?').join(',')})
GROUP BY tag_id
`);
const rows = breakdownStmt.all(...finalParams, ...allTagIds);
breakdown = rows.map(r => ({
tagId: r.tag_id,
count: r.n,
rate: mainCount > 0 ? +(r.n / mainCount * 100).toFixed(1) : 0
}));
}
const result = {
count: mainCount,
rate: +(mainCount / totalUsers * 100).toFixed(2),
totalUsers,
breakdown
};
cacheSet(cacheKey, result, 30_000); // 30s 缓存
res.json(result);
} finally {
db.close();
}
}));
/**
* POST /api/compute/cross
* 计算两个标签在当前结果集内的交叉分布(用于热力图/桑基图)
* Body: { selected, logic, crossTagIds: number[] }
*/
app.post('/api/compute/cross', asyncHandler(async (req, res) => {
const { selected = [], crossTagIds = [] } = req.body;
const theme = req.query.theme || 'onion';
if (crossTagIds.length === 0) return res.json({ matrix: [] });
const db = getDb(theme);
try {
const includes = selected.filter(s => s.mode !== 'exclude');
const excludes = selected.filter(s => s.mode === 'exclude');
let baseSql, baseParams = [];
if (includes.length === 0) {
baseSql = 'SELECT id as user_id FROM users';
} else {
const parts = includes.map(s => { baseParams.push(s.tagId); return `SELECT user_id FROM user_tags WHERE tag_id = ?`; });
baseSql = parts.join(' INTERSECT ');
}
let finalSql = baseSql;
const finalParams = [...baseParams];
for (const ex of excludes) {
finalSql += ` EXCEPT SELECT user_id FROM user_tags WHERE tag_id = ?`;
finalParams.push(ex.tagId);
}
const baseCount = db.prepare(`SELECT COUNT(*) as n FROM (${finalSql})`).get(...finalParams).n;
// 对每个交叉标签计算覆盖
const matrix = [];
for (const tagId of crossTagIds) {
const n = db.prepare(`
SELECT COUNT(*) as n FROM (${finalSql})
WHERE user_id IN (SELECT user_id FROM user_tags WHERE tag_id = ?)
`).get(...finalParams, tagId).n;
matrix.push({ tagId, count: n, rate: baseCount > 0 ? +(n / baseCount * 100).toFixed(1) : 0 });
}
res.json({ baseCount, matrix });
} finally {
db.close();
}
}));
// ═════════════════════════════════════════════
// 3. 数据导入 API ← 接入点
// ═════════════════════════════════════════════
/**
* POST /api/import/users
* 批量导入/更新用户基础数据
* Body: { users: [{ uid, name, email, extra_json? }] }
*
* 📚 设计原则:
* - 使用 INSERT OR REPLACE 做 upsert
* - 分批提交事务每1000条一批避免锁超时
* - 返回导入批次 ID供后续追踪
*/
app.post('/api/import/users', asyncHandler(async (req, res) => {
const { users = [], source = 'api' } = req.body;
const theme = req.query.theme || 'onion';
if (!Array.isArray(users) || users.length === 0) {
return res.status(400).json({ error: 'invalid users array' });
}
const db = getDb(theme);
try {
const batchRes = db.prepare(
'INSERT INTO import_batches (source, record_count, status) VALUES (?, ?, ?)'
).run(source, users.length, 'running');
const batchId = batchRes.lastInsertRowid;
const stmt = db.prepare(
'INSERT OR REPLACE INTO users (uid, name, email, extra_json) VALUES (?, ?, ?, ?)'
);
let imported = 0;
const BATCH = 1000;
for (let i = 0; i < users.length; i += BATCH) {
const chunk = users.slice(i, i + BATCH);
const tx = db.transaction(() => {
for (const u of chunk) {
stmt.run(u.uid, u.name || '', u.email || '', JSON.stringify(u.extra_json || {}));
imported++;
}
});
tx();
}
db.prepare(
"UPDATE import_batches SET status='done', record_count=?, finished_at=datetime('now') WHERE id=?"
).run(imported, batchId);
cacheInvalidate(`tags:${theme}`);
res.json({ batchId, imported, total: users.length });
} catch (err) {
db.prepare("UPDATE import_batches SET status='error', error_message=? WHERE id=?")
.run(err.message, -1);
throw err;
} finally {
db.close();
}
}));
/**
* POST /api/import/user-tags
* 批量导入用户标签关联
* Body: { assignments: [{ uid, tagKey }], mode: 'append'|'replace' }
*
* mode='replace': 先清除用户现有标签,再写入(完整刷新)
* mode='append': 仅追加,不删除旧标签(增量更新)
*/
app.post('/api/import/user-tags', asyncHandler(async (req, res) => {
const { assignments = [], source = 'api', mode = 'append' } = req.body;
const theme = req.query.theme || 'onion';
if (!Array.isArray(assignments) || assignments.length === 0) {
return res.status(400).json({ error: 'invalid assignments array' });
}
const db = getDb(theme);
try {
// 构建查询缓存
const userStmt = db.prepare('SELECT id FROM users WHERE uid = ?');
const tagStmt = db.prepare('SELECT id FROM tags WHERE key = ?');
const insertStmt = db.prepare('INSERT OR IGNORE INTO user_tags (user_id, tag_id) VALUES (?, ?)');
const deleteStmt = db.prepare('DELETE FROM user_tags WHERE user_id = ?');
const batchRes = db.prepare(
'INSERT INTO import_batches (source, record_count, status) VALUES (?, ?, ?)'
).run(source, assignments.length, 'running');
const batchId = batchRes.lastInsertRowid;
let imported = 0;
let skipped = 0;
// 按 uid 分组,支持 replace 模式
const grouped = {};
for (const a of assignments) {
if (!grouped[a.uid]) grouped[a.uid] = [];
grouped[a.uid].push(a.tagKey);
}
const tx = db.transaction(() => {
for (const [uid, tagKeys] of Object.entries(grouped)) {
const user = userStmt.get(uid);
if (!user) { skipped += tagKeys.length; continue; }
if (mode === 'replace') deleteStmt.run(user.id);
for (const tagKey of tagKeys) {
const tag = tagStmt.get(tagKey);
if (!tag) { skipped++; continue; }
insertStmt.run(user.id, tag.id);
imported++;
}
}
});
tx();
// 更新覆盖统计
const totalUsers = db.prepare('SELECT COUNT(*) as n FROM users').get().n;
db.exec(`
UPDATE tags SET
coverage = (SELECT COUNT(*) FROM user_tags WHERE tag_id = tags.id),
coverage_rate = ROUND((SELECT COUNT(*) FROM user_tags WHERE tag_id = tags.id) * 100.0 / ${totalUsers}, 2)
`);
db.prepare("UPDATE import_batches SET status='done', finished_at=datetime('now') WHERE id=?").run(batchId);
cacheInvalidate(`tags:${theme}`);
cacheInvalidate(`compute:${theme}`);
res.json({ batchId, imported, skipped });
} catch (err) {
throw err;
} finally {
db.close();
}
}));
/** GET /api/import/batches — 查看导入历史 */
app.get('/api/import/batches', asyncHandler(async (req, res) => {
const theme = req.query.theme || 'onion';
const db = getDb(theme);
try {
const batches = db.prepare(
'SELECT * FROM import_batches ORDER BY id DESC LIMIT 50'
).all();
res.json(batches);
} finally {
db.close();
}
}));
/** DELETE /api/import/reset — 清空所有用户数据(仅开发用) */
app.delete('/api/import/reset', asyncHandler(async (req, res) => {
const theme = req.query.theme || 'onion';
const db = getDb(theme);
try {
db.exec('DELETE FROM user_tags; DELETE FROM users;');
db.exec('UPDATE tags SET coverage=0, coverage_rate=0');
cacheInvalidate(`tags:${theme}`);
cacheInvalidate(`compute:${theme}`);
res.json({ ok: true });
} finally {
db.close();
}
}));
// ═════════════════════════════════════════════
// 4. 用户明细 API
// ═════════════════════════════════════════════
/**
* POST /api/users/sample
* 获取当前圈选结果的用户样本最多100条
*/
app.post('/api/users/sample', asyncHandler(async (req, res) => {
const { selected = [], limit = 50 } = req.body;
const theme = req.query.theme || 'onion';
const db = getDb(theme);
try {
const includes = selected.filter(s => s.mode !== 'exclude');
const excludes = selected.filter(s => s.mode === 'exclude');
let baseSql, baseParams = [];
if (includes.length === 0) {
baseSql = 'SELECT id as user_id FROM users';
} else {
const parts = includes.map(s => { baseParams.push(s.tagId); return `SELECT user_id FROM user_tags WHERE tag_id = ?`; });
baseSql = parts.join(' INTERSECT ');
}
let finalSql = baseSql;
const finalParams = [...baseParams];
for (const ex of excludes) {
finalSql += ` EXCEPT SELECT user_id FROM user_tags WHERE tag_id = ?`;
finalParams.push(ex.tagId);
}
const users = db.prepare(`
SELECT u.uid, u.name, u.email, u.created_at, u.extra_json
FROM users u
WHERE u.id IN (${finalSql})
ORDER BY RANDOM()
LIMIT ?
`).all(...finalParams, Math.min(limit, 100));
// 解析 extra_json
const enrichedUsers = users.map(u => {
try {
const extra = JSON.parse(u.extra_json || '{}');
return { ...u, ...extra, extra_json: undefined };
} catch {
return u;
}
});
res.json({ users: enrichedUsers });
} finally {
db.close();
}
}));
/**
* GET /api/duration-stats
* 获取"指导周期"相关的统计信息
*/
app.get('/api/duration-stats', asyncHandler(async (req, res) => {
const theme = req.query.theme || 'onion';
const cacheKey = `duration-stats:${theme}`;
const hit = cacheGet(cacheKey);
if (hit) return res.json(hit);
const db = getDb(theme);
try {
const totalUsers = db.prepare('SELECT COUNT(*) as n FROM users').get().n;
// 获取各天数标签的统计
const durationStats = db.prepare(`
SELECT
t.id,
t.key,
t.name,
COUNT(ut.user_id) as count,
ROUND(COUNT(ut.user_id) * 100.0 / ?, 2) as rate
FROM tags t
LEFT JOIN user_tags ut ON t.id = ut.tag_id
WHERE t.key LIKE 'duration_%'
GROUP BY t.id, t.key, t.name
ORDER BY t.sort_order
`).all(totalUsers);
const result = {
totalUsers,
durationBreakdown: durationStats.map(s => ({
id: s.id,
key: s.key,
name: s.name,
count: s.count || 0,
rate: s.rate || 0
}))
};
cacheSet(cacheKey, result, 300_000); // 5分钟
res.json(result);
} finally {
db.close();
}
}));
// ═════════════════════════════════════════════
// 5. 错误处理
// ═════════════════════════════════════════════
app.use((err, req, res, next) => {
console.error(err);
res.status(500).json({ error: err.message });
});
// SPA fallback
app.get('/{*path}', (req, res) => {
res.sendFile(path.join(__dirname, 'public', 'index.html'));
});
app.listen(PORT, () => {
console.log(`\n🚀 DMP 服务启动: http://localhost:${PORT}`);
console.log(`📡 导入 API: POST /api/import/users`);
console.log(`📡 标签 API: POST /api/import/user-tags`);
console.log(`📡 计算 API: POST /api/compute\n`);
});