App下載

parallelStream的坑,不踩不知道,一踩嚇一跳

猿友 2020-09-15 14:14:23 瀏覽數(shù) (2986)
反饋

文章來源于公眾號:小姐姐味道

很多同學(xué)喜歡使用lambda表達(dá)式,它允許你定義短小精悍的函數(shù),體現(xiàn)你高超的編碼水平。當(dāng)然,這個功能在某些以代碼行數(shù)來衡量工作量的公司來說,就比較吃虧一些。

比如下面的代碼片段,讓人閱讀的時候就像是讀詩一樣。但是一旦用不好,也是會要命的。

List<Integer> transactionsIds =
widgets.stream()
             .filter(b -> b.getColor() == RED)
             .sorted((x,y) -> x.getWeight() - y.getWeight())
             .mapToInt(Widget::getWeight)
             .sum();

這段代碼有一個關(guān)鍵的函數(shù),那就是stream。通過它,可以將一個普通的 list ,轉(zhuǎn)化為流,然后就可以使用類似于管道的方式對 list 進(jìn)行操作。總之,用過的都說好。

問題來了

假如我們把stream換成parallelStream,會發(fā)生什么情況?

根據(jù)字面上的意思,流會從串行 變成并行

既然是并行,那用屁股想一想,就知道這里面肯定會有線程安全問題。不過我們這里討論的并不是要你使用線程安全的集合,這個話題太低級。現(xiàn)階段,知道在線程不安全的環(huán)境中使用線程安全的集合,已經(jīng)是一個基本的技能。

這次踩坑的地方,是并行流的性能問題。

我們用代碼來說話。

下面的代碼,開啟了8個線程,這8個線程都在使用并行流進(jìn)行數(shù)據(jù)計算。在執(zhí)行的邏輯中,我們讓每個任務(wù)都 sleep 1秒鐘,這樣就能夠模擬一些 I/O 請求的耗時等待。

使用stream,程序會在30秒后返回,但我們期望程序能夠在1秒多返回,因為它是并行流,得對得起這個稱號。

測試發(fā)現(xiàn),我們等了好久,任務(wù)才執(zhí)行完畢。

static void paralleTest() {
    List<Integer> numbers = Arrays.asList(
            0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
            10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
            20, 21, 22, 23, 24, 25, 26, 27, 28, 29
    );
    final long begin = System.currentTimeMillis();
    numbers.parallelStream().map(k -> {
        try {
            Thread.sleep(1000);
            System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return k;
    }).collect(Collectors.toList());
}


public static void main(String[] args) {
//    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
    new Thread(() -> paralleTest()).start();
    new Thread(() -> paralleTest()).start();
    new Thread(() -> paralleTest()).start();
    new Thread(() -> paralleTest()).start();
    new Thread(() -> paralleTest()).start();
    new Thread(() -> paralleTest()).start();
    new Thread(() -> paralleTest()).start();
    new Thread(() -> paralleTest()).start();
}

實際上,在不同的機器上執(zhí)行,這段代碼花費的時間都不一樣。

既然是并行,那肯定得有個并行度。太低了,體現(xiàn)不到并行的能能力;太大了,又浪費了上下文切換的時間。我是很沮喪的發(fā)現(xiàn),很多高級研發(fā),將線程池的各種參數(shù)背的滾瓜爛熟,各種調(diào)優(yōu),竟然敢睜一只眼閉一只眼的在 I/O 密集型業(yè)務(wù)中用上parallelStream

要了解這個并行度,我們需要查看具體的構(gòu)造方法。在ForkJoinPool類中找到這樣的代碼。

try {  // ignore exceptions in accessing/parsing properties
    String pp = System.getProperty
        ("java.util.concurrent.ForkJoinPool.common.parallelism");
    if (pp != null)
        parallelism = Integer.parseInt(pp);
    fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(
        "java.util.concurrent.ForkJoinPool.common.threadFactory");
    handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(
        "java.util.concurrent.ForkJoinPool.common.exceptionHandler");
} catch (Exception ignore) {
}


if (fac == null) {
    if (System.getSecurityManager() == null)
        fac = defaultForkJoinWorkerThreadFactory;
    else // use security-managed default
        fac = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores
    (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
    parallelism = 1;
if (parallelism > MAX_CAP)
    parallelism = MAX_CAP;

可以看到,并行度到底是多少,是由下面的參數(shù)來控制的。如果無法獲取這個參數(shù),則默認(rèn)使用 CPU個數(shù)-1 的并行度。

可以看到,這個函數(shù)是為了計算密集型業(yè)務(wù)去設(shè)計的。如果你喂給它一大堆任務(wù),它就會由并行執(zhí)行退變成類似于串行的效果。

-Djava.util.concurrent.ForkJoinPool.common.parallelism=N

即使你使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=N設(shè)置了一個初始值大小,它依然有問題。

因為,parallelism這個變量是 final 的,一旦設(shè)定,不允許修改。也就是說,上面的參數(shù)只會生效一次。

張三可能使用下面的代碼,設(shè)置了并行度大小為20。

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

李四可能用同樣的方式,設(shè)置了這個值為30。那實際在項目中用的是哪個值,那就得問 JVM 是怎么加載的類信息了。

這種方式并不太非??孔V。

一種解決方式

我們可以通過提供外置的forkjoinpool,也就是改變提交方式,來實現(xiàn)不同類型的任務(wù)分離。

代碼如下所示,通過顯式的代碼提交,即可實現(xiàn)任務(wù)分離。

ForkJoinPool pool = new ForkJoinPool(30);


final long begin = System.currentTimeMillis();
try {
    pool.submit(() ->
            numbers.parallelStream().map(k -> {
                try {
                    Thread.sleep(1000);
                    System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return k;
            }).collect(Collectors.toList())).get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

這樣,不同的場景,就可以擁有不同的并行度。這種方式和CountDownLatch有異曲同工之妙,我們需要手動管理資源。

使用了這種方式,代碼量增加,已經(jīng)和優(yōu)雅關(guān)系不大了,不僅不優(yōu)雅,而且丑的要命。白天鵝變成了丑小鴨,你還會愛它么?

以上就是W3Cschool編程獅關(guān)于parallelStream的坑,不踩不知道,一踩嚇一跳的相關(guān)介紹了,希望對大家有所幫助。

0 人點贊