diff --git a/pom.xml b/pom.xml index f98f389..f671c24 100644 --- a/pom.xml +++ b/pom.xml @@ -145,6 +145,12 @@ 1.39.0 + + + org.redisson + redisson-spring-boot-starter + 3.27.0 + org.springframework.boot diff --git a/src/main/java/cn/yinlihupo/common/util/RedisService.java b/src/main/java/cn/yinlihupo/common/util/RedisService.java new file mode 100644 index 0000000..94f4f52 --- /dev/null +++ b/src/main/java/cn/yinlihupo/common/util/RedisService.java @@ -0,0 +1,243 @@ +package cn.yinlihupo.common.util; + +import org.redisson.api.RMap; +import org.redisson.api.RSet; +import org.redisson.api.RedissonClient; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * Redis 服务类 + * 提供常用的 Redis 操作封装 + */ +@Service +public class RedisService { + + private final RedissonClient redissonClient; + + public RedisService(RedissonClient redissonClient) { + this.redissonClient = redissonClient; + } + + /** + * 存储对象到 Redis(带过期时间) + * + * @param key 键 + * @param value 值 + * @param duration 过期时间 + */ + public void set(String key, T value, Duration duration) { + redissonClient.getBucket(key).set(value, duration.toMillis(), TimeUnit.MILLISECONDS); + } + + /** + * 存储对象到 Redis(永不过期) + * + * @param key 键 + * @param value 值 + */ + public void set(String key, T value) { + redissonClient.getBucket(key).set(value); + } + + /** + * 获取对象 + * + * @param key 键 + * @return 值,不存在返回 null + */ + @SuppressWarnings("unchecked") + public T get(String key) { + return (T) redissonClient.getBucket(key).get(); + } + + /** + * 删除键 + * + * @param key 键 + * @return 是否删除成功 + */ + public boolean delete(String key) { + return redissonClient.getBucket(key).delete(); + } + + /** + * 检查键是否存在 + * + * @param key 键 + * @return 是否存在 + */ + public boolean exists(String key) { + return redissonClient.getBucket(key).isExists(); + } + + /** + * 设置过期时间 + * + * @param key 键 + * @param duration 过期时间 + * @return 是否设置成功 + */ + public boolean expire(String key, Duration duration) { + return redissonClient.getBucket(key).expire(duration.toMillis(), TimeUnit.MILLISECONDS); + } + + /** + * 获取匹配模式的所有键 + * + * @param pattern 匹配模式 + * @return 键集合 + */ + public Set keys(String pattern) { + Iterable keys = redissonClient.getKeys().getKeysByPattern(pattern); + Set result = new HashSet<>(); + keys.forEach(result::add); + return result; + } + + /** + * 批量删除匹配模式的键 + * + * @param pattern 匹配模式 + * @return 删除的数量 + */ + public long deleteByPattern(String pattern) { + return redissonClient.getKeys().deleteByPattern(pattern); + } + + /** + * 存储到 Hash + * + * @param key 键 + * @param field 字段 + * @param value 值 + * @param duration 过期时间(可选,传 null 表示不设置过期) + */ + public void hSet(String key, String field, T value, Duration duration) { + RMap map = redissonClient.getMap(key); + map.put(field, value); + if (duration != null) { + map.expire(duration.toMillis(), TimeUnit.MILLISECONDS); + } + } + + /** + * 从 Hash 获取值 + * + * @param key 键 + * @param field 字段 + * @return 值 + */ + @SuppressWarnings("unchecked") + public T hGet(String key, String field) { + RMap map = redissonClient.getMap(key); + return (T) map.get(field); + } + + /** + * 从 Hash 删除字段 + * + * @param key 键 + * @param field 字段 + * @return 删除的数量 + */ + public long hDel(String key, String field) { + RMap map = redissonClient.getMap(key); + return map.remove(field) != null ? 1 : 0; + } + + /** + * 获取 Hash 所有字段和值 + * + * @param key 键 + * @return 字段值映射 + */ + public Map hGetAll(String key) { + RMap map = redissonClient.getMap(key); + return new HashMap<>(map.readAllMap()); + } + + /** + * 获取 Hash 所有值 + * + * @param key 键 + * @return 值集合 + */ + public Collection hGetAllValues(String key) { + RMap map = redissonClient.getMap(key); + return map.values(); + } + + /** + * 检查 Hash 字段是否存在 + * + * @param key 键 + * @param field 字段 + * @return 是否存在 + */ + public boolean hExists(String key, String field) { + RMap map = redissonClient.getMap(key); + return map.containsKey(field); + } + + /** + * 获取 Hash 大小 + * + * @param key 键 + * @return 大小 + */ + public int hSize(String key) { + RMap map = redissonClient.getMap(key); + return map.size(); + } + + /** + * 添加到 Set + * + * @param key 键 + * @param value 值 + * @return 是否添加成功 + */ + public boolean sAdd(String key, String value) { + RSet set = redissonClient.getSet(key); + return set.add(value); + } + + /** + * 从 Set 移除 + * + * @param key 键 + * @param value 值 + * @return 是否移除成功 + */ + public boolean sRem(String key, String value) { + RSet set = redissonClient.getSet(key); + return set.remove(value); + } + + /** + * 获取 Set 所有成员 + * + * @param key 键 + * @return 成员集合 + */ + public Set sMembers(String key) { + RSet set = redissonClient.getSet(key); + return new HashSet<>(set.readAll()); + } + + /** + * 检查 Set 是否包含成员 + * + * @param key 键 + * @param value 值 + * @return 是否包含 + */ + public boolean sIsMember(String key, String value) { + RSet set = redissonClient.getSet(key); + return set.contains(value); + } +} diff --git a/src/main/java/cn/yinlihupo/controller/project/ProjectInitSseController.java b/src/main/java/cn/yinlihupo/controller/project/ProjectInitSseController.java index b1f7939..584cca6 100644 --- a/src/main/java/cn/yinlihupo/controller/project/ProjectInitSseController.java +++ b/src/main/java/cn/yinlihupo/controller/project/ProjectInitSseController.java @@ -5,16 +5,16 @@ import cn.yinlihupo.common.enums.AsyncTaskStatus; import cn.yinlihupo.common.sse.SseChannelManager; import cn.yinlihupo.common.sse.SseMessage; import cn.yinlihupo.common.util.ResultUtils; +import cn.yinlihupo.common.util.SecurityUtils; +import cn.yinlihupo.domain.vo.ProjectInitTaskVO; import cn.yinlihupo.service.project.ProjectInitAsyncService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import org.springframework.web.multipart.MultipartFile; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -37,7 +37,7 @@ public class ProjectInitSseController { /** * 通过 SSE 提交项目初始化任务 - * 使用通用 SSE 通道,通过 userId 推送进度 + * SSE 推送已在 Service 层自动处理 * * @param userId 用户ID * @param file 项目资料文件 @@ -58,18 +58,10 @@ public class ProjectInitSseController { } try { - // 提交异步任务,带进度回调 - String taskId = projectInitAsyncService.submitPreviewTask(file, taskVO -> { - // 构建消息并推送 - SseMessage message = SseMessage.of(MESSAGE_TYPE, "progress", userId, taskVO); - sseChannelManager.send(userId, message); - - // 任务完成或失败,推送完成事件 - if (isTaskFinished(taskVO.getStatus())) { - SseMessage completeMessage = SseMessage.of(MESSAGE_TYPE, "complete", userId, taskVO); - sseChannelManager.send(userId, completeMessage); - } - }); + Long userIdLong = Long.valueOf(userId); + + // 提交异步任务(SSE 推送由 Service 层自动处理) + String taskId = projectInitAsyncService.submitPreviewTask(file, userIdLong); // 推送任务提交成功事件 Map submittedData = new HashMap<>(); @@ -99,20 +91,74 @@ public class ProjectInitSseController { } } + /** + * 查询我的任务列表 + * 根据当前登录用户的token查询其所有任务 + * + * @return 任务列表 + */ + @GetMapping("/my-tasks") + public BaseResponse> getMyTasks() { + Long userId = SecurityUtils.getCurrentUserId(); + if (userId == null) { + return ResultUtils.error("用户未登录"); + } + + List tasks = projectInitAsyncService.getTasksByUserId(userId); + return ResultUtils.success(tasks); + } + + /** + * 查询我的任务统计信息 + * + * @return 统计信息 + */ + @GetMapping("/my-tasks/stats") + public BaseResponse> getMyTaskStats() { + Long userId = SecurityUtils.getCurrentUserId(); + if (userId == null) { + return ResultUtils.error("用户未登录"); + } + + List tasks = projectInitAsyncService.getTasksByUserId(userId); + int processingCount = projectInitAsyncService.getProcessingTaskCount(userId); + + Map stats = new HashMap<>(); + stats.put("total", tasks.size()); + stats.put("processing", processingCount); + stats.put("completed", (int) tasks.stream() + .filter(t -> AsyncTaskStatus.COMPLETED.getCode().equals(t.getStatus())).count()); + stats.put("failed", (int) tasks.stream() + .filter(t -> AsyncTaskStatus.FAILED.getCode().equals(t.getStatus())).count()); + + return ResultUtils.success(stats); + } + + /** + * 查询单个任务状态 + * + * @param taskId 任务ID + * @return 任务状态 + */ + @GetMapping("/task/{taskId}") + public BaseResponse getTaskStatus(@PathVariable String taskId) { + Long userId = SecurityUtils.getCurrentUserId(); + if (userId == null) { + return ResultUtils.error("用户未登录"); + } + + ProjectInitTaskVO task = projectInitAsyncService.getTaskStatus(taskId); + if (task == null) { + return ResultUtils.error("任务不存在"); + } + + // 校验任务归属 + if (!userId.equals(task.getUserId())) { + return ResultUtils.error("无权访问该任务"); + } + + return ResultUtils.success(task); + } + // ==================== 工具方法 ==================== - - private boolean isTaskFinished(String status) { - return AsyncTaskStatus.COMPLETED.getCode().equals(status) || - AsyncTaskStatus.FAILED.getCode().equals(status) || - AsyncTaskStatus.CANCELLED.getCode().equals(status); - } - - private void sendErrorAndClose(String userId, String errorMessage) { - Map errorData = new HashMap<>(); - errorData.put("error", errorMessage); - - SseMessage errorMessage_obj = SseMessage.of(MESSAGE_TYPE, "error", userId, errorData); - sseChannelManager.send(userId, errorMessage_obj); - sseChannelManager.closeChannel(userId); - } } diff --git a/src/main/java/cn/yinlihupo/domain/vo/ProjectInitTaskVO.java b/src/main/java/cn/yinlihupo/domain/vo/ProjectInitTaskVO.java index f70019c..abc7761 100644 --- a/src/main/java/cn/yinlihupo/domain/vo/ProjectInitTaskVO.java +++ b/src/main/java/cn/yinlihupo/domain/vo/ProjectInitTaskVO.java @@ -15,6 +15,11 @@ public class ProjectInitTaskVO { */ private String taskId; + /** + * 用户ID(任务所属用户) + */ + private Long userId; + /** * 任务状态: pending-待处理, processing-处理中, completed-已完成, failed-失败 */ diff --git a/src/main/java/cn/yinlihupo/service/project/ProjectInitAsyncService.java b/src/main/java/cn/yinlihupo/service/project/ProjectInitAsyncService.java index 6169b46..c1924d5 100644 --- a/src/main/java/cn/yinlihupo/service/project/ProjectInitAsyncService.java +++ b/src/main/java/cn/yinlihupo/service/project/ProjectInitAsyncService.java @@ -4,7 +4,7 @@ import cn.yinlihupo.domain.vo.ProjectInitResult; import cn.yinlihupo.domain.vo.ProjectInitTaskVO; import org.springframework.web.multipart.MultipartFile; -import java.util.function.Consumer; +import java.util.List; /** * 项目初始化异步任务服务接口 @@ -20,13 +20,21 @@ public interface ProjectInitAsyncService { String submitPreviewTask(MultipartFile file); /** - * 提交异步项目初始化预览任务(带进度回调) + * 提交异步项目初始化预览任务(带用户ID) * - * @param file 项目资料文件 - * @param progressCallback 进度回调函数 + * @param file 项目资料文件 + * @param userId 用户ID * @return 任务ID */ - String submitPreviewTask(MultipartFile file, Consumer progressCallback); + String submitPreviewTask(MultipartFile file, Long userId); + + /** + * 获取指定用户的所有任务 + * + * @param userId 用户ID + * @return 任务列表(按创建时间倒序) + */ + List getTasksByUserId(Long userId); /** * 获取任务状态 @@ -58,4 +66,12 @@ public interface ProjectInitAsyncService { * @param expireHours 过期时间(小时) */ void cleanExpiredTasks(int expireHours); + + /** + * 获取指定用户的正在进行的任务数量 + * + * @param userId 用户ID + * @return 正在进行的任务数量 + */ + int getProcessingTaskCount(Long userId); } diff --git a/src/main/java/cn/yinlihupo/service/project/impl/ProjectInitAsyncServiceImpl.java b/src/main/java/cn/yinlihupo/service/project/impl/ProjectInitAsyncServiceImpl.java index 4be7dbe..de65d19 100644 --- a/src/main/java/cn/yinlihupo/service/project/impl/ProjectInitAsyncServiceImpl.java +++ b/src/main/java/cn/yinlihupo/service/project/impl/ProjectInitAsyncServiceImpl.java @@ -2,6 +2,9 @@ package cn.yinlihupo.service.project.impl; import cn.hutool.core.util.IdUtil; import cn.yinlihupo.common.enums.AsyncTaskStatus; +import cn.yinlihupo.common.sse.SseChannelManager; +import cn.yinlihupo.common.sse.SseMessage; +import cn.yinlihupo.common.util.RedisService; import cn.yinlihupo.domain.vo.ProjectInitResult; import cn.yinlihupo.domain.vo.ProjectInitTaskVO; import cn.yinlihupo.service.oss.OssService; @@ -13,15 +16,15 @@ import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; +import java.time.Duration; import java.time.LocalDateTime; -import java.util.Map; +import java.util.*; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; +import java.util.stream.Collectors; /** * 项目初始化异步任务服务实现类 - * 使用内存存储任务状态 + * 使用 Redis 存储任务状态,支持分布式环境 */ @Slf4j @Service @@ -30,16 +33,28 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService { private final ProjectService projectService; private final OssService ossService; + private final RedisService redisService; + private final SseChannelManager sseChannelManager; /** - * 任务存储(内存存储) + * 任务存储 key 前缀 */ - private final Map taskStore = new ConcurrentHashMap<>(); + private static final String TASK_KEY_PREFIX = "project:init:task:"; /** - * 进度回调存储(内存存储,仅当前实例有效) + * 用户任务列表 key 前缀 */ - private final Map> progressCallbacks = new ConcurrentHashMap<>(); + private static final String USER_TASKS_KEY_PREFIX = "project:init:user:"; + + /** + * SSE 消息类型 + */ + private static final String MESSAGE_TYPE = "project-init"; + + /** + * 任务默认过期时间:24小时 + */ + private static final Duration TASK_EXPIRE_DURATION = Duration.ofHours(24); @Override public String submitPreviewTask(MultipartFile file) { @@ -47,16 +62,17 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService { } @Override - public String submitPreviewTask(MultipartFile file, Consumer progressCallback) { + public String submitPreviewTask(MultipartFile file, Long userId) { // 生成任务ID String taskId = IdUtil.fastSimpleUUID(); String originalFilename = file.getOriginalFilename(); - log.info("提交项目初始化预览任务, taskId: {}, 文件名: {}", taskId, originalFilename); + log.info("提交项目初始化预览任务, taskId: {}, userId: {}, 文件名: {}", taskId, userId, originalFilename); // 创建任务记录 ProjectInitTaskVO taskVO = new ProjectInitTaskVO(); taskVO.setTaskId(taskId); + taskVO.setUserId(userId); taskVO.setStatus(AsyncTaskStatus.PENDING.getCode()); taskVO.setStatusDesc(AsyncTaskStatus.PENDING.getDescription()); taskVO.setProgress(0); @@ -64,12 +80,16 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService { taskVO.setOriginalFilename(originalFilename); taskVO.setCreateTime(LocalDateTime.now()); - // 存储到内存 - taskStore.put(taskId, taskVO); + // 存储任务到 Redis + String taskKey = getTaskKey(taskId); + redisService.set(taskKey, taskVO, TASK_EXPIRE_DURATION); - // 保存进度回调 - if (progressCallback != null) { - progressCallbacks.put(taskId, progressCallback); + // 将任务ID添加到用户的任务列表 + if (userId != null) { + String userTasksKey = getUserTasksKey(userId); + redisService.sAdd(userTasksKey, taskId); + // 设置用户任务列表过期时间 + redisService.expire(userTasksKey, TASK_EXPIRE_DURATION); } // 异步执行任务 @@ -83,7 +103,7 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService { */ @Async("projectInitTaskExecutor") public CompletableFuture executePreviewTaskAsync(String taskId, MultipartFile file) { - ProjectInitTaskVO taskVO = taskStore.get(taskId); + ProjectInitTaskVO taskVO = getTaskFromRedis(taskId); if (taskVO == null) { log.error("任务不存在, taskId: {}", taskId); return CompletableFuture.completedFuture(null); @@ -110,21 +130,26 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService { ProjectInitResult result = projectService.generateProjectFromContent(content); // 4. 更新任务完成状态 + taskVO = getTaskFromRedis(taskId); + if (taskVO != null) { + taskVO.setResult(result); + taskVO.setCompleteTime(LocalDateTime.now()); + saveTaskToRedis(taskVO); + } + updateTaskProgress(taskId, AsyncTaskStatus.COMPLETED, 100, "项目预览数据生成成功"); - taskVO.setResult(result); - taskVO.setCompleteTime(LocalDateTime.now()); log.info("项目初始化预览任务完成, taskId: {}", taskId); } catch (Exception e) { log.error("项目初始化预览任务失败, taskId: {}, error: {}", taskId, e.getMessage(), e); - updateTaskProgress(taskId, AsyncTaskStatus.FAILED, 0, "任务执行失败"); - taskVO.setErrorMessage(e.getMessage()); - taskVO.setCompleteTime(LocalDateTime.now()); - } finally { - // 清理回调(仅清理内存中的回调) - progressCallbacks.remove(taskId); - // 注意:Redis中的任务数据保留,供后续查询,24小时后自动过期 + taskVO = getTaskFromRedis(taskId); + if (taskVO != null) { + taskVO.setErrorMessage(e.getMessage()); + taskVO.setCompleteTime(LocalDateTime.now()); + saveTaskToRedis(taskVO); + } + updateTaskProgress(taskId, AsyncTaskStatus.FAILED, 0, "任务执行失败: " + e.getMessage()); } return CompletableFuture.completedFuture(null); @@ -134,7 +159,7 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService { * 更新任务进度 */ private void updateTaskProgress(String taskId, AsyncTaskStatus status, int progress, String message) { - ProjectInitTaskVO taskVO = taskStore.get(taskId); + ProjectInitTaskVO taskVO = getTaskFromRedis(taskId); if (taskVO == null) { return; } @@ -148,26 +173,45 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService { taskVO.setStartTime(LocalDateTime.now()); } - // 更新内存存储 - taskStore.put(taskId, taskVO); + // 更新 Redis + saveTaskToRedis(taskVO); - // 触发进度回调 - Consumer callback = progressCallbacks.get(taskId); - if (callback != null) { - try { - callback.accept(taskVO); - } catch (Exception e) { - log.warn("进度回调执行失败, taskId: {}", taskId, e); - } - } + // 通过 SSE 推送进度给用户 + pushProgressToUser(taskVO, status); log.debug("任务进度更新, taskId: {}, status: {}, progress: {}%, message: {}", taskId, status.getCode(), progress, message); } + /** + * 通过 SSE 推送进度给用户 + */ + private void pushProgressToUser(ProjectInitTaskVO taskVO, AsyncTaskStatus status) { + if (taskVO.getUserId() == null) { + return; + } + + String userId = String.valueOf(taskVO.getUserId()); + + // 判断任务是否结束 + boolean isFinished = status == AsyncTaskStatus.COMPLETED || + status == AsyncTaskStatus.FAILED || + status == AsyncTaskStatus.CANCELLED; + + // 推送进度消息 + SseMessage message = SseMessage.of(MESSAGE_TYPE, "progress", userId, taskVO); + sseChannelManager.send(userId, message); + + // 任务结束时推送完成消息 + if (isFinished) { + SseMessage completeMessage = SseMessage.of(MESSAGE_TYPE, "complete", userId, taskVO); + sseChannelManager.send(userId, completeMessage); + } + } + @Override public ProjectInitTaskVO getTaskStatus(String taskId) { - ProjectInitTaskVO taskVO = taskStore.get(taskId); + ProjectInitTaskVO taskVO = getTaskFromRedis(taskId); if (taskVO == null) { return null; } @@ -177,7 +221,7 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService { @Override public ProjectInitResult getTaskResult(String taskId) { - ProjectInitTaskVO taskVO = taskStore.get(taskId); + ProjectInitTaskVO taskVO = getTaskFromRedis(taskId); if (taskVO == null || !AsyncTaskStatus.COMPLETED.getCode().equals(taskVO.getStatus())) { return null; } @@ -186,7 +230,7 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService { @Override public boolean cancelTask(String taskId) { - ProjectInitTaskVO taskVO = taskStore.get(taskId); + ProjectInitTaskVO taskVO = getTaskFromRedis(taskId); if (taskVO == null) { return false; } @@ -196,8 +240,7 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService { AsyncTaskStatus.PROCESSING.getCode().equals(taskVO.getStatus())) { updateTaskProgress(taskId, AsyncTaskStatus.CANCELLED, 0, "任务已取消"); taskVO.setCompleteTime(LocalDateTime.now()); - taskStore.put(taskId, taskVO); - progressCallbacks.remove(taskId); + saveTaskToRedis(taskVO); log.info("任务已取消, taskId: {}", taskId); return true; } @@ -207,26 +250,85 @@ public class ProjectInitAsyncServiceImpl implements ProjectInitAsyncService { @Override public void cleanExpiredTasks(int expireHours) { - // 清理已完成的任务,释放内存 - LocalDateTime expireTime = LocalDateTime.now().minusHours(expireHours); - int count = 0; - for (Map.Entry entry : taskStore.entrySet()) { - ProjectInitTaskVO task = entry.getValue(); - if (task.getCompleteTime() != null && task.getCompleteTime().isBefore(expireTime)) { - taskStore.remove(entry.getKey()); - progressCallbacks.remove(entry.getKey()); - count++; - } - } - log.info("已清理 {} 个过期任务", count); + // Redis 会自动过期,无需手动清理 + log.info("Redis任务将自动过期"); } + @Override + public List getTasksByUserId(Long userId) { + if (userId == null) { + return new ArrayList<>(); + } + + String userTasksKey = getUserTasksKey(userId); + Set taskIds = redisService.sMembers(userTasksKey); + + if (taskIds == null || taskIds.isEmpty()) { + return new ArrayList<>(); + } + + return taskIds.stream() + .map(this::getTaskFromRedis) + .filter(Objects::nonNull) + .sorted(Comparator.comparing(ProjectInitTaskVO::getCreateTime).reversed()) + .map(this::copyTaskVO) + .collect(Collectors.toList()); + } + + @Override + public int getProcessingTaskCount(Long userId) { + if (userId == null) { + return 0; + } + + List tasks = getTasksByUserId(userId); + return (int) tasks.stream() + .filter(task -> AsyncTaskStatus.PENDING.getCode().equals(task.getStatus()) + || AsyncTaskStatus.PROCESSING.getCode().equals(task.getStatus())) + .count(); + } + + // ==================== Redis 操作方法 ==================== + + /** + * 从 Redis 获取任务 + */ + private ProjectInitTaskVO getTaskFromRedis(String taskId) { + String key = getTaskKey(taskId); + return redisService.get(key); + } + + /** + * 保存任务到 Redis + */ + private void saveTaskToRedis(ProjectInitTaskVO taskVO) { + String key = getTaskKey(taskVO.getTaskId()); + redisService.set(key, taskVO, TASK_EXPIRE_DURATION); + } + + /** + * 获取任务存储 key + */ + private String getTaskKey(String taskId) { + return TASK_KEY_PREFIX + taskId; + } + + /** + * 获取用户任务列表 key + */ + private String getUserTasksKey(Long userId) { + return USER_TASKS_KEY_PREFIX + userId + ":tasks"; + } + + // ==================== 工具方法 ==================== + /** * 复制任务VO */ private ProjectInitTaskVO copyTaskVO(ProjectInitTaskVO source) { ProjectInitTaskVO copy = new ProjectInitTaskVO(); copy.setTaskId(source.getTaskId()); + copy.setUserId(source.getUserId()); copy.setStatus(source.getStatus()); copy.setStatusDesc(source.getStatusDesc()); copy.setProgress(source.getProgress()); diff --git a/src/main/resources/application-dev.yaml b/src/main/resources/application-dev.yaml index 07cace1..e1817f1 100644 --- a/src/main/resources/application-dev.yaml +++ b/src/main/resources/application-dev.yaml @@ -19,6 +19,19 @@ spring: min-idle: 0 max-wait: -1ms + # Redisson 配置 + redis: + redisson: + config: | + singleServerConfig: + address: "redis://10.200.8.25:16379" + database: 0 + connectionMinimumIdleSize: 1 + connectionPoolSize: 10 + idleConnectionTimeout: 10000 + connectTimeout: 10000 + timeout: 3000 + # PostgreSQL 数据库配置 datasource: url: jdbc:postgresql://10.200.8.25:5432/aiprojectmanager