业务上提出了一个流程的批量发起,要求在 1 分钟内异步发起 1000 个任务。因此构造了如下的线程池模型 1 、调度线程池 单例、防止并发过多
//调度线程池
public static final ThreadPoolTaskExecutor dispatchThreadPool = threadPoolDispatchTaskExecutor();
private static ThreadPoolTaskExecutor threadPoolDispatchTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.setMaxPoolSize(10);
threadPoolTaskExecutor.setQueueCapacity(10);
threadPoolTaskExecutor.setThreadNamePrefix("all-task-dispatch-");
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
2 、任务执行线程池、每次执行获取
public static ThreadPoolTaskExecutor threadPoolHandleTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.setMaxPoolSize(10);
threadPoolTaskExecutor.setQueueCapacity(10);
threadPoolTaskExecutor.setThreadNamePrefix("task-handler-");
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
3 、 执行任务方法
public void doSomething(List<String> taskList) {
ThreadPoolTaskExecutor threadPoolTaskExecutor = ThreadPoolConfig.threadPoolHandleTaskExecutor();
//分配处理线程池的 max + queue 可以充分利用线程,防止进入拒绝策略
List<List<String>> taskGroup = ListUtil.split(taskList, threadPoolTaskExecutor.getMaxPoolSize() + threadPoolTaskExecutor.getQueueSize());
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (List<String> taskMembers : taskGroup) {
for (String task : taskMembers) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(5000);
log.info("执行完任务 :{}", task);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, threadPoolTaskExecutor).exceptionally(e -> {
log.error("异常: {}, {}", e.getMessage(), e);
return null;
});
log.debug("分发 task: {}, 完毕", task);
futures.add(future);
}
//阻塞每组任务线程 防止超发
log.debug("阻塞每组任务线程 防止超发");
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
log.debug("每组任务线程执行结束");
}
log.debug("执行结束");
threadPoolTaskExecutor.shutdown();
}
4 、 调度线程池分发任务
TaskHandler taskHandler = new TaskHandler();
ThreadPoolTaskExecutor dispatch = ThreadPoolConfig.dispatchThreadPool;
dispatch.submit(
() -> taskHandler.doSomething(taskList)
);
这样设计,经过测试是可以达到 1 分钟 1000 条任务、但不知道是否合理,还有如果多个并发的开销是否过大
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.