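/**
 * DMP API server
 *
 * 📚 Performance strategy:
 * 1. In-memory cache (Map with a per-entry TTL) - hot queries skip the DB
 * 2. Set-based cross computation - multi-tag AND/OR/EXCLUDE is resolved with
 *    SQL set operations (INTERSECT / IN / EXCEPT)
 * 3. Precomputed statistics - the coverage fields are maintained at write time
 * 4. DB connections - each request opens one connection, reuses it for all of
 *    its queries, and closes it when the request finishes
 *
 * 📚 Data import endpoints:
 * POST /api/import/users - bulk import users
 * POST /api/import/user-tags - bulk import user-tag assignments
 * GET /api/import/batches - view import history
 */
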
const express = require('express');
const cors = require('cors');
const path = require('path');
const { getDb } = require('./db/init');

// Express application instance and the fixed port it listens on.
const app = express();
const PORT = 3456;

// Global middleware: CORS for every route, JSON bodies up to 50 MB
// (presumably sized for the bulk import payloads), and static assets
// served from ./public.
app.use(cors());
app.use(express.json({ limit: '50mb' }));
app.use(express.static(path.join(__dirname, 'public')));

// ─────────────────────────────────────────────
// Simple in-memory cache (default TTL 60s)
// ─────────────────────────────────────────────
// Maps a cache key to `{ value, expires }`, where `expires` is an epoch-ms deadline.
const cache = new Map();

/**
 * Reads a value from the in-memory cache.
 *
 * An entry whose deadline has passed is removed as a side effect and treated
 * as a miss.
 *
 * @param {string} key - Cache key.
 * @returns {*} The cached value, or `null` when the key is absent or expired.
 */
function cacheGet(key) {
  const item = cache.get(key);
  if (!item) return null;
  if (Date.now() > item.expires) {
    cache.delete(key);
    return null;
  }
  return item.value;
}

// Upper bound on the number of cached entries. Some cache keys are derived
// from request bodies, so without a cap the Map could grow without limit.
const CACHE_MAX_ENTRIES = 1000;

/**
 * Stores a value in the in-memory cache with a time-to-live.
 *
 * When a new key would push the cache past CACHE_MAX_ENTRIES, expired entries
 * are dropped first; if that is not enough, the oldest-inserted entries are
 * evicted until there is room.
 *
 * @param {string} key - Cache key.
 * @param {*} value - Value to cache.
 * @param {number} [ttlMs=60000] - Lifetime in milliseconds.
 */
function cacheSet(key, value, ttlMs = 60_000) {
  const now = Date.now();

  if (!cache.has(key) && cache.size >= CACHE_MAX_ENTRIES) {
    // Pass 1: reclaim entries that are already expired.
    for (const [cachedKey, item] of cache) {
      if (now > item.expires) cache.delete(cachedKey);
    }
    // Pass 2: Map iterates in insertion order, so this evicts oldest first.
    for (const cachedKey of cache.keys()) {
      if (cache.size < CACHE_MAX_ENTRIES) break;
      cache.delete(cachedKey);
    }
  }

  cache.set(key, { value, expires: now + ttlMs });
}

/**
 * Removes every cache entry whose key starts with the given prefix.
 *
 * @param {string} prefix - Key prefix to match, e.g. `tags:onion`.
 */
function cacheInvalidate(prefix) {
  for (const key of cache.keys()) {
    if (key.startsWith(prefix)) cache.delete(key);
  }
}

// ─────────────────────────────────────────────
// Utilities
// ─────────────────────────────────────────────

/**
 * Wraps a route handler so that any failure is forwarded to `next`.
 *
 * Both rejected promises and synchronous throws from `fn` are caught, so the
 * wrapped handler never throws to its caller.
 *
 * @param {Function} fn - Handler called as `fn(req, res, next)`; may be sync or async.
 * @returns {Function} Middleware `(req, res, next)` returning a promise.
 */
function asyncHandler(fn) {
  return async (req, res, next) => {
    try {
      await fn(req, res, next);
    } catch (err) {
      next(err);
    }
  };
}

// ═════════════════════════════════════════════
// 1. Tag taxonomy API
// ═════════════════════════════════════════════

/**
 * GET /api/tags — returns every tag category with its tags, plus the total
 * user count. The response is cached for 5 minutes per theme.
 *
 * Query: `theme` (defaults to 'onion') selects the database passed to getDb.
 * Returns: { categories: [{ ...category, tags: [{ ...tag, source }] }], totalUsers }
 */
app.get('/api/tags', asyncHandler(async (req, res) => {
  const theme = req.query.theme || 'onion';
  const cacheKey = `tags:${theme}:all`;
  const hit = cacheGet(cacheKey);
  if (hit) return res.json(hit);

  const db = getDb(theme);
  try {
    const categories = db.prepare('SELECT * FROM tag_categories ORDER BY sort_order').all();
    const tags = db.prepare(`
      SELECT t.*, tc.key as cat_key, tc.color as cat_color
      FROM tags t JOIN tag_categories tc ON t.category_id = tc.id
      ORDER BY t.category_id, t.sort_order
    `).all();

    // Group tags by category in one pass instead of filtering per category.
    const tagsByCategory = new Map();
    for (const tag of tags) {
      const bucket = tagsByCategory.get(tag.category_id);
      if (bucket) {
        bucket.push(tag);
      } else {
        tagsByCategory.set(tag.category_id, [tag]);
      }
    }

    // In the 'core_problem' category, a tag name ending in this suffix is
    // reported as 'inferred'; every other tag is 'original'.
    const inferredSuffix = /(推断)$/;
    const result = categories.map((cat) => ({
      ...cat,
      tags: (tagsByCategory.get(cat.id) ?? []).map((t) => ({
        ...t,
        source: (cat.key === 'core_problem' && inferredSuffix.test(t.name)) ? 'inferred' : 'original',
      })),
    }));

    const totalUsers = db.prepare('SELECT COUNT(*) as n FROM users').get().n;
    const payload = { categories: result, totalUsers };
    cacheSet(cacheKey, payload, 300_000); // 5 minutes
    res.json(payload);
  } finally {
    db.close();
  }
}));

// ═════════════════════════════════════════════
// 2. Real-time cross computation API  ← core feature
// ═════════════════════════════════════════════

/**
 * POST /api/compute
 * Body: {
 *   selected: { tagId: number, mode: 'include'|'exclude' }[],
 *   logic: 'AND' | 'OR'   // accepted from clients but not read by this handler
 * }
 * Returns: { count, rate, totalUsers, breakdown: { tagId, count, rate }[] }
 *
 * 📚 How the audience is built:
 *   - include tags in the same category are OR-ed (tag_id IN (...))
 *   - different categories are AND-ed (INTERSECT)
 *   - every exclude tag is subtracted (EXCEPT)
 *   - with no include tags, the starting set is all users
 *   - include tags that do not exist in `tags` are ignored; if none of the
 *     requested include tags exist, the audience is empty
 *
 * Responds 400 when `selected` is not an array of objects. Results are cached
 * for 30 seconds per theme and selection.
 */
app.post('/api/compute', asyncHandler(async (req, res) => {
  // req.body can be undefined when no JSON body was parsed.
  const { selected = [] } = req.body ?? {};
  const theme = req.query.theme || 'onion';

  if (!Array.isArray(selected) || selected.some((s) => s === null || typeof s !== 'object')) {
    return res.status(400).json({ error: 'invalid selected array' });
  }

  // Cache key: order-independent list of "tagId:mode" pairs.
  const cacheKey = `compute:${theme}:${selected.map(s => `${s.tagId}:${s.mode}`).sort().join(',')}`;
  const hit = cacheGet(cacheKey);
  if (hit) return res.json(hit);

  const db = getDb(theme);
  try {
    const totalUsers = db.prepare('SELECT COUNT(*) as n FROM users').get().n;

    // Anything that is not explicitly 'exclude' counts as an include.
    const includes = selected.filter(s => s.mode !== 'exclude');
    const excludes = selected.filter(s => s.mode === 'exclude');

    // Build the category-aware base query.
    let baseSql;
    const baseParams = [];

    if (includes.length === 0) {
      baseSql = 'SELECT id as user_id FROM users';
    } else {
      // Resolve each include tag to its category; prepared once, reused per tag.
      const tagInfoStmt = db.prepare('SELECT t.id, t.category_id FROM tags t WHERE t.id = ?');
      const tagIdsByCategory = new Map();
      for (const inc of includes) {
        const tagInfo = tagInfoStmt.get(inc.tagId);
        if (!tagInfo) continue;
        const ids = tagIdsByCategory.get(tagInfo.category_id);
        if (ids) {
          ids.push(inc.tagId);
        } else {
          tagIdsByCategory.set(tagInfo.category_id, [inc.tagId]);
        }
      }

      // One sub-select per category (OR inside it via IN), parameters pushed
      // in the same order the placeholders appear.
      const categoryParts = [];
      for (const tagIds of tagIdsByCategory.values()) {
        const placeholders = tagIds.map(() => '?').join(',');
        baseParams.push(...tagIds);
        categoryParts.push(`SELECT user_id FROM user_tags WHERE tag_id IN (${placeholders})`);
      }

      // Categories are AND-ed with INTERSECT. If no include tag resolved,
      // joining would give an empty string and invalid SQL, so fall back to a
      // query that selects nobody.
      baseSql = categoryParts.length > 0
        ? categoryParts.join(' INTERSECT ')
        : 'SELECT id as user_id FROM users WHERE 0';
    }

    // Subtract every excluded tag.
    let finalSql = baseSql;
    const finalParams = [...baseParams];
    for (const ex of excludes) {
      finalSql = `${finalSql} EXCEPT SELECT user_id FROM user_tags WHERE tag_id = ?`;
      finalParams.push(ex.tagId);
    }

    // Headline count.
    const countSql = `SELECT COUNT(*) as n FROM (${finalSql})`;
    const mainCount = db.prepare(countSql).get(...finalParams).n;

    // Per-tag breakdown: how many users in the result set carry each selected tag.
    let breakdown = [];
    if (selected.length > 0 && mainCount > 0) {
      const allTagIds = selected.map(s => s.tagId);
      const breakdownStmt = db.prepare(`
        SELECT tag_id, COUNT(*) as n
        FROM user_tags
        WHERE user_id IN (${finalSql})
          AND tag_id IN (${allTagIds.map(() => '?').join(',')})
        GROUP BY tag_id
      `);
      const rows = breakdownStmt.all(...finalParams, ...allTagIds);
      breakdown = rows.map(r => ({
        tagId: r.tag_id,
        count: r.n,
        rate: mainCount > 0 ? +(r.n / mainCount * 100).toFixed(1) : 0
      }));
    }

    const result = {
      count: mainCount,
      // Guard the division: with zero users the rate would be NaN.
      rate: totalUsers > 0 ? +(mainCount / totalUsers * 100).toFixed(2) : 0,
      totalUsers,
      breakdown
    };

    cacheSet(cacheKey, result, 30_000); // 30s cache
    res.json(result);
  } finally {
    db.close();
  }
}));

/**
 * POST /api/compute/cross
 * For each tag in `crossTagIds`, counts how many users of the current
 * selection also carry that tag (used for heatmap / Sankey charts).
 *
 * Body: { selected, logic, crossTagIds: number[] }
 *   `logic` is accepted from clients but not read by this handler.
 * Returns: { baseCount, matrix: { tagId, count, rate }[] }, or { matrix: [] }
 * when `crossTagIds` is empty. Responds 400 when `selected` or `crossTagIds`
 * is not an array.
 *
 * NOTE(review): the audience here is a plain INTERSECT of all include tags,
 * whereas POST /api/compute ORs tags within a category. The two endpoints can
 * therefore disagree for the same selection — confirm which is intended.
 */
app.post('/api/compute/cross', asyncHandler(async (req, res) => {
  // req.body can be undefined when no JSON body was parsed.
  const { selected = [], crossTagIds = [] } = req.body ?? {};
  const theme = req.query.theme || 'onion';

  if (!Array.isArray(crossTagIds)) {
    return res.status(400).json({ error: 'invalid crossTagIds array' });
  }
  if (!Array.isArray(selected) || selected.some((s) => s === null || typeof s !== 'object')) {
    return res.status(400).json({ error: 'invalid selected array' });
  }
  if (crossTagIds.length === 0) return res.json({ matrix: [] });

  const db = getDb(theme);
  try {
    const includes = selected.filter(s => s.mode !== 'exclude');
    const excludes = selected.filter(s => s.mode === 'exclude');

    // Base audience: all users, or the intersection of every include tag.
    let baseSql;
    const baseParams = [];
    if (includes.length === 0) {
      baseSql = 'SELECT id as user_id FROM users';
    } else {
      const parts = [];
      for (const inc of includes) {
        baseParams.push(inc.tagId);
        parts.push('SELECT user_id FROM user_tags WHERE tag_id = ?');
      }
      baseSql = parts.join(' INTERSECT ');
    }

    // Subtract every excluded tag.
    let finalSql = baseSql;
    const finalParams = [...baseParams];
    for (const ex of excludes) {
      finalSql += ` EXCEPT SELECT user_id FROM user_tags WHERE tag_id = ?`;
      finalParams.push(ex.tagId);
    }

    const baseCount = db.prepare(`SELECT COUNT(*) as n FROM (${finalSql})`).get(...finalParams).n;

    // The SQL text is the same for every cross tag, so prepare it once.
    const crossStmt = db.prepare(`
      SELECT COUNT(*) as n FROM (${finalSql})
      WHERE user_id IN (SELECT user_id FROM user_tags WHERE tag_id = ?)
    `);

    const matrix = [];
    for (const tagId of crossTagIds) {
      const n = crossStmt.get(...finalParams, tagId).n;
      matrix.push({ tagId, count: n, rate: baseCount > 0 ? +(n / baseCount * 100).toFixed(1) : 0 });
    }

    res.json({ baseCount, matrix });
  } finally {
    db.close();
  }
}));

// ═════════════════════════════════════════════
// 3. Data import API  ← integration point
// ═════════════════════════════════════════════

/**
 * POST /api/import/users
 * Bulk import / update of basic user records.
 *
 * Body: { users: [{ uid, name, email, extra_json? }], source? }
 * Returns: { batchId, imported, total }; 400 when `users` is not a non-empty array.
 *
 * 📚 Design:
 *   - INSERT OR REPLACE is used as an upsert
 *   - rows are committed in transactions of 1000 to keep each one short
 *   - an import_batches row tracks the run; its id is returned for follow-up
 *   - on failure the batch row is marked 'error' and the error is rethrown;
 *     chunks committed before the failure stay committed
 *
 * NOTE(review): INSERT OR REPLACE deletes the conflicting row before
 * inserting. If `users.id` is an auto-generated key referenced by
 * `user_tags.user_id`, re-importing a user may change its id and detach its
 * tags — verify against the schema.
 */
app.post('/api/import/users', asyncHandler(async (req, res) => {
  // req.body can be undefined when no JSON body was parsed.
  const { users = [], source = 'api' } = req.body ?? {};
  const theme = req.query.theme || 'onion';

  if (!Array.isArray(users) || users.length === 0) {
    return res.status(400).json({ error: 'invalid users array' });
  }

  const db = getDb(theme);
  // Declared outside `try` so the catch block can mark the right batch row.
  let batchId = null;
  try {
    const batchRes = db.prepare(
      'INSERT INTO import_batches (source, record_count, status) VALUES (?, ?, ?)'
    ).run(source, users.length, 'running');
    batchId = batchRes.lastInsertRowid;

    const stmt = db.prepare(
      'INSERT OR REPLACE INTO users (uid, name, email, extra_json) VALUES (?, ?, ?, ?)'
    );

    let imported = 0;
    const BATCH = 1000;
    // One reusable transaction function; each call commits a single chunk.
    const insertChunk = db.transaction((chunk) => {
      for (const u of chunk) {
        stmt.run(u.uid, u.name || '', u.email || '', JSON.stringify(u.extra_json || {}));
        imported++;
      }
    });
    for (let i = 0; i < users.length; i += BATCH) {
      insertChunk(users.slice(i, i + BATCH));
    }

    db.prepare(
      "UPDATE import_batches SET status='done', record_count=?, finished_at=datetime('now') WHERE id=?"
    ).run(imported, batchId);

    // The user count feeds the tag list, compute rates and duration stats.
    cacheInvalidate(`tags:${theme}`);
    cacheInvalidate(`compute:${theme}`);
    cacheInvalidate(`duration-stats:${theme}`);
    res.json({ batchId, imported, total: users.length });
  } catch (err) {
    if (batchId !== null) {
      try {
        db.prepare("UPDATE import_batches SET status='error', error_message=? WHERE id=?")
          .run(err.message, batchId);
      } catch (markErr) {
        // Never let bookkeeping mask the original failure.
        console.error(markErr);
      }
    }
    throw err;
  } finally {
    db.close();
  }
}));

/**
 * POST /api/import/user-tags
 * Bulk import of user-to-tag assignments.
 *
 * Body: { assignments: [{ uid, tagKey }], mode: 'append'|'replace', source? }
 * Returns: { batchId, imported, skipped }; 400 when `assignments` is not a
 * non-empty array.
 *
 *   mode='replace': delete the user's existing tags first, then write (full refresh)
 *   mode='append' : only add, never delete (incremental update); any value
 *                   other than 'replace' behaves like 'append'
 *
 * Assignments whose uid or tagKey cannot be resolved are counted in
 * `skipped`. `imported` counts every resolved assignment, including ones
 * INSERT OR IGNORE left untouched. All writes run in one transaction; tag
 * coverage figures are recomputed afterwards. On failure the batch row is
 * marked 'error' and the error is rethrown.
 */
app.post('/api/import/user-tags', asyncHandler(async (req, res) => {
  // req.body can be undefined when no JSON body was parsed.
  const { assignments = [], source = 'api', mode = 'append' } = req.body ?? {};
  const theme = req.query.theme || 'onion';

  if (!Array.isArray(assignments) || assignments.length === 0) {
    return res.status(400).json({ error: 'invalid assignments array' });
  }

  const db = getDb(theme);
  // Declared outside `try` so the catch block can mark the right batch row.
  let batchId = null;
  try {
    // Statements prepared once and reused for every assignment.
    const userStmt = db.prepare('SELECT id FROM users WHERE uid = ?');
    const tagStmt = db.prepare('SELECT id FROM tags WHERE key = ?');
    const insertStmt = db.prepare('INSERT OR IGNORE INTO user_tags (user_id, tag_id) VALUES (?, ?)');
    const deleteStmt = db.prepare('DELETE FROM user_tags WHERE user_id = ?');

    const batchRes = db.prepare(
      'INSERT INTO import_batches (source, record_count, status) VALUES (?, ?, ?)'
    ).run(source, assignments.length, 'running');
    batchId = batchRes.lastInsertRowid;

    let imported = 0;
    let skipped = 0;

    // Group tag keys by uid so replace mode clears each user exactly once.
    // A Map is used instead of a plain object so uids such as "constructor"
    // or "__proto__" cannot collide with inherited properties. Keys are
    // stringified, matching plain-object key coercion.
    const grouped = new Map();
    for (const a of assignments) {
      const uid = String(a.uid);
      const keys = grouped.get(uid);
      if (keys) {
        keys.push(a.tagKey);
      } else {
        grouped.set(uid, [a.tagKey]);
      }
    }

    const tx = db.transaction(() => {
      for (const [uid, tagKeys] of grouped) {
        const user = userStmt.get(uid);
        if (!user) { skipped += tagKeys.length; continue; }

        if (mode === 'replace') deleteStmt.run(user.id);

        for (const tagKey of tagKeys) {
          const tag = tagStmt.get(tagKey);
          if (!tag) { skipped++; continue; }
          insertStmt.run(user.id, tag.id);
          imported++;
        }
      }
    });
    tx();

    // Refresh coverage statistics. The user count is bound as a parameter and
    // the division is guarded so an empty users table yields a rate of 0.
    const totalUsers = db.prepare('SELECT COUNT(*) as n FROM users').get().n;
    db.prepare(`
      UPDATE tags SET
        coverage = (SELECT COUNT(*) FROM user_tags WHERE tag_id = tags.id),
        coverage_rate = CASE WHEN ? > 0
          THEN ROUND((SELECT COUNT(*) FROM user_tags WHERE tag_id = tags.id) * 100.0 / ?, 2)
          ELSE 0
        END
    `).run(totalUsers, totalUsers);

    db.prepare("UPDATE import_batches SET status='done', finished_at=datetime('now') WHERE id=?").run(batchId);

    // Every cached view that depends on user_tags is now stale.
    cacheInvalidate(`tags:${theme}`);
    cacheInvalidate(`compute:${theme}`);
    cacheInvalidate(`duration-stats:${theme}`);

    res.json({ batchId, imported, skipped });
  } catch (err) {
    if (batchId !== null) {
      try {
        db.prepare("UPDATE import_batches SET status='error', error_message=? WHERE id=?")
          .run(err.message, batchId);
      } catch (markErr) {
        // Never let bookkeeping mask the original failure.
        console.error(markErr);
      }
    }
    throw err;
  } finally {
    db.close();
  }
}));

/**
 * GET /api/import/batches — import history.
 * Returns the 50 most recent import_batches rows, newest first.
 */
app.get('/api/import/batches', asyncHandler(async (req, res) => {
  const theme = req.query.theme || 'onion';
  const db = getDb(theme);
  try {
    const batches = db.prepare(
      'SELECT * FROM import_batches ORDER BY id DESC LIMIT 50'
    ).all();
    res.json(batches);
  } finally {
    db.close();
  }
}));

/**
 * DELETE /api/import/reset — wipes all user data (development only).
 *
 * Deletes every row of user_tags and users and zeroes the tag coverage
 * figures in a single transaction, then drops the dependent cache entries.
 *
 * NOTE(review): nothing in this handler restricts it to development
 * environments — confirm it is protected elsewhere before deploying.
 */
app.delete('/api/import/reset', asyncHandler(async (req, res) => {
  const theme = req.query.theme || 'onion';
  const db = getDb(theme);
  try {
    // One transaction so a failure cannot leave the data half-cleared.
    const resetAll = db.transaction(() => {
      db.exec('DELETE FROM user_tags; DELETE FROM users;');
      db.exec('UPDATE tags SET coverage=0, coverage_rate=0');
    });
    resetAll();

    cacheInvalidate(`tags:${theme}`);
    cacheInvalidate(`compute:${theme}`);
    cacheInvalidate(`duration-stats:${theme}`);
    res.json({ ok: true });
  } finally {
    db.close();
  }
}));

// ═════════════════════════════════════════════
// 4. User detail API
// ═════════════════════════════════════════════

/**
 * POST /api/users/sample
 * Returns a random sample of users from the current selection (at most 100).
 *
 * Body: { selected: { tagId, mode }[], limit?: number }  (limit defaults to 50)
 * Returns: { users: [...] } where each user has the keys of its parsed
 * extra_json merged in; rows whose extra_json is not valid JSON are returned
 * unchanged. Responds 400 when `selected` is not an array of objects.
 *
 * NOTE(review): the audience here is a plain INTERSECT of all include tags,
 * whereas POST /api/compute ORs tags within a category. The sample can
 * therefore differ from the counted audience — confirm which is intended.
 */
app.post('/api/users/sample', asyncHandler(async (req, res) => {
  // req.body can be undefined when no JSON body was parsed.
  const { selected = [], limit = 50 } = req.body ?? {};
  const theme = req.query.theme || 'onion';

  if (!Array.isArray(selected) || selected.some((s) => s === null || typeof s !== 'object')) {
    return res.status(400).json({ error: 'invalid selected array' });
  }

  // Clamp to an integer in [0, 100]. SQLite treats a negative LIMIT as
  // "no limit", so an unclamped negative value would bypass the cap.
  const requestedLimit = Number(limit);
  const safeLimit = Number.isFinite(requestedLimit)
    ? Math.min(Math.max(Math.trunc(requestedLimit), 0), 100)
    : 50;

  const db = getDb(theme);
  try {
    const includes = selected.filter(s => s.mode !== 'exclude');
    const excludes = selected.filter(s => s.mode === 'exclude');

    // Base audience: all users, or the intersection of every include tag.
    let baseSql;
    const baseParams = [];
    if (includes.length === 0) {
      baseSql = 'SELECT id as user_id FROM users';
    } else {
      const parts = [];
      for (const inc of includes) {
        baseParams.push(inc.tagId);
        parts.push('SELECT user_id FROM user_tags WHERE tag_id = ?');
      }
      baseSql = parts.join(' INTERSECT ');
    }

    // Subtract every excluded tag.
    let finalSql = baseSql;
    const finalParams = [...baseParams];
    for (const ex of excludes) {
      finalSql += ` EXCEPT SELECT user_id FROM user_tags WHERE tag_id = ?`;
      finalParams.push(ex.tagId);
    }

    const users = db.prepare(`
      SELECT u.uid, u.name, u.email, u.created_at, u.extra_json
      FROM users u
      WHERE u.id IN (${finalSql})
      ORDER BY RANDOM()
      LIMIT ?
    `).all(...finalParams, safeLimit);

    // Flatten extra_json into each row; the raw column is set to undefined so
    // it is omitted from the JSON response.
    const enrichedUsers = users.map(u => {
      try {
        const extra = JSON.parse(u.extra_json || '{}');
        return { ...u, ...extra, extra_json: undefined };
      } catch {
        return u;
      }
    });

    res.json({ users: enrichedUsers });
  } finally {
    db.close();
  }
}));

/**
 * GET /api/duration-stats
 * Statistics for the "guidance duration" tags: per-tag user count and the
 * share of all users, for tags whose key matches 'duration_%'.
 *
 * Returns: { totalUsers, durationBreakdown: { id, key, name, count, rate }[] }
 * The response is cached for 5 minutes per theme.
 *
 * NOTE(review): in SQL LIKE, `_` matches any single character, so the
 * pattern also matches keys such as "durationX…". If only the literal
 * "duration_" prefix is intended, the underscore needs an ESCAPE clause.
 */
app.get('/api/duration-stats', asyncHandler(async (req, res) => {
  const theme = req.query.theme || 'onion';
  const cacheKey = `duration-stats:${theme}`;
  const hit = cacheGet(cacheKey);
  if (hit) return res.json(hit);

  const db = getDb(theme);
  try {
    const totalUsers = db.prepare('SELECT COUNT(*) as n FROM users').get().n;

    // LEFT JOIN keeps duration tags that currently have no users (count 0).
    const durationStats = db.prepare(`
      SELECT
        t.id,
        t.key,
        t.name,
        COUNT(ut.user_id) as count,
        ROUND(COUNT(ut.user_id) * 100.0 / ?, 2) as rate
      FROM tags t
      LEFT JOIN user_tags ut ON t.id = ut.tag_id
      WHERE t.key LIKE 'duration_%'
      GROUP BY t.id, t.key, t.name
      ORDER BY t.sort_order
    `).all(totalUsers);

    const result = {
      totalUsers,
      durationBreakdown: durationStats.map(s => ({
        id: s.id,
        key: s.key,
        name: s.name,
        count: s.count || 0,
        // The SQL division yields NULL when there are no users; report 0.
        rate: s.rate || 0
      }))
    };

    cacheSet(cacheKey, result, 300_000); // 5 minutes
    res.json(result);
  } finally {
    db.close();
  }
}));

// ═════════════════════════════════════════════
// 5. Error handling
// ═════════════════════════════════════════════

/**
 * Final error middleware: logs the error and answers with JSON
 * `{ error: message }`.
 *
 * The status is the error's own `status` when it is a valid 4xx/5xx code
 * (e.g. body-parser errors for malformed or oversized JSON), otherwise 500.
 * If the response has already started, the error is passed to Express's
 * default handler, because status and body can no longer be changed.
 */
app.use((err, req, res, next) => {
  console.error(err);
  if (res.headersSent) return next(err);

  const declared = err?.status;
  const status = Number.isInteger(declared) && declared >= 400 && declared < 600 ? declared : 500;
  res.status(status).json({ error: err?.message });
});

// SPA fallback: any GET not handled above is answered with the front-end
// entry page, public/index.html.
app.get('/{*path}', (req, res) => {
  res.sendFile(path.join(__dirname, 'public', 'index.html'));
});

// Start the HTTP server and print the main endpoints once it is listening.
app.listen(PORT, () => {
  console.log(`\n🚀 DMP 服务启动: http://localhost:${PORT}`);
  console.log(`📡 导入 API: POST /api/import/users`);
  console.log(`📡 标签 API: POST /api/import/user-tags`);
  console.log(`📡 计算 API: POST /api/compute\n`);
});