想用多线程把数据库的数据写入 elasticsearch
如果发生异常,要立即终止整个循环,所以用了 Future
代码如下
private Result loadDataFromDbIntoEs(Long maxId) {
ExecutorService pool = Executors.newWorkStealingPool();
LinkedTransferQueue<Future<Result>> futureQueue = new LinkedTransferQueue<>();
try {
//遍历数据库的表
for (long i = 0; i <= maxId; i += getDbPageSize()) {
//创建任务
Callable<Result> task = createTask(i);
//任务入队
futureQueue.put(pool.submit(task));
//队列超过一定长度后,先执行掉一部分再继续
if (futureQueue.size() > QUEUE_SIZE * 8) {
if ((i % 100000) == 0) {
log.debug("{} - Iterating future list, {} of {}", esEntityClassName, i, maxId);
}
//执行一部分任务
checkFuture(futureQueue);
}
}
pool.shutdown();
//处理队列中剩余的任务
while (!futureQueue.isEmpty()) {
checkFuture(futureQueue);
}
log.info("{} - sync complete", esEntityClassName);
} catch (SyncException | InterruptedException | ExecutionException e) {
//throw...
}
return Result.ok();
}
/**
* 创建任务
*/
private Callable<Result> createTask(Long currentId) {
return () -> {
List<D> dbList = dbRepository.findByIdBetween(currentId, currentId + getDbPageSize() - 1);
if (dbList.isEmpty()) {
//忽略没有数据的 id 区间
return Result.ok();
}
//写入 es
return bulkCreate(dbListToEsList(dbList));
};
}
/**
* 消费任务
*/
private void checkFuture(LinkedTransferQueue<Future<Result>> futureQueue) throws ExecutionException, InterruptedException {
for (int i = 0; i < QUEUE_SIZE; i++) {
Future<Result> future = futureQueue.poll();
if (future != null) {
Result result = future.get();
if (!Result.REQUEST_SUCCESS.equals(result.getStatus())) {
throw new SyncException(result.getMessage());
}
}
}
}
现在的问题是,服务器 16 核,cpu 占用率并不高,大多数时候只有 es 的进程占了 20% 不知道是哪里有问题导致效率太低?
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.