SpringBoot多线程处理任务列表
启动类开启
@EnableAsync注解配置线程池(根据实际情况调整)
@Configuration public class ThreadPoolConfig { /** * 默认线程池线程池 * * @return Executor */ @Bean("DefaultThreadPool") public ThreadPoolTaskExecutor defaultThreadPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //核心线程数目 executor.setCorePoolSize(12); //指定最大线程数 executor.setMaxPoolSize(12); //队列中最大的数目 executor.setQueueCapacity(12); //线程名称前缀 executor.setThreadNamePrefix("DefaultThreadPool_"); //rejection-policy:当pool已经达到max size的时候,如何处理新任务 //CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行 //对拒绝task的处理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //线程空闲后的最大存活时间 executor.setKeepAliveSeconds(60); //加载 executor.initialize(); return executor; } }任务处理处理实现
@Service @Slf4j public class CustomAsyncService { @Resource(name = "DefaultThreadPool") private ThreadPoolTaskExecutor threadPoolTaskExecutor; public <T, R> List<R> asyncSubmitList(List<T> dataList, Function<T, R> applyMethod) { // 使用CompletionService来管理任务的执行和结果获取 CompletionService<R> completionService = new ExecutorCompletionService<>(threadPoolTaskExecutor); // 提交任务 for (T data : dataList) { completionService.submit(() -> applyMethod.apply(data)); } // 查询任务执行的结果 List<R> resultList = new ArrayList<>(); int numTasks = dataList.size(); for (int i = 0; i < numTasks; i++) { try { Future<R> future = completionService.take(); // 获取已完成的任务的Future resultList.add(future.get()); // 获取任务执行结果并添加到结果列表中 } catch (InterruptedException | ExecutionException e) { log.error("Error occurred when executing async task", e); i--; // 任务执行失败,重试一次 } } return resultList; } }
