隊(duì)列是一種 FIFO
(先進(jìn)先出)的數(shù)據(jù)結(jié)構(gòu),本文要講的 BlockingQueue
也是一種隊(duì)列,而且強(qiáng)調(diào)了線程安全的特性。
BlockingQueue
全稱:java.util.concurrent.BlockingQueue
。它是是一個(gè)線程安全的隊(duì)列接口,多個(gè)線程能夠以并發(fā)的方式從隊(duì)列中插入數(shù)據(jù),取出數(shù)據(jù)的同時(shí)不會(huì)出現(xiàn)線程安全的問題。
生產(chǎn)者和消費(fèi)者例子
BlockingQueue
通常用于消費(fèi)者線程向隊(duì)列存入數(shù)據(jù),消費(fèi)者線程從隊(duì)列中取出數(shù)據(jù),具體如下
- 生產(chǎn)者線程不停的向隊(duì)列中插入數(shù)據(jù),直到隊(duì)列滿了,生產(chǎn)者線程被阻塞
- 消費(fèi)者線程不停的從隊(duì)列中取出數(shù)據(jù),直到隊(duì)列為空,消費(fèi)者線程被阻塞
(推薦教程:Java教程)
BlockingQueue 方法
BlockingQueue
提供 4 種不同類型的方法用于插入數(shù),取出數(shù)據(jù)以及檢查數(shù)據(jù),具體如下
- 操作失敗,拋出異常
- 無論成功/失敗,立即返回
true/false
- 如果隊(duì)列為空/滿,阻塞當(dāng)前線程
- 如果隊(duì)列為空/滿,阻塞當(dāng)前線程并有超時(shí)機(jī)制插入
add(o)
offer(o)
put(o)
offer(o, timeout, timeunit)
取出remove(o)
poll()
take()
poll(timeout, timeunit)
檢查element()
peek()
BlockingQueue 的具體實(shí)現(xiàn)類
BlockingQueue
只是一個(gè)接口,在實(shí)際開發(fā)中有如下的類實(shí)現(xiàn)了該接口。
- ArrayBlockingQueue
- DelayQueue
- LinkedBlockingQueue
- PriorityBlockingQueue
- SynchronousQueue
ArrayBlockingQueue 的使用
這里以 BlockingQueue
接口的具體實(shí)現(xiàn)類 ArrayBlockingQueue
舉例。通過 ArrayBlockingQueue
實(shí)現(xiàn)一個(gè)消費(fèi)者和生產(chǎn)者多線程模型。
核心內(nèi)容如下:
- 以
ArrayBlockingQueue
作為生產(chǎn)者和消費(fèi)者的數(shù)據(jù)容器 - 通過
ExecutorService
啟動(dòng) 3 個(gè)線程,2 兩個(gè)生產(chǎn)者,1 個(gè)消費(fèi)者 - 指定數(shù)據(jù)總量
(推薦微課:Java微課)
生產(chǎn)者線程
ArrayBlockingQueueProducer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 生產(chǎn)者線程向容器存入指定總量的 任務(wù)
*
*/
public class ArrayBlockingQueueProducer implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ArrayBlockingQueueProducer.class);
// 容器
private ArrayBlockingQueue<String> queue;
// 生產(chǎn)指定的數(shù)量
private AtomicInteger numberOfElementsToProduce;
public ArrayBlockingQueueProducer(ArrayBlockingQueue<String> queue, AtomicInteger numberOfElementsToProduce) {
this.queue = queue;
this.numberOfElementsToProduce = numberOfElementsToProduce;
}
@Override
public void run() {
try {
while (numberOfElementsToProduce.get() > 0) {
try {
// 向隊(duì)列中存入任務(wù)
String task = String.format("task_%s", numberOfElementsToProduce.getAndUpdate(x -> x-1));
queue.put(task);
logger.info("thread {}, produce task {}", Thread.currentThread().getName(), task);
// 任務(wù)為0,生產(chǎn)者線程退出
if (numberOfElementsToProduce.get() == 0) {
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
logger.error(this.getClass().getName().concat(". has error"), e);
}
}
}
消費(fèi)者線程
ArrayBlockingQueueConsumer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 消費(fèi)者線程向容器 消費(fèi) 指定總量的任務(wù)
*
*/
public class ArrayBlockingQueueConsumer implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ArrayBlockingQueueConsumer.class);
private ArrayBlockingQueue<String> queue;
private AtomicInteger numberOfElementsToProduce;
public ArrayBlockingQueueConsumer(ArrayBlockingQueue<String> queue, AtomicInteger numberOfElementsToProduce) {
this.queue = queue;
this.numberOfElementsToProduce = numberOfElementsToProduce;
}
@Override
public void run() {
try {
while (!queue.isEmpty() || numberOfElementsToProduce.get() >= 0) {
// 從隊(duì)列中獲取任務(wù),并執(zhí)行任務(wù)
String task = queue.take();
logger.info("thread {} consume task {}", Thread.currentThread().getName(),task);
// 隊(duì)列中數(shù)據(jù)為空,消費(fèi)者線程退出
if (queue.isEmpty()) {
break;
}
}
} catch (Exception e) {
logger.error(this.getClass().getName().concat(". has error"), e);
}
}
}
測(cè)試 TestBlockingQueue
import com.ckjava.synchronizeds.appCache.WaitUtils;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 1. 以 ArrayBlockingQueue 作為生產(chǎn)者和消費(fèi)者的數(shù)據(jù)容器 <br>
* 2. 通過 ExecutorService 啟動(dòng) 3 個(gè)線程,2 兩個(gè)生產(chǎn)者,1 個(gè)消費(fèi)者 <br>
* 3. 指定數(shù)據(jù)總量
*/
public class TestBlockingQueue {
public static void main(String[] args) {
ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(10);
/*BlockingQueue delayQueue = new DelayQueue();
BlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(10);
BlockingQueue<String> priorityBlockingQueue = new PriorityBlockingQueue<>(10);
BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();*/
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 最多生產(chǎn) 5 個(gè)數(shù)據(jù)
AtomicInteger numberOfElementsToProduce = new AtomicInteger(5);
// 2 個(gè)生產(chǎn)者線程
executorService.submit(new ArrayBlockingQueueProducer(arrayBlockingQueue, numberOfElementsToProduce));
executorService.submit(new ArrayBlockingQueueProducer(arrayBlockingQueue, numberOfElementsToProduce));
// 1 個(gè)消費(fèi)者線程
executorService.submit(new ArrayBlockingQueueConsumer(arrayBlockingQueue, numberOfElementsToProduce));
executorService.shutdown();
WaitUtils.waitUntil(() -> executorService.isTerminated(), 1000L);
}
}
輸出如下:
13:54:17.884 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_5
13:54:17.884 [pool-1-thread-1] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-1, produce task task_5
13:54:17.884 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_4
13:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_4
13:54:17.887 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_2
13:54:17.887 [pool-1-thread-1] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-1, produce task task_3
13:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_3
13:54:17.887 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_1
13:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_2
13:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_1
(推薦內(nèi)容:Java面試基礎(chǔ)題)
以上就是關(guān)于JUC
之 BlockingQueue
接口以及 ArrayBlockingQueue
實(shí)現(xiàn)類的相關(guān)介紹了,希望對(duì)大家有所幫助。