feat(project): 基于Redis实现项目初始化任务分布式管理

- 引入Redisson依赖,集成Redisson客户端实现Redis操作
- 新增RedisService封装Redis常用操作方法,支持键值、哈希和集合操作
- ProjectInitTaskVO新增userId字段,标识任务所属用户
- ProjectInitAsyncService接口新增用户任务相关方法,支持按用户获取任务列表和处理中的任务数
- ProjectInitAsyncServiceImpl改用Redis存储任务状态和用户任务列表,替代原内存存储
- 项目初始化任务异步执行流程支持通过Redis保存状态并自动过期
- 实现进度推送由Service层统一通过SSE发送至对应用户,无需Controller中重复推送
- ProjectInitSseController新增基于当前用户Token的任务查询接口,支持获取任务列表、统计信息及单任务状态
- 优化异常处理和任务取消逻辑,确保Redis中任务状态正确更新
- 新增application-dev.yaml Redisson客户端配置,支持单机Redis服务连接
This commit is contained in:
2026-03-28 17:56:05 +08:00
parent 6d91be8af5
commit 32bff3aabc
7 changed files with 522 additions and 91 deletions

View File

@@ -4,7 +4,7 @@ import cn.yinlihupo.domain.vo.ProjectInitResult;
import cn.yinlihupo.domain.vo.ProjectInitTaskVO;
import org.springframework.web.multipart.MultipartFile;
import java.util.function.Consumer;
import java.util.List;
/**
* 项目初始化异步任务服务接口
@@ -20,13 +20,21 @@ public interface ProjectInitAsyncService {
String submitPreviewTask(MultipartFile file);
/**
* 提交异步项目初始化预览任务(带进度回调
* 提交异步项目初始化预览任务(带用户ID
*
* @param file 项目资料文件
* @param progressCallback 进度回调函数
* @param file 项目资料文件
* @param userId 用户ID
* @return 任务ID
*/
String submitPreviewTask(MultipartFile file, Consumer<ProjectInitTaskVO> progressCallback);
String submitPreviewTask(MultipartFile file, Long userId);
/**
* 获取指定用户的所有任务
*
* @param userId 用户ID
* @return 任务列表(按创建时间倒序)
*/
List<ProjectInitTaskVO> getTasksByUserId(Long userId);
/**
* 获取任务状态
@@ -58,4 +66,12 @@ public interface ProjectInitAsyncService {
* @param expireHours 过期时间(小时)
*/
void cleanExpiredTasks(int expireHours);
/**
* 获取指定用户的正在进行的任务数量
*
* @param userId 用户ID
* @return 正在进行的任务数量
*/
int getProcessingTaskCount(Long userId);
}

View File

@@ -2,6 +2,9 @@ package cn.yinlihupo.service.project.impl;
import cn.hutool.core.util.IdUtil;
import cn.yinlihupo.common.enums.AsyncTaskStatus;
import cn.yinlihupo.common.sse.SseChannelManager;
import cn.yinlihupo.common.sse.SseMessage;
import cn.yinlihupo.common.util.RedisService;
import cn.yinlihupo.domain.vo.ProjectInitResult;
import cn.yinlihupo.domain.vo.ProjectInitTaskVO;
import cn.yinlihupo.service.oss.OssService;
@@ -13,15 +16,15 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* 项目初始化异步任务服务实现类
* 使用内存存储任务状态
* 使用 Redis 存储任务状态,支持分布式环境
*/
@Slf4j
@Service
@@ -30,16 +33,28 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService {
private final ProjectService projectService;
private final OssService ossService;
private final RedisService redisService;
private final SseChannelManager sseChannelManager;
/**
* 任务存储(内存存储)
* 任务存储 key 前缀
*/
private final Map<String, ProjectInitTaskVO> taskStore = new ConcurrentHashMap<>();
private static final String TASK_KEY_PREFIX = "project:init:task:";
/**
* 进度回调存储(内存存储,仅当前实例有效)
* 用户任务列表 key 前缀
*/
private final Map<String, Consumer<ProjectInitTaskVO>> progressCallbacks = new ConcurrentHashMap<>();
private static final String USER_TASKS_KEY_PREFIX = "project:init:user:";
/**
* SSE 消息类型
*/
private static final String MESSAGE_TYPE = "project-init";
/**
* 任务默认过期时间24小时
*/
private static final Duration TASK_EXPIRE_DURATION = Duration.ofHours(24);
@Override
public String submitPreviewTask(MultipartFile file) {
@@ -47,16 +62,17 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService {
}
@Override
public String submitPreviewTask(MultipartFile file, Consumer<ProjectInitTaskVO> progressCallback) {
public String submitPreviewTask(MultipartFile file, Long userId) {
// 生成任务ID
String taskId = IdUtil.fastSimpleUUID();
String originalFilename = file.getOriginalFilename();
log.info("提交项目初始化预览任务, taskId: {}, 文件名: {}", taskId, originalFilename);
log.info("提交项目初始化预览任务, taskId: {}, userId: {}, 文件名: {}", taskId, userId, originalFilename);
// 创建任务记录
ProjectInitTaskVO taskVO = new ProjectInitTaskVO();
taskVO.setTaskId(taskId);
taskVO.setUserId(userId);
taskVO.setStatus(AsyncTaskStatus.PENDING.getCode());
taskVO.setStatusDesc(AsyncTaskStatus.PENDING.getDescription());
taskVO.setProgress(0);
@@ -64,12 +80,16 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService {
taskVO.setOriginalFilename(originalFilename);
taskVO.setCreateTime(LocalDateTime.now());
// 存储到内存
taskStore.put(taskId, taskVO);
// 存储任务到 Redis
String taskKey = getTaskKey(taskId);
redisService.set(taskKey, taskVO, TASK_EXPIRE_DURATION);
// 保存进度回调
if (progressCallback != null) {
progressCallbacks.put(taskId, progressCallback);
// 将任务ID添加到用户的任务列表
if (userId != null) {
String userTasksKey = getUserTasksKey(userId);
redisService.sAdd(userTasksKey, taskId);
// 设置用户任务列表过期时间
redisService.expire(userTasksKey, TASK_EXPIRE_DURATION);
}
// 异步执行任务
@@ -83,7 +103,7 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService {
*/
@Async("projectInitTaskExecutor")
public CompletableFuture<Void> executePreviewTaskAsync(String taskId, MultipartFile file) {
ProjectInitTaskVO taskVO = taskStore.get(taskId);
ProjectInitTaskVO taskVO = getTaskFromRedis(taskId);
if (taskVO == null) {
log.error("任务不存在, taskId: {}", taskId);
return CompletableFuture.completedFuture(null);
@@ -110,21 +130,26 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService {
ProjectInitResult result = projectService.generateProjectFromContent(content);
// 4. 更新任务完成状态
taskVO = getTaskFromRedis(taskId);
if (taskVO != null) {
taskVO.setResult(result);
taskVO.setCompleteTime(LocalDateTime.now());
saveTaskToRedis(taskVO);
}
updateTaskProgress(taskId, AsyncTaskStatus.COMPLETED, 100, "项目预览数据生成成功");
taskVO.setResult(result);
taskVO.setCompleteTime(LocalDateTime.now());
log.info("项目初始化预览任务完成, taskId: {}", taskId);
} catch (Exception e) {
log.error("项目初始化预览任务失败, taskId: {}, error: {}", taskId, e.getMessage(), e);
updateTaskProgress(taskId, AsyncTaskStatus.FAILED, 0, "任务执行失败");
taskVO.setErrorMessage(e.getMessage());
taskVO.setCompleteTime(LocalDateTime.now());
} finally {
// 清理回调(仅清理内存中的回调)
progressCallbacks.remove(taskId);
// 注意Redis中的任务数据保留供后续查询24小时后自动过期
taskVO = getTaskFromRedis(taskId);
if (taskVO != null) {
taskVO.setErrorMessage(e.getMessage());
taskVO.setCompleteTime(LocalDateTime.now());
saveTaskToRedis(taskVO);
}
updateTaskProgress(taskId, AsyncTaskStatus.FAILED, 0, "任务执行失败: " + e.getMessage());
}
return CompletableFuture.completedFuture(null);
@@ -134,7 +159,7 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService {
* 更新任务进度
*/
private void updateTaskProgress(String taskId, AsyncTaskStatus status, int progress, String message) {
ProjectInitTaskVO taskVO = taskStore.get(taskId);
ProjectInitTaskVO taskVO = getTaskFromRedis(taskId);
if (taskVO == null) {
return;
}
@@ -148,26 +173,45 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService {
taskVO.setStartTime(LocalDateTime.now());
}
// 更新内存存储
taskStore.put(taskId, taskVO);
// 更新 Redis
saveTaskToRedis(taskVO);
// 触发进度回调
Consumer<ProjectInitTaskVO> callback = progressCallbacks.get(taskId);
if (callback != null) {
try {
callback.accept(taskVO);
} catch (Exception e) {
log.warn("进度回调执行失败, taskId: {}", taskId, e);
}
}
// 通过 SSE 推送进度给用户
pushProgressToUser(taskVO, status);
log.debug("任务进度更新, taskId: {}, status: {}, progress: {}%, message: {}",
taskId, status.getCode(), progress, message);
}
/**
* 通过 SSE 推送进度给用户
*/
private void pushProgressToUser(ProjectInitTaskVO taskVO, AsyncTaskStatus status) {
if (taskVO.getUserId() == null) {
return;
}
String userId = String.valueOf(taskVO.getUserId());
// 判断任务是否结束
boolean isFinished = status == AsyncTaskStatus.COMPLETED ||
status == AsyncTaskStatus.FAILED ||
status == AsyncTaskStatus.CANCELLED;
// 推送进度消息
SseMessage message = SseMessage.of(MESSAGE_TYPE, "progress", userId, taskVO);
sseChannelManager.send(userId, message);
// 任务结束时推送完成消息
if (isFinished) {
SseMessage completeMessage = SseMessage.of(MESSAGE_TYPE, "complete", userId, taskVO);
sseChannelManager.send(userId, completeMessage);
}
}
@Override
public ProjectInitTaskVO getTaskStatus(String taskId) {
ProjectInitTaskVO taskVO = taskStore.get(taskId);
ProjectInitTaskVO taskVO = getTaskFromRedis(taskId);
if (taskVO == null) {
return null;
}
@@ -177,7 +221,7 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService {
@Override
public ProjectInitResult getTaskResult(String taskId) {
ProjectInitTaskVO taskVO = taskStore.get(taskId);
ProjectInitTaskVO taskVO = getTaskFromRedis(taskId);
if (taskVO == null || !AsyncTaskStatus.COMPLETED.getCode().equals(taskVO.getStatus())) {
return null;
}
@@ -186,7 +230,7 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService {
@Override
public boolean cancelTask(String taskId) {
ProjectInitTaskVO taskVO = taskStore.get(taskId);
ProjectInitTaskVO taskVO = getTaskFromRedis(taskId);
if (taskVO == null) {
return false;
}
@@ -196,8 +240,7 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService {
AsyncTaskStatus.PROCESSING.getCode().equals(taskVO.getStatus())) {
updateTaskProgress(taskId, AsyncTaskStatus.CANCELLED, 0, "任务已取消");
taskVO.setCompleteTime(LocalDateTime.now());
taskStore.put(taskId, taskVO);
progressCallbacks.remove(taskId);
saveTaskToRedis(taskVO);
log.info("任务已取消, taskId: {}", taskId);
return true;
}
@@ -207,26 +250,85 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService {
@Override
public void cleanExpiredTasks(int expireHours) {
// 清理已完成的任务,释放内存
LocalDateTime expireTime = LocalDateTime.now().minusHours(expireHours);
int count = 0;
for (Map.Entry<String, ProjectInitTaskVO> entry : taskStore.entrySet()) {
ProjectInitTaskVO task = entry.getValue();
if (task.getCompleteTime() != null && task.getCompleteTime().isBefore(expireTime)) {
taskStore.remove(entry.getKey());
progressCallbacks.remove(entry.getKey());
count++;
}
}
log.info("已清理 {} 个过期任务", count);
// Redis 会自动过期,无需手动清理
log.info("Redis任务将自动过期");
}
@Override
public List<ProjectInitTaskVO> getTasksByUserId(Long userId) {
if (userId == null) {
return new ArrayList<>();
}
String userTasksKey = getUserTasksKey(userId);
Set<String> taskIds = redisService.sMembers(userTasksKey);
if (taskIds == null || taskIds.isEmpty()) {
return new ArrayList<>();
}
return taskIds.stream()
.map(this::getTaskFromRedis)
.filter(Objects::nonNull)
.sorted(Comparator.comparing(ProjectInitTaskVO::getCreateTime).reversed())
.map(this::copyTaskVO)
.collect(Collectors.toList());
}
@Override
public int getProcessingTaskCount(Long userId) {
if (userId == null) {
return 0;
}
List<ProjectInitTaskVO> tasks = getTasksByUserId(userId);
return (int) tasks.stream()
.filter(task -> AsyncTaskStatus.PENDING.getCode().equals(task.getStatus())
|| AsyncTaskStatus.PROCESSING.getCode().equals(task.getStatus()))
.count();
}
// ==================== Redis 操作方法 ====================
/**
* 从 Redis 获取任务
*/
private ProjectInitTaskVO getTaskFromRedis(String taskId) {
String key = getTaskKey(taskId);
return redisService.get(key);
}
/**
* 保存任务到 Redis
*/
private void saveTaskToRedis(ProjectInitTaskVO taskVO) {
String key = getTaskKey(taskVO.getTaskId());
redisService.set(key, taskVO, TASK_EXPIRE_DURATION);
}
/**
* 获取任务存储 key
*/
private String getTaskKey(String taskId) {
return TASK_KEY_PREFIX + taskId;
}
/**
* 获取用户任务列表 key
*/
private String getUserTasksKey(Long userId) {
return USER_TASKS_KEY_PREFIX + userId + ":tasks";
}
// ==================== 工具方法 ====================
/**
* 复制任务VO
*/
private ProjectInitTaskVO copyTaskVO(ProjectInitTaskVO source) {
ProjectInitTaskVO copy = new ProjectInitTaskVO();
copy.setTaskId(source.getTaskId());
copy.setUserId(source.getUserId());
copy.setStatus(source.getStatus());
copy.setStatusDesc(source.getStatusDesc());
copy.setProgress(source.getProgress());