1. 程式人生 > >Java中佇列的解析

Java中佇列的解析

定義

佇列是一種特殊的線性表,遵循的原則就是“先入先出”。在我們日常使用中,經常會用來併發操作資料。在併發程式設計中,有時候需要使用執行緒安全的佇列。如果要實現一個執行緒安全的佇列通常有兩種方式:一種是使用阻塞佇列,另一種是使用執行緒同步鎖。

什麼是阻塞佇列?

假設有一個麵包房,裡面有一個客人吃麵包,一個師傅烤麵包。籃子裡面最多放2個麵包,師傅考完了麵包放到籃子裡,而客人吃麵包則從籃子裡面往外拿,為了保證客人吃麵包的時候籃子裡有面包或者師傅烤麵包的時候籃子不會溢位,這時候就需要引用出來阻塞佇列的概念,就是我們常說的生產者消費者的模式。

阻塞佇列是一個支援兩個附加操作的佇列。這兩個附加的操作支援阻塞的插入和移除方法。

1)支援阻塞的插入方法:意思是當佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列不滿。

2)支援阻塞的移除方法:意思是在佇列為空時,獲取元素的執行緒會等待佇列變為非空。阻塞佇列常用於生產者和消費者的場景,生產者是向佇列裡新增元素的執行緒,消費者是從佇列裡取元素的執行緒。阻塞佇列就是生產者用來存放元素、消費者用來獲取元素的容器。

系統內不阻塞佇列:PriorityQueue  ConcurrentLinkedQueue

我們來看一下不阻塞佇列的關係(以PriorityQueue 為例):


  PriorityQueue 類繼承自AbstractQueue,實現了Serializable介面。實質上維護了一個有序列表,

PriorityQueue位於Java util包中,觀其名字前半部分的單詞Priority是優先的意思,實際上這個佇列就是具有“優先順序”。加入到 Queue 中的元素根據它們的天然排序(通過其 java.util.Comparable 實現)或者根據傳遞給建構函式的 java.util.Comparator 實現來定位。
  ConcurrentLinkedQueue 是基於連結節點的、執行緒安全的佇列。併發訪問不需要同步。因為它在佇列的尾部新增元素並從頭部刪除它們,所以不需要知道佇列的大小, ConcurrentLinkedQueue 對公共集合的共享訪問就可以工作得很好。收集關於佇列大小的資訊會很慢,需要遍歷佇列;
ConcurrentLinkedQueue是一個基於連結節點的無界執行緒安全佇列,它採用先進先出的規則對節點進行排序,當我們新增一個元素的時候,它會新增到佇列的尾部;當我們獲取一個元素時,它會返回佇列頭部的元素。

實現阻塞介面的佇列:

java.util.concurrent 中加入了 BlockingQueue 介面和五個阻塞佇列類。它實質上就是一種帶有一點扭曲的 FIFO 資料結構。不是立即從佇列中新增或者刪除元素,執行緒執行操作阻塞,直到有空間或者元素可用。
五個佇列所提供的各有不同:

ArrayBlockingQueue :一個由陣列支援的有界佇列。

LinkedBlockingQueue :一個由連結節點支援的可選有界佇列。

PriorityBlockingQueue :一個由優先順序堆支援的無界優先順序佇列。

DelayQueue :一個由優先順序堆支援的、基於時間的排程佇列。

SynchronousQueue :一個利用 BlockingQueue 介面的簡單聚集(rendezvous)機制。

我們看一下ArrayBlockingQueue 和LinkedBlockingQueue 的繼承關係:

                

                 

通過檢視兩個類的繼承關係,我們可以知道,他們也是繼承自AbstractQueue,實現了Serializable介面;不同的是他們同時實現了BlockingQueue介面。

簡單介紹下其中的幾個:

LinkedBlockingQueueLinkedBlockingQueue預設大小是Integer.MAX_VALUE,可以理解為一個快取的有界等待佇列,可以選擇指定其最大容量,它是基於連結串列的佇列,此佇列按 FIFO(先進先出)排序元素。當生產者往佇列中放入一個數據時,快取在佇列內部,當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者同理。

ArrayBlockingQueue在構造時需要指定容量, 並可以選擇是否需要公平性,如果公平引數被設定true,等待時間最長的執行緒會優先得到處理(其實就是通過將ReentrantLock設定為true來 達到這種公平性的:即等待時間最長的執行緒會先操作)。通常,公平性會使你在效能上付出代價,只有在的確非常需要的時候再使用它。它是基於陣列的阻塞迴圈佇列,此佇列按FIFO(先進先出)原則對元素進行排序。

PriorityBlockingQueue是一個帶優先順序的 佇列,而不是先進先出佇列。元素按優先順序順序被移除,該佇列也沒有上限(看了一下原始碼,PriorityBlockingQueue是對 PriorityQueue的再次包裝,是基於堆資料結構的,而PriorityQueue是沒有容量限制的,與ArrayList一樣,所以在優先阻塞 佇列上put時是不會受阻的。雖然此佇列邏輯上是無界的,但是由於資源被耗盡,所以試圖執行新增操作可能會導致 OutOfMemoryError),但是如果佇列為空,那麼取元素的操作take就會阻塞,所以它的檢索操作take是受阻的。另外,往入該佇列中的元 素要具有比較能力。

關於ConcurrentLinkedQueueLinkedBlockingQueue

也可以理解為阻塞佇列和非阻塞佇列的區別:

1.LinkedBlockingQueue是使用鎖機制,ConcurrentLinkedQueue是使用CAS演算法,雖然LinkedBlockingQueue的底層獲取鎖也是使用的CAS演算法

2.關於取元素,ConcurrentLinkedQueue不支援阻塞去取元素,LinkedBlockingQueue支援阻塞的take()方法。

3.關於插入元素的效能,但在實際的使用過程中,尤其在多cpu的伺服器上,有鎖和無鎖的差距便體現出來了,ConcurrentLinkedQueue會比LinkedBlockingQueue快很多。

生產者消費者程式碼:

在網上看到一個生產者消費者的小例子,對於理解阻塞佇列非常有幫助,程式碼如下:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingQueueTest {
    public static class Basket {
        BlockingQueue<String> basket = new ArrayBlockingQueue<>(3);

        private void produce() throws InterruptedException {
            basket.put("蘋果");
        }

        private void consume() throws InterruptedException {
            basket.take();
        }

        private int getAppleNumber() {
            return basket.size();
        }
    }

    private static void testBasket() {
        final Basket basket = new Basket();
        class Producer implements Runnable {
            public void run() {
                try {
                    while (true) {
                        System.out.println("生產者開始生產蘋果###");
                        basket.produce();
                        System.out.println("生產者生產蘋果完畢###");
                        System.out.println("籃子中的蘋果數量:" + basket.getAppleNumber() + "個");
                        Thread.sleep(300);
                    }
                } catch (InterruptedException e) {}
            }
        }

        class Consumer implements Runnable {
            public void run() {
                try {
                    while (true) {
                        System.out.println("消費者開始消費蘋果***");
                        basket.consume();
                        System.out.println("消費者消費蘋果完畢***");
                        System.out.println("籃子中的蘋果數量:" + basket.getAppleNumber() + "個");
                        Thread.sleep(1000);
                    }
                } catch (InterruptedException e) {}
            }
        }
        ExecutorService service = Executors.newCachedThreadPool();
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        service.submit(producer);
        service.submit(consumer);
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {}
        service.shutdownNow();
    }

    public static void main(String[] args) {
        BlockingQueueTest.testBasket();
    }
}