public class Code {
// 执行外层任务的线程池
static ExecutorService outerExecutor = Executors.newFixedThreadPool(8);
// 执行内层任务的线程池
static ExecutorService innerExecutor = Executors.newFixedThreadPool(16);
// 任务总数
static AtomicInteger taskCount = new AtomicInteger();
static String url = "url";
static Random random = ThreadLocalRandom.current();
public static void optimization() {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
// 模拟任务
final int maxTask = random.nextInt(1000);
System.out.println("外层总任务数:" + maxTask);
List<String> list = IntStream.rangeClosed(1, maxTask).mapToObj(String::valueOf).collect(Collectors.toList());
// 50 个任务一组
final List<List<String>> partition = Lists.partition(list, 50);
System.out.println("拆分任务数量:" + partition.size());
partition.parallelStream()
.map(task -> CompletableFuture.runAsync(new OuterTask(task), outerExecutor))
.forEach(CompletableFuture::join);
System.out.println("taskCount = " + taskCount);
stopWatch.stop();
System.out.println("耗时:" + stopWatch.getTotalTimeSeconds());
innerExecutor.shutdown();
outerExecutor.shutdown();
}
private static class OuterTask implements Runnable {
private final List<String> tasks;
public OuterTask(List<String> tasks) {
this.tasks = tasks;
}
@
Override public void run() {
tasks.parallelStream()
.map(task -> CompletableFuture.runAsync(new InnerTask(task), innerExecutor))
.forEach(CompletableFuture::join);
}
}
private static class InnerTask implements Runnable {
private final String body;
public InnerTask(String body) {
this.body = body;
}
@
Override public void run() {
final List<String> responseResult =
HttpRequest.post(url).body(body);
for (String aParam : responseResult) {
final String bParam = functionA(aParam);
final String cParam = functionB(bParam);
final String result = functionC(cParam);
// handle result...
taskCount.incrementAndGet();
}
}
}
}
考虑不周,仅作参考。。。