App下載

帶你快速搞定Java并發(fā)隊(duì)列的內(nèi)容

櫥窗的光 2021-08-17 15:26:48 瀏覽數(shù) (1743)
反饋

前言

如果按照用途與特性進(jìn)行粗略的劃分,JUC 包中包含的工具大體可以分為 6 類:

  1. 執(zhí)行者與線程池
  2. 并發(fā)隊(duì)列
  3. 同步工具
  4. 并發(fā)集合
  5. 原子變量

在并發(fā)系列中,主要講解了 執(zhí)行者與線程池,同步工具,鎖 , 在分析源碼時(shí),或多或少的提及到了「隊(duì)列」,隊(duì)列在 JUC 中也是多種多樣存在,所以本文就以「遠(yuǎn)看」視角,幫助大家快速了解與區(qū)分這些看似「雜亂」的隊(duì)列

并發(fā)隊(duì)列

Java 并發(fā)隊(duì)列按照實(shí)現(xiàn)方式來(lái)進(jìn)行劃分可以分為 2 種:

  1. 阻塞隊(duì)列
  2. 非阻塞隊(duì)列

如果你已經(jīng)看完并發(fā)系列鎖的實(shí)現(xiàn),你已經(jīng)能夠知道他們實(shí)現(xiàn)的區(qū)別:

前者就是基于鎖實(shí)現(xiàn)的,后者則是基于 CAS 非阻塞算法實(shí)現(xiàn)的

常見(jiàn)的隊(duì)列有下面這幾種:

瞬間懵逼?看到這個(gè)沒(méi)有人性的圖想直接走人? 客觀先別急,一會(huì)就柳暗花明了

當(dāng)下你也許有個(gè)問(wèn)題:

為什么會(huì)有這么多種隊(duì)列的存在?

鎖有應(yīng)對(duì)各種情形的鎖,隊(duì)列也自然有應(yīng)對(duì)各種情形的隊(duì)列了, 是不是也有點(diǎn)單一職責(zé)原則的意思呢?

所以我們要了解這些隊(duì)列到底是怎么設(shè)計(jì)的?以及用在了哪些地方?

先來(lái)看下圖

如果你在 IDE 中打開(kāi)以上非阻塞隊(duì)列和阻塞隊(duì)列,查看其實(shí)現(xiàn)方法,你就會(huì)發(fā)現(xiàn),阻塞隊(duì)列較非阻塞隊(duì)列 額外支持兩種操作:

  1. 阻塞的插入 當(dāng)隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞插入元素的線程,直到隊(duì)列不滿
  2. 阻塞的移除 當(dāng)隊(duì)列為空時(shí),獲取元素的線程會(huì)阻塞,直到隊(duì)列變?yōu)榉强?/li>

綜合說(shuō)明入隊(duì)/出隊(duì)操作,看似雜亂的方法,用一個(gè)表格就能概括了

拋出異常

  • 當(dāng)隊(duì)列滿時(shí),此時(shí)如果再向隊(duì)列中插入元素,會(huì)拋出 IllegalStateException (這很好理解)
  • 當(dāng)隊(duì)列空時(shí),此時(shí)如果再?gòu)年?duì)列中獲取元素,會(huì)拋出 NoSuchElementException (這也很好理解)

返回特殊值

  • 當(dāng)向隊(duì)列插入元素時(shí),會(huì)返回元素是否插入成功,成功則返回 true
  • 當(dāng)從隊(duì)列移除元素時(shí),如果沒(méi)有則返回 null

一直阻塞

  • 當(dāng)隊(duì)列滿時(shí),如果生產(chǎn)者線程向隊(duì)列 put 元素,隊(duì)列會(huì)一直阻塞生產(chǎn)者線程,直到隊(duì)列可用或者響應(yīng)中斷退出
  • 當(dāng)隊(duì)列為空時(shí),如果消費(fèi)者線程 從隊(duì)列里面 take 元素,隊(duì)列會(huì)阻塞消費(fèi)者線程,直到隊(duì)列不為空

關(guān)于阻塞,我們其實(shí)早在 并發(fā)編程之等待通知機(jī)制 就已經(jīng)充分說(shuō)明過(guò)了,你還記得下面這張圖嗎?原理其實(shí)是一樣一樣滴

超時(shí)退出

和鎖一樣,因?yàn)橛凶枞?,為了靈活使用,就一定支持超時(shí)退出,阻塞時(shí)間達(dá)到超時(shí)時(shí)間,就會(huì)直接返回

至于為啥插入和移除這么多種單詞表示形式,我也不知道,為了方便記憶,只需要記住阻塞的方法形式即可:

單詞 put 和 take 字母 t 首位相連,一個(gè)放,一個(gè)拿

到這里你應(yīng)該對(duì) Java 并發(fā)隊(duì)列有了個(gè)初步的認(rèn)識(shí)了,原來(lái)看似雜亂的方法貌似也有了規(guī)律。接下來(lái)就到了瘋狂串知識(shí)點(diǎn)的時(shí)刻了,借助前序章節(jié)的知識(shí),分分鐘就理解全部隊(duì)列了

ArrayBlockingQueue

之前也說(shuō)過(guò),JDK中的命名還是很講究滴,一看這名字,底層就是數(shù)組實(shí)現(xiàn)了,是否有界,那就看在構(gòu)造的時(shí)候是否需要指定 capacity 值了

填鴨式的說(shuō)明也容易忘,這些都是哪看到的呢?在所有隊(duì)列的 Java docs 的第一段,一句話就概括了該隊(duì)列的主要特性,所以強(qiáng)烈建議大家自己在看源碼時(shí),簡(jiǎn)單瞄一眼 docs 開(kāi)頭,心中就有多半個(gè)數(shù)了

在講 Java AQS隊(duì)列同步器以及ReentrantLock的應(yīng)用 時(shí)我們介紹了公平鎖與非公平鎖的概念,ArrayBlockingQueue 也有同樣的概念,看它的構(gòu)造方法,就有 ReentrantLock 來(lái)輔助實(shí)現(xiàn)

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

默認(rèn)情況下,依舊是不保證線程公平訪問(wèn)隊(duì)列(公平與否是指阻塞的線程能否按照阻塞的先后順序訪問(wèn)隊(duì)列,先阻塞線訪問(wèn),后阻塞后訪問(wèn))

到這我也要臨時(shí)問(wèn)一個(gè)說(shuō)過(guò)多次的面試送分題了:

為什么默認(rèn)采用非公平鎖的方式?它較公平鎖方式有什么好處,又可能帶來(lái)哪些問(wèn)題?

知道了以上內(nèi)容,結(jié)合上面表格中的方法,ArrayBlockingQueue 就可以輕松過(guò)關(guān)了

和數(shù)組相對(duì)的自然是鏈表了

LinkedBlockingQueue

LinkedBlockingQueue 也算是一個(gè)有界阻塞隊(duì)列 ,從下面的構(gòu)造函數(shù)中你也可以看出,該隊(duì)列的默認(rèn)和最大長(zhǎng)度為 Integer.MAX_VALUE ,這也就 docs 說(shuō) optionally-bounded 的原因了

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
  if (capacity <= 0) throw new IllegalArgumentException();
  this.capacity = capacity;
  last = head = new Node<E>(null);
}

正如 Java 集合一樣,鏈表形式的隊(duì)列,其存取效率要比數(shù)組形式的隊(duì)列高。但是在一些并發(fā)程序中,數(shù)組形式的隊(duì)列由于具有一定的可預(yù)測(cè)性,因此可以在某些場(chǎng)景中獲得更高的效率

看到 LinkedBlockingQueue 是不是也有些熟悉呢? 為什么要使用線程池? 就已經(jīng)和它多次照面了

創(chuàng)建單個(gè)線程池

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

創(chuàng)建固定個(gè)數(shù)線程池

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

面試送分題又來(lái)了

使用 Executors 創(chuàng)建線程池很簡(jiǎn)單,為什么大廠嚴(yán)格要求禁用這種創(chuàng)建方式呢?

PriorityBlockingQueue

PriorityBlockingQueue 是一個(gè)支持優(yōu)先級(jí)的無(wú)界的阻塞隊(duì)列,默認(rèn)情況下采用自然順序升序排列,當(dāng)然也有非默認(rèn)情況自定義優(yōu)先級(jí),需要排序,那自然要用到 Comparator 來(lái)定義排序規(guī)則了

可以定義優(yōu)先級(jí),自然也就有相應(yīng)的限制,以及使用的注意事項(xiàng)

按照上圖說(shuō)明,隊(duì)列中不允許存在 null 值,也不允許存在不能排序的元素

對(duì)于排序值相同的元素,其序列是不保證的,但你可以繼續(xù)自定義其他可以區(qū)分出來(lái)優(yōu)先級(jí)的值,如果你有嚴(yán)格的優(yōu)先級(jí)區(qū)分,建議有更完善的比較規(guī)則,就像 Java docs 這樣

class FIFOEntry<E extends Comparable<? super E>>
  implements Comparable<FIFOEntry<E>> {
    static final AtomicLong seq = new AtomicLong(0);
    final long seqNum;
    final E entry;
    public FIFOEntry(E entry) {
    seqNum = seq.getAndIncrement();
    this.entry = entry;
  }
  public E getEntry() { return entry; }
  public int compareTo(FIFOEntry<E> other) {
    int res = entry.compareTo(other.entry);
    if (res == 0 && other.entry != this.entry)
    res = (seqNum < other.seqNum ? -1 : 1);
    return res;
  }
}

隊(duì)列容量是沒(méi)有上限的,但是如果插入的元素超過(guò)負(fù)載,有可能會(huì)引起OutOfMemory異常(這是肯定的),這也是為什么我們通常所說(shuō),隊(duì)列無(wú)界,心中有界

PriorityBlockingQueue 也有 put 方法,這是一個(gè)阻塞的方法,因?yàn)樗菬o(wú)界的,自然不會(huì)阻塞,所以就有了下面比較聰明的做法

public void put(E e) {
    offer(e); // never need to block  請(qǐng)自行對(duì)照上面表格
}

可以給定初始容量,這個(gè)容量會(huì)按照一定的算法自動(dòng)擴(kuò)充

// Default array capacity.
private static final int DEFAULT_INITIAL_CAPACITY = 11;

public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

這里默認(rèn)的容量是 11,由于也是基于數(shù)組,那面試送分題又來(lái)了

你通常是怎樣定義容器/集合初始容量的?有哪些依據(jù)?

DelayQueue

DelayQueue 是一個(gè)支持延時(shí)獲取元素的無(wú)界阻塞隊(duì)列

  • 是否延時(shí)肯定是和某個(gè)時(shí)間(通常和當(dāng)前時(shí)間) 進(jìn)行比較
  • 比較過(guò)后還要進(jìn)行排序,所以也是存在一定的優(yōu)先級(jí)

看到這也許覺(jué)得這有點(diǎn)和 PriorityBlockingQueue 很像,沒(méi)錯(cuò),DelayQueue 的內(nèi)部也是使用 PriorityQueue

上圖綠色框線也告訴你,DelayQueue 隊(duì)列的元素必須要實(shí)現(xiàn) Depayed 接口:

所以從上圖可以看出使用 DelayQueue 非常簡(jiǎn)單,只需要兩步:

實(shí)現(xiàn) getDelay() 方法,返回元素要延時(shí)多長(zhǎng)時(shí)間

public long getDelay(TimeUnit unit) {
  	// 最好采用納秒形式,這樣更精確
    return unit.convert(time - now(), NANOSECONDS);
}

實(shí)現(xiàn) compareTo() 方法,比較元素順序

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

上面的代碼哪來(lái)的呢?如果你打開(kāi) ScheduledThreadPoolExecutor 里的 ScheduledFutureTask,你就看到了 (ScheduledThreadPoolExecutor 內(nèi)部就是應(yīng)用 DelayQueue)

所以綜合來(lái)說(shuō),下面兩種情況非常適合使用 DelayQueue

  • 緩存系統(tǒng)的設(shè)計(jì):用 DelayQueue 保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢 DelayQueue,如果能從 DelayQueue 中獲取元素,說(shuō)明緩存有效期到了
  • 定時(shí)任務(wù)調(diào)度:用 DelayQueue 保存當(dāng)天會(huì)執(zhí)行的任務(wù)以及時(shí)間,如果能從 DelayQueue 中獲取元素,任務(wù)就可以開(kāi)始執(zhí)行了。比如 TimerQueue 就是這樣實(shí)現(xiàn)的

SynchronousQueue

這是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,不存儲(chǔ)元素還叫隊(duì)列?

沒(méi)錯(cuò),SynchronousQueue 直譯過(guò)來(lái)叫同步隊(duì)列,如果在隊(duì)列里面呆久了應(yīng)該就算是“異步”了吧

所以使用它,每個(gè)put() 操作必須要等待一個(gè) take() 操作,反之亦然,否則不能繼續(xù)添加元素

實(shí)際中怎么用呢?假如你需要兩個(gè)線程之間同步共享變量,如果不用 SynchronousQueue 你可能會(huì)選擇用 CountDownLatch 來(lái)完成,就像這樣:

ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);



Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    sharedState.set(producedElement);
    countDownLatch.countDown();
};



Runnable consumer = () -> {
    try {
        countDownLatch.await();
        Integer consumedElement = sharedState.get();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

這點(diǎn)小事就用計(jì)數(shù)器來(lái)實(shí)現(xiàn),顯然很不合適,用 SynchronousQueue 改造一下,感覺(jué)瞬間就不一樣了

ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();

Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    try {
        queue.put(producedElement);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

Runnable consumer = () -> {
    try {
        Integer consumedElement = queue.take();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

其實(shí) Executors.newCachedThreadPool() 方法里面使用的就是 SynchronousQueue

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

看到前面 LinkedBlockingQueue 用在 newSingleThreadExecutor 和 newFixedThreadPool 上,而newCachedThreadPool 卻用 SynchronousQueue,這是為什么呢?

因?yàn)閱尉€程池和固定線程池中,線程數(shù)量是有限的,因此提交的任務(wù)需要在LinkedBlockingQueue隊(duì)列中等待空余的線程;

而緩存線程池中,線程數(shù)量幾乎無(wú)限(上限為Integer.MAX_VALUE),因此提交的任務(wù)只需要在SynchronousQueue 隊(duì)列中同步移交給空余線程即可, 所以有時(shí)也會(huì)說(shuō) SynchronousQueue 的吞吐量要高于 LinkedBlockingQueue 和 ArrayBlockingQueue

LinkedTransferQueue

簡(jiǎn)單來(lái)說(shuō),TransferQueue提供了一個(gè)場(chǎng)所,生產(chǎn)者線程使用 transfer 方法傳入一些對(duì)象并阻塞,直至這些對(duì)象被消費(fèi)者線程全部取出。

你有沒(méi)有覺(jué)得,剛剛介紹的 SynchronousQueue 是否很像一個(gè)容量為 0 的 TransferQueue。

但 LinkedTransferQueue 相比其他阻塞隊(duì)列多了三個(gè)方法

  • transfer(E e) 如果當(dāng)前有消費(fèi)者正在等待消費(fèi)元素,transfer 方法就可以直接將生產(chǎn)者傳入的元素立刻 transfer (傳輸) 給消費(fèi)者;如果沒(méi)有消費(fèi)者等待消費(fèi)元素,那么 transfer 方法會(huì)把元素放到隊(duì)列的 tail(尾部)節(jié)點(diǎn),一直阻塞,直到該元素被消費(fèi)者消費(fèi)才返回
  • tryTransfer(E e) tryTransfer,很顯然是一種嘗試,如果沒(méi)有消費(fèi)者等待消費(fèi)元素,則馬上返回 false ,程序不會(huì)阻塞
  • tryTransfer(E e, long timeout, TimeUnit unit) 帶有超時(shí)限制,嘗試將生產(chǎn)者傳入的元素 transfer 給消費(fèi)者,如果超時(shí)時(shí)間到,還沒(méi)有消費(fèi)者消費(fèi)元素,則返回 false

你瞧,所有阻塞的方法都是一個(gè)套路:

  1. 阻塞方式
  2. 帶有 try 的非阻塞方式
  3. 帶有 try 和超時(shí)時(shí)間的非阻塞方式

看到這你也許感覺(jué) LinkedTransferQueue 沒(méi)啥特點(diǎn),其實(shí)它和其他阻塞隊(duì)列的差別還挺大的:

BlockingQueue 是如果隊(duì)列滿了,線程才會(huì)阻塞;但是 TransferQueue 是如果沒(méi)有消費(fèi)元素,則會(huì)阻塞 (transfer 方法)

這也就應(yīng)了 Doug Lea 說(shuō)的那句話:

LinkedTransferQueue is actually a superset of ConcurrentLinkedQueue, SynchronousQueue (in “fair” mode), and unbounded
LinkedBlockingQueues. And it's made better by allowing you to mix and
match those features as well as take advantage of higher-performance i
mplementation techniques.

簡(jiǎn)單翻譯:

LinkedTransferQueue 是ConcurrentLinkedQueue, SynchronousQueue (在公平模式下), 無(wú)界的LinkedBlockingQueues等的超集; 允許你混合使用阻塞隊(duì)列的多種特性

所以,在合適的場(chǎng)景中,請(qǐng)盡量使用LinkedTransferQueue

上面都看的是單向隊(duì)列 FIFO,接下來(lái)我們看看雙向隊(duì)列

LinkedBlockingDeque

LinkedBlockingDeque 是一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列,凡是后綴為 Deque 的都是雙向隊(duì)列意思,后綴的發(fā)音為deck——/dek/, 剛接觸它時(shí)我以為是這個(gè)冰激凌的發(fā)音

所謂雙向隊(duì)列值得就是可以從隊(duì)列的兩端插入和移除元素。所以:

雙向隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口,在多線程同時(shí)入隊(duì)是,也就會(huì)減少一半的競(jìng)爭(zhēng)

隊(duì)列有頭,有尾,因此它又比其他阻塞隊(duì)列多了幾個(gè)特殊的方法

  • addFirst
  • addLast
  • xxxxFirs
  • txxxxLast
  • ... ...

這么一看,雙向阻塞隊(duì)列確實(shí)很高效,

那雙向阻塞隊(duì)列應(yīng)用在什么地方了呢?

不知道你是否聽(tīng)過(guò) “工作竊取”模式,看似不太厚道的一種方法,實(shí)則是高效利用線程的好辦法。下一篇文章,我們就來(lái)看看 ForkJoinPool 是如何應(yīng)用 “工作竊取”模式的

總結(jié)

到這關(guān)于 Java 隊(duì)列(其實(shí)主要介紹了阻塞隊(duì)列)就快速的區(qū)分完了,將看似雜亂的方法做了分類整理,方便快速理解其用途,同時(shí)也說(shuō)明了這些隊(duì)列的實(shí)際用途。相信你帶著更高的視角來(lái)閱讀源碼會(huì)更加輕松,最后也希望大家認(rèn)真看兩個(gè)隊(duì)列的源碼實(shí)現(xiàn),在遇到隊(duì)列的問(wèn)題,腦海中的畫(huà)面分分鐘就可以搞定了

以上就是Java并發(fā)隊(duì)列的詳細(xì)內(nèi)容,更多關(guān)于Java多線程的資料請(qǐng)關(guān)注W3Cschool其它相關(guān)文章!


0 人點(diǎn)贊