[ Java ]CrudBoy 想请教一个多线程处理的问题

2019-07-12 10:20:04 +08:00
 rykinia

想用多线程把数据库的数据写入 elasticsearch

如果发生异常,要立即终止整个循环,所以用了 Future

代码如下

private Result loadDataFromDbIntoEs(Long maxId) {
    ExecutorService pool = Executors.newWorkStealingPool();
    LinkedTransferQueue<Future<Result>> futureQueue = new LinkedTransferQueue<>();
    try {
        //遍历数据库的表
        for (long i = 0; i <= maxId; i += getDbPageSize()) {
            //创建任务
            Callable<Result> task = createTask(i);
            //任务入队
            futureQueue.put(pool.submit(task));

            //队列超过一定长度后,先执行掉一部分再继续
            if (futureQueue.size() > QUEUE_SIZE * 8) {
                if ((i % 100000) == 0) {
                    log.debug("{} - Iterating future list, {} of {}", esEntityClassName, i, maxId);
                }
                //执行一部分任务
                checkFuture(futureQueue);
            }
        }
        pool.shutdown();
        //处理队列中剩余的任务
        while (!futureQueue.isEmpty()) {
            checkFuture(futureQueue);
        }
        log.info("{} - sync complete", esEntityClassName);
    } catch (SyncException | InterruptedException | ExecutionException e) {
        //throw...
    }
    return Result.ok();
}


/**
 * 创建任务
 */
private Callable<Result> createTask(Long currentId) {
    return () -> {
        List<D> dbList = dbRepository.findByIdBetween(currentId, currentId + getDbPageSize() - 1);
        if (dbList.isEmpty()) {
            //忽略没有数据的 id 区间
            return Result.ok();
        }

        //写入 es
        return bulkCreate(dbListToEsList(dbList));
    };
}


/**
 * 消费任务
 */
private void checkFuture(LinkedTransferQueue<Future<Result>> futureQueue) throws ExecutionException, InterruptedException {
    for (int i = 0; i < QUEUE_SIZE; i++) {
        Future<Result> future = futureQueue.poll();
        if (future != null) {
            Result result = future.get();
            if (!Result.REQUEST_SUCCESS.equals(result.getStatus())) {
                throw new SyncException(result.getMessage());
            }
        }
    }
}

现在的问题是,服务器 16 核,cpu 占用率并不高,大多数时候只有 es 的进程占了 20% 不知道是哪里有问题导致效率太低?

1222 次点击
所在节点    问与答
4 条回复
gosansam
2019-07-12 11:49:43 +08:00
Result result = future.get();
这个获取结果是阻塞的
zhady009
2019-07-12 13:30:03 +08:00
guava 的 ListeningExecutorService submit 返回 ListenableFuture 应该可以解决
softtwilight
2019-07-12 13:49:47 +08:00
可以试试用 completableFuture,checkFuture 可以改 join,如果 io 耗时多,Executors.newWorkStealingPool() 的线程可能有点少,completableFuture 也可以自定义线程池
rykinia
2019-07-16 14:33:52 +08:00
谢谢各位的解答。
研究了几天,感觉主要问题还是 es 的阻塞比较久,不过把这里改成了主线程消费 queue 里面的,线程池里异步往 queue 添加,稍微好了点。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/582262

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX