• 请不要在回答技术问题时复制粘贴 AI 生成的内容
luxinfl
V2EX  ›  程序员

发现了个很奇怪的现象,关于 parallelStream 的

  •  
  •   luxinfl · Oct 28, 2021 · 2599 views
    This topic created in 1664 days ago, the information mentioned may be changed or developed.

    这个接口的处理类似这样

    List<Data> dataList = xxx.paralleStream().map(e->dealData(e)).collect(Collector.toList());
    

    其中有的 dealData 会用 http 调用外部接口。我在每个 dealData 都打印了时间,基本都在 50ms 左右徘徊。按理说这个方法的处理时间应该是耗时最长的那个,但是最终结果却差的很远。这个接口除了并发流基本就没什么操作了。服务器间的时延可以忽略不计。好神奇

    Supplement 1  ·  Oct 28, 2021
    public static void test1() throws ExecutionException, InterruptedException {
            ForkJoinPool forkJoinPool = new ForkJoinPool(4);
            for( int i=0;i<1000;i++) {
                int index = i;
                long time = System.currentTimeMillis();
                List<String> list = forkJoinPool.submit(() -> Stream.of(ary).parallel().map(s -> getString(index)).collect(Collectors.toList())).get();
                System.out.println("cost time :" + (System.currentTimeMillis() - time));
    
            }   
        }
        
        public static void test2(){
            for( int i=0;i<1000;i++) {
                int index = i;
                long time = System.currentTimeMillis();
                List<String> list = Stream.of(ary).parallel().map(s -> getString(index)).collect(Collectors.toList());
                System.out.println("cost time :" + (System.currentTimeMillis() - time));
    
            }
        }
    
    
        public static String getString(int index) {
            int i = new Random().nextInt(50);
            try {
                Thread.sleep(i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ":" +index + "cost time " + i);
            return "";
        }
    

    试着跑了一下,test1自定义线程池之后,会一直用到池里面的线程。 test2用的共享池,竟然还会用主线程main在跑。所以时间差距是不是在这块。

    Supplement 2  ·  Oct 29, 2021
    public static void test1() throws ExecutionException, InterruptedException {
            ForkJoinPool forkJoinPool = new ForkJoinPool(30);
            long time = System.currentTimeMillis();
            List<String> list = forkJoinPool.submit(() -> Stream.of(ary).parallel().map(s -> getString()).collect(Collectors.toList())).get();
            System.out.println("cost time :" + (System.currentTimeMillis() - time));
    
        }
    
    public static String getString() {
            int i = new Random().nextInt(50);
            try {
                Thread.sleep(i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "";
        }
    

    我用jmeter20并发压测发现,这个性能也并不是很好。要比50ms多两倍多

    13 replies    2021-11-01 22:27:55 +08:00
    Jooooooooo
        1
    Jooooooooo  
       Oct 28, 2021
    相差很远啥意思, 怎么统计的耗时?
    aureole999
        2
    aureole999  
       Oct 28, 2021
    这个只是告诉 java 你这个流可以并行执行,不代表一定要并行执行,更不代表所有的子项全都同时并行,或者说根本不会所有子项都同时执行。具体好像是通过 ForkJoinPool 线程池取得线程执行,默认线程数不超过你的 cpu 的个数。你在每个 dealData 打出当前线程看就知道了。
    luxinfl
        3
    luxinfl  
    OP
       Oct 28, 2021
    @Jooooooooo 意思是就好像没有并行一样
    @aureole999 还有这种说法么
    Jooooooooo
        4
    Jooooooooo  
       Oct 28, 2021
    @luxinfl 很有可能是 2 楼说的问题
    546L5LiK6ZOt
        5
    546L5LiK6ZOt  
       Oct 28, 2021 via iPhone
    我也遇到过 2 楼说的问题,一般都是自定义一个 forkjoinpool
    dqzcwxb
        6
    dqzcwxb  
       Oct 29, 2021
    Fork/Join 的策略就是会用主线程跑看源码就知道了
    luxinfl
        7
    luxinfl  
    OP
       Oct 29, 2021
    @dqzcwxb 这个以前用得少,没注意过。。而且我发现用 jmeter 压测,ForkJoinPool 的性能并不是太好的样子。。和线程数有很大关系
    dqzcwxb
        8
    dqzcwxb  
       Oct 29, 2021   ❤️ 1
    @luxinfl #7 parallelStream 默认是 cpu 核心数的线程数,而且只推荐 cpu 密集型运算时使用
    io 密集型请使用 Completablefuture+Fork/Join 线程池
    ForkJoinPool 因为有工作窃取机制性能比其他线程池要高得多,多测试即可知道
    dqzcwxb
        9
    dqzcwxb  
       Oct 29, 2021
    luxinfl
        10
    luxinfl  
    OP
       Oct 29, 2021 via Android
    @dqzcwxb 主要是有多个 io 调用,而且 parallel 并发调用写起来方便。。没太搞明白 CompletableFuture 怎么写才能并发调用。要写多个 supplyAsync 调用麽
    ZeroDu
        11
    ZeroDu  
       Oct 29, 2021
    确实如楼上所说的,forkjoinpool 线程数默认是 cpu 核心数,而且这个只适合 CPU 密集型任务。贴主这种情况 io 密集型得用 CompletableFuture 但是这个写起来当然么有 parallelStream 丝滑
    dqzcwxb
        12
    dqzcwxb  
       Oct 29, 2021
    @luxinfl #10 不要再使用 parallelStream 写 io 操作因为它使用的是 forkjoinpool 是全局共用的,如果出现 io 阻塞会直接影响到其他使用 parallelStream 的代码
    CompletableFuture 值得你花时间学习
    luxinfl
        13
    luxinfl  
    OP
       Nov 1, 2021
    @ZeroDu 我现在新接口用的 CompletableFutrue ,自定义线程城市,用了 join 方法,但是发现还是没啥效果
    @dqzcwxb 我有自定义线程池
    About   ·   Help   ·   Advertise   ·   Blog   ·   API   ·   FAQ   ·   Solana   ·   5293 Online   Highest 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 54ms · UTC 07:06 · PVG 15:06 · LAX 00:06 · JFK 03:06
    ♥ Do have faith in what you're doing.