1. 程式人生 > >淺談Java佇列Queue

淺談Java佇列Queue

佇列Queue就是一個先進先出的資料結構,與List、Set同一級別,繼承了Collection介面。

一、Queue的實現

1、阻塞佇列(BlockingQueue)

① 插入:佇列不滿時可執行插入元素執行緒,直到佇列滿。

② 移除:佇列不為空都可移除,直到佇列為空。

丟擲異常:滿的時候插入,空的時候取出都會拋異常。

返回特殊值:插入成功返回true

一直阻塞:滿時put和空時take會阻塞執行緒,直到佇列可用。

超時退出:滿時會阻塞插入一段時間,如果超過一定時間,執行緒就會退出。

JDK7提供了7個阻塞佇列:

① ArrayListBlockingQueue

ArrayBlockingQueue是一個由陣列支援的有界阻塞佇列。在讀寫操作上都需要鎖住整個容器,因此吞吐量與一般的實現是相似的,適合於實現“生產者消費者”模式。 
② LinkedBlockingQueue

一個由連結串列結構組成的雙向阻塞佇列。

基於連結串列的阻塞佇列,內部維持這一關資料緩衝佇列。當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;當佇列緩衝區達到最大緩衝容量時,才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生成者執行緒才會被喚醒,反之對消費者的處理也基於同樣的原理。

LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者和消費者分別採用了獨立的鎖來控制資料同步,也就意味著在高併發的情況下生產者和消費者可以並行的操作佇列中的資料,以此來提高整個佇列的併發效能。

③ SynchronousQueue

一個不儲存元素的阻塞佇列,在某次新增元素後必須等待其他執行緒取走後才能繼續新增。

④ PriorityBlockingQueue 

是一個帶優先順序的佇列,而不是先進先出佇列,該佇列也沒有上限,但是如果佇列為空,那麼取元素的操作take就會阻塞。

⑤ DelayQueue

是一個存放Delayed 元素的無界阻塞佇列,只有在延遲期滿時才能從中提取元素。

ArrayListBlockingQueue和LinkedBlockingQueue的區別?

① 佇列中鎖的實現不同

ArrayBlockingQueue生產者消費者使用同一個鎖。

LinkedBlockingQueue生產用的是putLock,消費是takeLock。

② 在生產和消費時操作不同

ArrayBlockingQueue實現的佇列中在生產和消費的時候,是直接將列舉物件插入或移除的;

LinkedBlockingQueue實現的佇列中在生產和消費的時候,需要把列舉物件轉換為節點進行插入或移除,會影響效能。

③ 佇列大小初始化方式不同

ArrayBlockingQueue實現的佇列必須指定大小

LinkedBlockingQueue可以不指定大小,預設是Integer.MAX_VALUE

ArrayBlockingQueue效能要比LinkedBlockingQueue效能要好,執行速度更快,ArrayBlockingQueue優先使用!

2、非阻塞佇列

ConcurrentLinkedQueue是一個基於連結節點的無界執行緒安全佇列,它採用先進先出的規則對節點進行排序,當我們新增一個元素的時候,它會新增到佇列的尾部;當我們獲取一個元素時,它會返回佇列頭部的元素。

入隊和出隊操作均利用CAS(compare and set)更新,這樣允許多個執行緒併發執行,並且不會因為加鎖而阻塞執行緒,使得併發效能更好。

注:CAS用於實現多執行緒同步的原子指令,它將記憶體位置的內容與給定值進行比較,只有在相同的情況下,將該記憶體位置的內容修改為新的給定值。 

二、程式碼例項

package OSChina.Client;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingQueueTest {
    static final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
    //定義裝蘋果的籃子
    public static class Basket{
        // 籃子,能夠容納3個蘋果
       static BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
        //生產蘋果,放入籃子
        public void produce() throws InterruptedException{
            queue.put("An apple");
        }
        // 消費蘋果,從籃子中取走
        public String consume() throws InterruptedException{
            String apple = queue.take();
            return apple;
        }
        public static int getAppleNumber(){
            return queue.size();
        }
    }

    public static void testBasket(){
        // 建立一個裝蘋果的籃子
        final Basket basket = new Basket();
        //定義蘋果生產者
        class Producer implements Runnable{
            @Override
            public void run() {
                try {
                    while (true){
                        System.out.println("生產者準備生產蘋果:"+sdf.format(new Date())+",籃子中蘋果數量:"+Basket.getAppleNumber());
                        basket.produce();
                        System.out.println("生產者生產蘋果完畢:" + sdf.format(new Date())+",籃子中蘋果數量:"+Basket.getAppleNumber());
                        Thread.sleep(300);
                    }
                }catch (InterruptedException  ex){
                }
            }
        }
        //定義蘋果消費者
        class Consumer implements Runnable{
            @Override
            public void run() {
                try {
                    while (true){
                        System.out.println("消費者準備消費蘋果:" + sdf.format(new Date())+",籃子中蘋果數量:"+Basket.getAppleNumber());
                        basket.consume();
                        System.out.println("消費完後有蘋果:" + sdf.format(new Date())+",籃子中蘋果數量:"+Basket.getAppleNumber());
                        Thread.sleep(1000);
                    }
                }catch (InterruptedException ex){
                }
            }
        }

        ExecutorService service = Executors.newFixedThreadPool(2);
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        service.submit(producer);
        service.submit(consumer);
        // 程式執行10s後,所有任務停止
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        service.shutdownNow();
    }

    public static void main(String[] args) {
        BlockingQueueTest.testBasket();
    }
}

 

江疏