Files
onion-dmp/server.js
2026-04-08 14:52:09 +08:00

553 lines
18 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* DMP API 服务器
*
* 📚 性能策略:
* 1. 内存缓存LRU- 高频查询不走 DB
* 2. 位图交叉计算 - 多标签 AND/OR 用 SQL INTERSECT/UNION
* 3. 预计算统计 - coverage 字段在写入时维护
* 4. 连接池 - 每个请求复用单个 DB 连接
*
* 📚 数据导入接口设计:
* POST /api/import/users - 批量导入用户
* POST /api/import/user-tags - 批量导入用户标签
* GET /api/import/batches - 查看导入历史
*/
const express = require('express');
const cors = require('cors');
const path = require('path');
const { getDb } = require('./db/init');
// Single Express application instance; every route below is registered on it.
const app = express();
// Port passed to app.listen() at the bottom of the file (hard-coded).
const PORT = 3456;
// CORS middleware, called with no options (package defaults).
app.use(cors());
// Parse JSON request bodies; the size limit is raised to 50mb.
app.use(express.json({ limit: '50mb' }));
// Serve static files from the `public` directory next to this file.
app.use(express.static(path.join(__dirname, 'public')));
// ─────────────────────────────────────────────
// Simple in-memory cache: per-entry TTL (default 60s), bounded size
// ─────────────────────────────────────────────

/** Maximum number of entries kept; beyond this the least recently used entry is evicted. */
const CACHE_MAX_ENTRIES = 1000;

/** key -> { value, expires }. Map insertion order doubles as the recency order. */
const cache = new Map();

/**
 * Look up a cached value.
 * @param {string} key - cache key
 * @returns {*} the cached value, or null when the key is missing or its entry has expired
 */
function cacheGet(key) {
  const item = cache.get(key);
  if (!item) return null;
  if (Date.now() > item.expires) {
    cache.delete(key);
    return null;
  }
  // Re-insert so the entry moves to the end of the Map (most recently used).
  cache.delete(key);
  cache.set(key, item);
  return item.value;
}

/**
 * Store a value under `key` for `ttlMs` milliseconds.
 * @param {string} key - cache key
 * @param {*} value - value to cache
 * @param {number} [ttlMs=60000] - time to live in milliseconds
 */
function cacheSet(key, value, ttlMs = 60_000) {
  // Delete first so an overwritten key is also moved to the most-recent position.
  cache.delete(key);
  cache.set(key, { value, expires: Date.now() + ttlMs });
  // Keys can be derived from request bodies, so the cache must not grow without bound:
  // drop entries from the front of the Map (least recently used) until under the cap.
  while (cache.size > CACHE_MAX_ENTRIES) {
    cache.delete(cache.keys().next().value);
  }
}

/**
 * Remove every entry whose key starts with `prefix`.
 * @param {string} prefix - key prefix to invalidate
 */
function cacheInvalidate(prefix) {
  for (const k of cache.keys()) {
    if (k.startsWith(prefix)) cache.delete(k);
  }
}
// ─────────────────────────────────────────────
// Utilities
// ─────────────────────────────────────────────
/**
 * Wrap a route handler so that any failure is forwarded to Express via `next`.
 *
 * Both rejected promises and synchronous throws from `fn` are routed to `next(err)`;
 * invoking `fn` inside the Promise executor turns a synchronous throw into a rejection.
 *
 * @param {(req, res, next) => *} fn - route handler, sync or async
 * @returns {(req, res, next) => Promise<*>} handler that resolves with fn's result
 */
function asyncHandler(fn) {
  return (req, res, next) =>
    new Promise((resolve) => resolve(fn(req, res, next))).catch(next);
}
// ═════════════════════════════════════════════
// 1. Tag taxonomy API
// ═════════════════════════════════════════════
/**
 * GET /api/tags — returns every category with its tags plus the total user count.
 * The response is cached per theme for 5 minutes.
 */
app.get('/api/tags', asyncHandler(async (req, res) => {
  const theme = req.query.theme || 'onion';
  const cacheKey = `tags:${theme}:all`;
  const cached = cacheGet(cacheKey);
  if (cached) return res.json(cached);

  const db = getDb(theme);
  try {
    const categoryRows = db.prepare('SELECT * FROM tag_categories ORDER BY sort_order').all();
    const tagRows = db.prepare(`
      SELECT t.*, tc.key as cat_key, tc.color as cat_color
      FROM tags t JOIN tag_categories tc ON t.category_id = tc.id
      ORDER BY t.category_id, t.sort_order
    `).all();

    // Bucket the tags by category in one pass; row order inside a bucket follows the query order.
    const tagsByCategory = new Map();
    for (const tag of tagRows) {
      const bucket = tagsByCategory.get(tag.category_id);
      if (bucket) {
        bucket.push(tag);
      } else {
        tagsByCategory.set(tag.category_id, [tag]);
      }
    }

    const inferredSuffix = /(推断)$/;
    const categoriesWithTags = categoryRows.map((category) => {
      // Only tags of the 'core_problem' category can be flagged as inferred.
      const isCoreProblem = category.key === 'core_problem';
      const ownTags = (tagsByCategory.get(category.id) ?? []).map((tag) => ({
        ...tag,
        source: isCoreProblem && inferredSuffix.test(tag.name) ? 'inferred' : 'original'
      }));
      return { ...category, tags: ownTags };
    });

    const totalUsers = db.prepare('SELECT COUNT(*) as n FROM users').get().n;
    const payload = { categories: categoriesWithTags, totalUsers };
    cacheSet(cacheKey, payload, 300_000); // 5 minutes
    res.json(payload);
  } finally {
    db.close();
  }
}));
// ═════════════════════════════════════════════
// 2. Real-time cross computation API  ← core feature
// ═════════════════════════════════════════════
/**
 * POST /api/compute
 * Body: {
 *   selected: { tagId: number, mode: 'include'|'exclude' }[],
 *   logic: 'AND' | 'OR'   // part of the documented body; this handler does not read it
 * }
 * Returns: { count, rate, totalUsers, breakdown: { tagId, count, rate }[] }
 * Responds 400 when `selected` is not an array of objects.
 *
 * Set algebra used to build the audience:
 *   - included tags of the same category are OR-ed   → tag_id IN (...)
 *   - different categories are AND-ed                → INTERSECT
 *   - excluded tags are subtracted                   → EXCEPT
 * Results are cached per (theme, selection) for 30 seconds.
 */
app.post('/api/compute', asyncHandler(async (req, res) => {
  const { selected = [] } = req.body ?? {};
  const theme = req.query.theme || 'onion';
  if (!Array.isArray(selected) || selected.some(s => s == null || typeof s !== 'object')) {
    return res.status(400).json({ error: 'invalid selected array' });
  }

  // Cache key: order-insensitive list of "tagId:mode" pairs.
  const cacheKey = `compute:${theme}:${selected.map(s => `${s.tagId}:${s.mode}`).sort().join(',')}`;
  const hit = cacheGet(cacheKey);
  if (hit) return res.json(hit);

  const db = getDb(theme);
  try {
    const totalUsers = db.prepare('SELECT COUNT(*) as n FROM users').get().n;

    // Anything that is not explicitly 'exclude' counts as an include.
    const includes = selected.filter(s => s.mode !== 'exclude');
    const excludes = selected.filter(s => s.mode === 'exclude');

    let baseSql;
    const baseParams = [];
    if (includes.length === 0) {
      baseSql = 'SELECT id as user_id FROM users';
    } else {
      // Group the included tag ids by their category; ids with no tags row are ignored.
      const tagInfoStmt = db.prepare('SELECT t.id, t.category_id FROM tags t WHERE t.id = ?');
      const tagIdsByCategory = new Map();
      for (const inc of includes) {
        const tagInfo = tagInfoStmt.get(inc.tagId);
        if (!tagInfo) continue;
        const bucket = tagIdsByCategory.get(tagInfo.category_id);
        if (bucket) {
          bucket.push(inc.tagId);
        } else {
          tagIdsByCategory.set(tagInfo.category_id, [inc.tagId]);
        }
      }

      // One sub-select per category. DISTINCT matters: with a single category and no
      // EXCEPT there is no compound operator to de-duplicate, and a user holding two of
      // the OR-ed tags would otherwise be counted twice.
      const categoryParts = [];
      for (const tagIds of tagIdsByCategory.values()) {
        baseParams.push(...tagIds);
        const placeholders = tagIds.map(() => '?').join(',');
        categoryParts.push(`SELECT DISTINCT user_id FROM user_tags WHERE tag_id IN (${placeholders})`);
      }

      // If none of the included tags exist, the audience is empty (not a SQL syntax error).
      baseSql = categoryParts.length > 0
        ? categoryParts.join(' INTERSECT ')
        : 'SELECT user_id FROM user_tags WHERE 0';
    }

    // Subtract every excluded tag.
    let finalSql = baseSql;
    const finalParams = [...baseParams];
    for (const ex of excludes) {
      finalSql = `${finalSql} EXCEPT SELECT user_id FROM user_tags WHERE tag_id = ?`;
      finalParams.push(ex.tagId);
    }

    // Main result: size of the audience.
    const countSql = `SELECT COUNT(*) as n FROM (${finalSql})`;
    const mainCount = db.prepare(countSql).get(...finalParams).n;

    // Per-tag breakdown: how many users of the audience hold each selected tag.
    let breakdown = [];
    if (selected.length > 0 && mainCount > 0) {
      const allTagIds = selected.map(s => s.tagId);
      const breakdownStmt = db.prepare(`
        SELECT tag_id, COUNT(*) as n
        FROM user_tags
        WHERE user_id IN (${finalSql})
          AND tag_id IN (${allTagIds.map(() => '?').join(',')})
        GROUP BY tag_id
      `);
      const rows = breakdownStmt.all(...finalParams, ...allTagIds);
      breakdown = rows.map(r => ({
        tagId: r.tag_id,
        count: r.n,
        rate: +(r.n / mainCount * 100).toFixed(1)
      }));
    }

    const result = {
      count: mainCount,
      // Guard the empty database: 0 / 0 would serialize as null.
      rate: totalUsers > 0 ? +(mainCount / totalUsers * 100).toFixed(2) : 0,
      totalUsers,
      breakdown
    };
    cacheSet(cacheKey, result, 30_000); // cached for 30s
    res.json(result);
  } finally {
    db.close();
  }
}));
/**
 * POST /api/compute/cross
 * For each tag in `crossTagIds`, counts how many users of the current audience hold it
 * (intended for heat-map / Sankey style charts).
 * Body: { selected, logic, crossTagIds: number[] }  — `logic` is not read by this handler
 * Returns: { baseCount, matrix: { tagId, count, rate }[] }, or { matrix: [] } when
 * `crossTagIds` is empty. Responds 400 when `selected` or `crossTagIds` is not an array.
 *
 * The audience is built with the same rules as POST /api/compute: included tags of one
 * category are OR-ed, categories are AND-ed (INTERSECT), excluded tags are removed (EXCEPT).
 */
app.post('/api/compute/cross', asyncHandler(async (req, res) => {
  const { selected = [], crossTagIds = [] } = req.body ?? {};
  const theme = req.query.theme || 'onion';
  if (
    !Array.isArray(selected) ||
    !Array.isArray(crossTagIds) ||
    selected.some(s => s == null || typeof s !== 'object')
  ) {
    return res.status(400).json({ error: 'invalid selected or crossTagIds array' });
  }
  if (crossTagIds.length === 0) return res.json({ matrix: [] });

  const db = getDb(theme);
  try {
    const includes = selected.filter(s => s.mode !== 'exclude');
    const excludes = selected.filter(s => s.mode === 'exclude');

    let baseSql;
    const baseParams = [];
    if (includes.length === 0) {
      baseSql = 'SELECT id as user_id FROM users';
    } else {
      // Group included tag ids by category so the audience matches /api/compute.
      const tagInfoStmt = db.prepare('SELECT t.id, t.category_id FROM tags t WHERE t.id = ?');
      const tagIdsByCategory = new Map();
      for (const inc of includes) {
        const tagInfo = tagInfoStmt.get(inc.tagId);
        if (!tagInfo) continue; // ids with no tags row are ignored
        const bucket = tagIdsByCategory.get(tagInfo.category_id);
        if (bucket) {
          bucket.push(inc.tagId);
        } else {
          tagIdsByCategory.set(tagInfo.category_id, [inc.tagId]);
        }
      }
      const categoryParts = [];
      for (const tagIds of tagIdsByCategory.values()) {
        baseParams.push(...tagIds);
        const placeholders = tagIds.map(() => '?').join(',');
        // DISTINCT: a user holding several OR-ed tags must appear once in the audience.
        categoryParts.push(`SELECT DISTINCT user_id FROM user_tags WHERE tag_id IN (${placeholders})`);
      }
      // No known included tag → empty audience.
      baseSql = categoryParts.length > 0
        ? categoryParts.join(' INTERSECT ')
        : 'SELECT user_id FROM user_tags WHERE 0';
    }

    let finalSql = baseSql;
    const finalParams = [...baseParams];
    for (const ex of excludes) {
      finalSql += ` EXCEPT SELECT user_id FROM user_tags WHERE tag_id = ?`;
      finalParams.push(ex.tagId);
    }

    const baseCount = db.prepare(`SELECT COUNT(*) as n FROM (${finalSql})`).get(...finalParams).n;

    // The statement text is identical for every cross tag, so prepare it once.
    const crossStmt = db.prepare(`
      SELECT COUNT(*) as n FROM (${finalSql})
      WHERE user_id IN (SELECT user_id FROM user_tags WHERE tag_id = ?)
    `);
    const matrix = crossTagIds.map((tagId) => {
      const n = crossStmt.get(...finalParams, tagId).n;
      return { tagId, count: n, rate: baseCount > 0 ? +(n / baseCount * 100).toFixed(1) : 0 };
    });

    res.json({ baseCount, matrix });
  } finally {
    db.close();
  }
}));
// ═════════════════════════════════════════════
// 3. Data import API  ← ingestion entry point
// ═════════════════════════════════════════════
/**
 * POST /api/import/users
 * Bulk insert / update of base user records.
 * Body: { users: [{ uid, name, email, extra_json? }], source? }
 * Returns: { batchId, imported, total }; responds 400 when `users` is missing or empty.
 *
 * Design notes:
 *   - upsert via INSERT OR REPLACE
 *   - rows are committed in transactions of 1000 to keep each write lock short
 *   - an import_batches row tracks the run; it is marked 'done' or 'error'
 *
 * NOTE(review): INSERT OR REPLACE deletes a conflicting row and inserts a new one. If
 * users.id is an auto-assigned rowid, re-importing a uid may change the id that
 * user_tags.user_id refers to — confirm against the schema.
 */
app.post('/api/import/users', asyncHandler(async (req, res) => {
  const { users = [], source = 'api' } = req.body ?? {};
  const theme = req.query.theme || 'onion';
  if (!Array.isArray(users) || users.length === 0) {
    return res.status(400).json({ error: 'invalid users array' });
  }

  const db = getDb(theme);
  // Declared outside the try block so the catch handler can mark this batch as failed.
  let batchId = null;
  try {
    const batchRes = db.prepare(
      'INSERT INTO import_batches (source, record_count, status) VALUES (?, ?, ?)'
    ).run(source, users.length, 'running');
    batchId = batchRes.lastInsertRowid;

    const stmt = db.prepare(
      'INSERT OR REPLACE INTO users (uid, name, email, extra_json) VALUES (?, ?, ?, ?)'
    );
    const insertChunk = db.transaction((chunk) => {
      for (const u of chunk) {
        stmt.run(u.uid, u.name || '', u.email || '', JSON.stringify(u.extra_json || {}));
      }
    });

    let imported = 0;
    const BATCH = 1000;
    for (let i = 0; i < users.length; i += BATCH) {
      const chunk = users.slice(i, i + BATCH);
      insertChunk(chunk);
      // Count only after the chunk's transaction has committed.
      imported += chunk.length;
    }

    db.prepare(
      "UPDATE import_batches SET status='done', record_count=?, finished_at=datetime('now') WHERE id=?"
    ).run(imported, batchId);

    // The user total feeds every cached response, so drop them all for this theme.
    cacheInvalidate(`tags:${theme}`);
    cacheInvalidate(`compute:${theme}`);
    cacheInvalidate(`duration-stats:${theme}`);
    res.json({ batchId, imported, total: users.length });
  } catch (err) {
    if (batchId !== null) {
      try {
        db.prepare("UPDATE import_batches SET status='error', error_message=? WHERE id=?")
          .run(err.message, batchId);
      } catch (logErr) {
        // Never let the bookkeeping failure hide the original import error.
        console.error(logErr);
      }
    }
    throw err;
  } finally {
    db.close();
  }
}));
/**
 * POST /api/import/user-tags
 * Bulk import of user ↔ tag assignments.
 * Body: { assignments: [{ uid, tagKey }], mode: 'append'|'replace', source? }
 * Returns: { batchId, imported, skipped }; responds 400 when `assignments` is missing or empty.
 *
 *   mode='replace': delete the user's existing tags first, then write (full refresh)
 *   mode='append' : only add, never delete existing tags (incremental update; default)
 *
 * Assignments whose uid or tagKey is unknown are counted in `skipped`. The tag writes and
 * the refresh of tags.coverage / tags.coverage_rate run in one transaction; the
 * import_batches row is marked 'done' or 'error'.
 */
app.post('/api/import/user-tags', asyncHandler(async (req, res) => {
  const { assignments = [], source = 'api', mode = 'append' } = req.body ?? {};
  const theme = req.query.theme || 'onion';
  if (!Array.isArray(assignments) || assignments.length === 0) {
    return res.status(400).json({ error: 'invalid assignments array' });
  }

  const db = getDb(theme);
  // Declared outside the try block so the catch handler can mark this batch as failed.
  let batchId = null;
  try {
    // Statements reused for every assignment.
    const userStmt = db.prepare('SELECT id FROM users WHERE uid = ?');
    const tagStmt = db.prepare('SELECT id FROM tags WHERE key = ?');
    const insertStmt = db.prepare('INSERT OR IGNORE INTO user_tags (user_id, tag_id) VALUES (?, ?)');
    const deleteStmt = db.prepare('DELETE FROM user_tags WHERE user_id = ?');
    const totalUsersStmt = db.prepare('SELECT COUNT(*) as n FROM users');
    // totalUsers is bound twice; the CASE keeps coverage_rate at 0 instead of dividing by zero.
    const coverageStmt = db.prepare(`
      UPDATE tags SET
        coverage = (SELECT COUNT(*) FROM user_tags WHERE tag_id = tags.id),
        coverage_rate = CASE
          WHEN ? > 0 THEN ROUND((SELECT COUNT(*) FROM user_tags WHERE tag_id = tags.id) * 100.0 / ?, 2)
          ELSE 0
        END
    `);

    const batchRes = db.prepare(
      'INSERT INTO import_batches (source, record_count, status) VALUES (?, ?, ?)'
    ).run(source, assignments.length, 'running');
    batchId = batchRes.lastInsertRowid;

    // Group tag keys per uid so 'replace' clears each user exactly once. A Map keyed by
    // the stringified uid is safe for uids such as "__proto__".
    const grouped = new Map();
    for (const a of assignments) {
      const uid = String(a.uid);
      const keys = grouped.get(uid);
      if (keys) {
        keys.push(a.tagKey);
      } else {
        grouped.set(uid, [a.tagKey]);
      }
    }

    let imported = 0;
    let skipped = 0;
    const tx = db.transaction(() => {
      for (const [uid, tagKeys] of grouped) {
        const user = userStmt.get(uid);
        if (!user) { skipped += tagKeys.length; continue; }
        if (mode === 'replace') deleteStmt.run(user.id);
        for (const tagKey of tagKeys) {
          const tag = tagStmt.get(tagKey);
          if (!tag) { skipped++; continue; }
          insertStmt.run(user.id, tag.id);
          imported++;
        }
      }
      // Refresh the denormalized coverage statistics together with the writes.
      const totalUsers = totalUsersStmt.get().n;
      coverageStmt.run(totalUsers, totalUsers);
    });
    tx();

    db.prepare("UPDATE import_batches SET status='done', finished_at=datetime('now') WHERE id=?").run(batchId);
    cacheInvalidate(`tags:${theme}`);
    cacheInvalidate(`compute:${theme}`);
    cacheInvalidate(`duration-stats:${theme}`);
    res.json({ batchId, imported, skipped });
  } catch (err) {
    if (batchId !== null) {
      try {
        db.prepare("UPDATE import_batches SET status='error', error_message=? WHERE id=?")
          .run(err.message, batchId);
      } catch (logErr) {
        // Never let the bookkeeping failure hide the original import error.
        console.error(logErr);
      }
    }
    throw err;
  } finally {
    db.close();
  }
}));
/**
 * GET /api/import/batches — import history.
 * Responds with the 50 most recent import_batches rows, newest id first.
 */
app.get('/api/import/batches', asyncHandler(async (req, res) => {
  const db = getDb(req.query.theme || 'onion');
  try {
    const recentBatchesStmt = db.prepare(
      'SELECT * FROM import_batches ORDER BY id DESC LIMIT 50'
    );
    res.json(recentBatchesStmt.all());
  } finally {
    db.close();
  }
}));
/**
 * DELETE /api/import/reset — wipe all user data (development only).
 * Deletes every user_tags and users row and zeroes the coverage columns on tags, in a
 * single transaction, then drops all cached responses for the theme.
 * Returns: { ok: true }
 */
app.delete('/api/import/reset', asyncHandler(async (req, res) => {
  const theme = req.query.theme || 'onion';
  const db = getDb(theme);
  try {
    // One transaction so a failure cannot leave users deleted but coverage stats untouched.
    const resetAll = db.transaction(() => {
      db.exec('DELETE FROM user_tags; DELETE FROM users;');
      db.exec('UPDATE tags SET coverage=0, coverage_rate=0');
    });
    resetAll();

    cacheInvalidate(`tags:${theme}`);
    cacheInvalidate(`compute:${theme}`);
    // The duration statistics are derived from the same tables and cached for 5 minutes.
    cacheInvalidate(`duration-stats:${theme}`);
    res.json({ ok: true });
  } finally {
    db.close();
  }
}));
// ═════════════════════════════════════════════
// 4. User detail API
// ═════════════════════════════════════════════
/**
 * POST /api/users/sample
 * Returns a random sample of the users in the current audience.
 * Body: { selected: { tagId, mode }[], limit?: number }
 *   limit defaults to 50 and is clamped to the range 0..100.
 * Returns: { users: [...] } — each row's extra_json is parsed and its fields are merged
 * into the user object. Responds 400 when `selected` is not an array of objects.
 *
 * The audience is built with the same rules as POST /api/compute: included tags of one
 * category are OR-ed, categories are AND-ed (INTERSECT), excluded tags are removed (EXCEPT).
 */
app.post('/api/users/sample', asyncHandler(async (req, res) => {
  const { selected = [], limit = 50 } = req.body ?? {};
  const theme = req.query.theme || 'onion';
  if (!Array.isArray(selected) || selected.some(s => s == null || typeof s !== 'object')) {
    return res.status(400).json({ error: 'invalid selected array' });
  }

  // SQLite treats a negative LIMIT as "no limit" and rejects non-integer values, so the
  // client-supplied number is truncated and clamped before it is bound.
  const requestedLimit = Number(limit);
  const safeLimit = Number.isFinite(requestedLimit)
    ? Math.min(Math.max(Math.trunc(requestedLimit), 0), 100)
    : 50;

  const db = getDb(theme);
  try {
    const includes = selected.filter(s => s.mode !== 'exclude');
    const excludes = selected.filter(s => s.mode === 'exclude');

    let baseSql;
    const baseParams = [];
    if (includes.length === 0) {
      baseSql = 'SELECT id as user_id FROM users';
    } else {
      // Group included tag ids by category so the audience matches /api/compute.
      const tagInfoStmt = db.prepare('SELECT t.id, t.category_id FROM tags t WHERE t.id = ?');
      const tagIdsByCategory = new Map();
      for (const inc of includes) {
        const tagInfo = tagInfoStmt.get(inc.tagId);
        if (!tagInfo) continue; // ids with no tags row are ignored
        const bucket = tagIdsByCategory.get(tagInfo.category_id);
        if (bucket) {
          bucket.push(inc.tagId);
        } else {
          tagIdsByCategory.set(tagInfo.category_id, [inc.tagId]);
        }
      }
      const categoryParts = [];
      for (const tagIds of tagIdsByCategory.values()) {
        baseParams.push(...tagIds);
        const placeholders = tagIds.map(() => '?').join(',');
        categoryParts.push(`SELECT DISTINCT user_id FROM user_tags WHERE tag_id IN (${placeholders})`);
      }
      // No known included tag → empty audience.
      baseSql = categoryParts.length > 0
        ? categoryParts.join(' INTERSECT ')
        : 'SELECT user_id FROM user_tags WHERE 0';
    }

    let finalSql = baseSql;
    const finalParams = [...baseParams];
    for (const ex of excludes) {
      finalSql += ` EXCEPT SELECT user_id FROM user_tags WHERE tag_id = ?`;
      finalParams.push(ex.tagId);
    }

    const users = db.prepare(`
      SELECT u.uid, u.name, u.email, u.created_at, u.extra_json
      FROM users u
      WHERE u.id IN (${finalSql})
      ORDER BY RANDOM()
      LIMIT ?
    `).all(...finalParams, safeLimit);

    // Parse extra_json and flatten its fields onto the row; rows with invalid JSON are
    // returned unchanged.
    const enrichedUsers = users.map(u => {
      try {
        const extra = JSON.parse(u.extra_json || '{}');
        return { ...u, ...extra, extra_json: undefined };
      } catch {
        return u;
      }
    });
    res.json({ users: enrichedUsers });
  } finally {
    db.close();
  }
}));
/**
 * GET /api/duration-stats
 * Per-tag user counts for the tags whose key starts with "duration_".
 * Returns: { totalUsers, durationBreakdown: { id, key, name, count, rate }[] }
 * The response is cached per theme for 5 minutes.
 */
app.get('/api/duration-stats', asyncHandler(async (req, res) => {
  const theme = req.query.theme || 'onion';
  const cacheKey = `duration-stats:${theme}`;
  const hit = cacheGet(cacheKey);
  if (hit) return res.json(hit);

  const db = getDb(theme);
  try {
    const totalUsers = db.prepare('SELECT COUNT(*) as n FROM users').get().n;

    // "_" is a single-character wildcard in LIKE, so it is escaped to match a literal
    // underscore; otherwise keys such as "durationX" would match as well.
    const durationStats = db.prepare(`
      SELECT
        t.id,
        t.key,
        t.name,
        COUNT(ut.user_id) as count,
        ROUND(COUNT(ut.user_id) * 100.0 / ?, 2) as rate
      FROM tags t
      LEFT JOIN user_tags ut ON t.id = ut.tag_id
      WHERE t.key LIKE 'duration\\_%' ESCAPE '\\'
      GROUP BY t.id, t.key, t.name
      ORDER BY t.sort_order
    `).all(totalUsers);

    const result = {
      totalUsers,
      durationBreakdown: durationStats.map(s => ({
        id: s.id,
        key: s.key,
        name: s.name,
        count: s.count || 0,
        // rate is NULL when totalUsers is 0 (division by zero in SQLite); report 0 instead.
        rate: s.rate || 0
      }))
    };
    cacheSet(cacheKey, result, 300_000); // 5 minutes
    res.json(result);
  } finally {
    db.close();
  }
}));
// ═════════════════════════════════════════════
// 5. SPA fallback and error handling
// ═════════════════════════════════════════════
// SPA fallback: any GET not handled above gets the single-page app shell.
// Registered before the error middleware so its failures are handled there too.
app.get('/{*path}', (req, res) => {
  res.sendFile(path.join(__dirname, 'public', 'index.html'));
});

/**
 * Final error middleware: logs the error and answers with JSON `{ error: message }`.
 * An integer `err.status` in the 4xx/5xx range (as set by the body parser for malformed
 * or oversized bodies) is used as the response status; everything else becomes 500.
 */
app.use((err, req, res, next) => {
  console.error(err);
  // Once the response has started, only Express's default handler can close it safely.
  if (res.headersSent) return next(err);
  const candidate = err?.status;
  const status = Number.isInteger(candidate) && candidate >= 400 && candidate < 600 ? candidate : 500;
  res.status(status).json({ error: err?.message ?? String(err) });
});
/** Print the startup banner: the server URL followed by the main API entry points. */
function announceStartup() {
  const bannerLines = [
    `\n🚀 DMP 服务启动: http://localhost:${PORT}`,
    `📡 导入 API: POST /api/import/users`,
    `📡 标签 API: POST /api/import/user-tags`,
    `📡 计算 API: POST /api/compute\n`
  ];
  for (const line of bannerLines) {
    console.log(line);
  }
}

// Start the HTTP server on PORT and log the banner once it is listening.
app.listen(PORT, announceStartup);