关于手动创建线程池的开销

2022-10-28 23:17:50 +08:00
 uselessVisitor

业务上提出了一个流程的批量发起,要求在 1 分钟内异步发起 1000 个任务。因此构造了如下的线程池模型 1 、调度线程池 单例、防止并发过多


 //调度线程池
    public static final ThreadPoolTaskExecutor dispatchThreadPool = threadPoolDispatchTaskExecutor();

    private static ThreadPoolTaskExecutor threadPoolDispatchTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(5);
        threadPoolTaskExecutor.setMaxPoolSize(10);
        threadPoolTaskExecutor.setQueueCapacity(10);
        threadPoolTaskExecutor.setThreadNamePrefix("all-task-dispatch-");
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }


2 、任务执行线程池、每次执行获取

public static ThreadPoolTaskExecutor threadPoolHandleTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(5);
        threadPoolTaskExecutor.setMaxPoolSize(10);
        threadPoolTaskExecutor.setQueueCapacity(10);
        threadPoolTaskExecutor.setThreadNamePrefix("task-handler-");
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

3 、 执行任务方法


public void doSomething(List<String> taskList) {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = ThreadPoolConfig.threadPoolHandleTaskExecutor();
        //分配处理线程池的 max + queue 可以充分利用线程,防止进入拒绝策略
        List<List<String>> taskGroup = ListUtil.split(taskList, threadPoolTaskExecutor.getMaxPoolSize() + threadPoolTaskExecutor.getQueueSize());
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (List<String> taskMembers : taskGroup) {
            for (String task : taskMembers) {
                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                    try {
                        Thread.sleep(5000);
                        log.info("执行完任务 :{}", task);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }, threadPoolTaskExecutor).exceptionally(e -> {
                    log.error("异常: {}, {}", e.getMessage(), e);
                    return null;
                });
                log.debug("分发 task: {}, 完毕", task);
                futures.add(future);
            }
            //阻塞每组任务线程 防止超发
            log.debug("阻塞每组任务线程 防止超发");
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
            log.debug("每组任务线程执行结束");
        }
        log.debug("执行结束");
        threadPoolTaskExecutor.shutdown();
    }

4 、 调度线程池分发任务


TaskHandler taskHandler = new TaskHandler();
        ThreadPoolTaskExecutor dispatch = ThreadPoolConfig.dispatchThreadPool;
        dispatch.submit(
                () -> taskHandler.doSomething(taskList)
        );

这样设计,经过测试是可以达到 1 分钟 1000 条任务、但不知道是否合理,还有如果多个并发的开销是否过大

441 次点击
所在节点    问与答
2 条回复
zhlxsh
2022-10-28 23:39:58 +08:00
是不是应该看 CPU 上下文切换?然后判断最大的创建线程数量?
uselessVisitor
2022-10-28 23:51:27 +08:00
@zhlxsh 这个我想要看上线后的效果再调整。。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/890857

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX