1. 程式人生 > >併發佇列 ConcurrentLinkedQueue 及 BlockingQueue 介面實現的四種佇列

併發佇列 ConcurrentLinkedQueue 及 BlockingQueue 介面實現的四種佇列

佇列是一種特殊的線性表,它只允許在表的前端(front)進行刪除操作,而在表的後端(rear)進行插入操作。進行插入操作的端稱為隊尾,進行刪除操作的端稱為隊頭。佇列中沒有元素時,稱為空佇列。

在佇列這種資料結構中,最先插入的元素將是最先被刪除的元素;反之最後插入的元素將是最後被刪除的元素,因此佇列又稱為“先進先出”(FIFO—first in first out)的線性表。

在併發佇列上JDK提供了兩套實現,一個是以ConcurrentLinkedQueue為代表的高效能佇列,一個是以BlockingQueue介面為代表的阻塞佇列,無論哪種都繼承自Queue。


一、ConcurrentLinkedQueue

是一個適用於高併發場景下的佇列,通過無鎖的方式,實現了高併發狀態下的高效能,通常ConcurrentLikedQueue效能好於BlockingQueue。

它是一個基於連線節點的無界執行緒安全佇列。該佇列的元素遵循先進先出的原則。頭是最先加入的,尾是最近加入的,該佇列不允許null元素。

ConcurrentLinkedQueue重要方法:

add()和offer()都是加入元素的方法(在ConcurrentLinkedQueue中,這兩個方法沒有任何區別)。

poll()和peek()都是取頭元素節點,區別在於前者會刪除元素,後者不會。

下面看一個例子:

  /* 
   * 一個基於連結節點的、無界的、執行緒安全的佇列。此佇列按照 FIFO(先進先出)原則對元素進行排序。佇列的頭部 是佇列中時間最長的元素。佇列的尾部 
   * 是佇列中時間最短的元素。新的元素插入到佇列的尾部,佇列檢索操作從佇列頭部獲得元素。當許多執行緒共享訪問一個公共 collection 
   * 時,ConcurrentLinkedQueue 是一個恰當的選擇。此佇列不允許 null 元素。 
   */    
  private void concurrentLinkedQueueTest() {    
      ConcurrentLinkedQueue<String> concurrentLinkedQueue = new ConcurrentLinkedQueue<String>();    
      concurrentLinkedQueue.add("a");    
      concurrentLinkedQueue.add("b");    
      concurrentLinkedQueue.add("c");    
      concurrentLinkedQueue.offer("d"); // 將指定元素插入到此佇列的尾部。    
      concurrentLinkedQueue.peek(); // 檢索並移除此佇列的頭,如果此佇列為空,則返回 null。    
      concurrentLinkedQueue.poll(); // 檢索並移除此佇列的頭,如果此佇列為空,則返回 null。    
  
      for (String str : concurrentLinkedQueue) {    
          System.out.println(str);    
      }    
  }  

注意:ConcurrentLinkedQueue的API.size() 是要遍歷一遍集合的,速很慢,所以判空時,儘量要避免用size(),而改用isEmpty()。

二、BlockingQueue介面

ArrayBlockingQueue:基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長陣列,以便快取佇列中的資料物件,其內部沒實現讀寫分離,也就意味著生產和消費不能完全並行,長度是需要定義的,可以指定先進先出或者先進後出,也叫有界佇列,在很多場合非常適合使用。

package concurrent;  
  
import java.util.concurrent.ArrayBlockingQueue;  
import java.util.concurrent.TimeUnit;  
  
public class ArrayBlockingQueueTest {  
  
    private ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10);  
  
    public static void main(String[] args) throws InterruptedException {  
        final ArrayBlockingQueueTest arrayBlockingQueueTest = new ArrayBlockingQueueTest();  
        new Thread(new Runnable() {  
            public void run() {  
                try {  
                    arrayBlockingQueueTest.producer();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }).start();  
        new Thread(new Runnable() {  
            public void run() {  
                try {  
                    arrayBlockingQueueTest.consumer();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }).start();  
  
  
    }  
  
    private void producer() throws InterruptedException {  
        for(int i=0; i<100; i++) {  
            System.out.println("arrayBlockingQueue.size()="+arrayBlockingQueue.size());  
            //Thread.sleep(1000);  
            //佇列滿了之後會直接丟擲異常  
            //arrayBlockingQueue.add(i);  
            //佇列滿了之後會等待佇列騰出空間  
            //arrayBlockingQueue.put(i);  
            //將指定的元素插入到此佇列的尾部(如果立即可行且不會超過該佇列的容量),在成功時返回 true,如果此佇列已滿,則返回 false。  
            arrayBlockingQueue.offer(i);  
  
        }  
    }  
  
    private void consumer() throws InterruptedException {  
        while(true) {  
            //Thread.sleep(1000);  
            //獲取並移除此佇列的頭部,在指定的等待時間前等待可用的元素。如果已經沒有可用的元素,則沒10s返回一個null  
            // System.out.println(arrayBlockingQueue.poll(10000, TimeUnit.MILLISECONDS));  
            //獲取並移除此佇列的頭部,在元素變得可用之前一直等待  
            System.out.println(arrayBlockingQueue.take());  
            //獲取但不移除此佇列的頭;如果此佇列為空,則返回 null  
            //System.out.println(arrayBlockingQueue.peek());  
        }  
    }  
}

關於佇列中各方法的說明:

操作丟擲異常返回個特殊值阻塞到佇列可用一定時間後退出操作方式
新增元素add(e)offer(e)put(e)offer(e,time,unit)新增到隊尾
移除元素remove()poll()take()poll(e,time,unit)獲取頭元素並移除
查詢元素element()peek()獲取頭元素不移除

LinkedBlockingQueue:基於連結串列的阻塞佇列,同ArrayBlockingQueue類似,其內部也是維護著一個數據緩衝佇列(該佇列有一個連結串列構成),LinkedBlockingQueue之所以能夠高效的處理併發資料,是因為其內部實現採用分離鎖(讀寫分離兩個鎖),從而實現生產者和消費者操作的完全並行執行。它是一個無界佇列。

private LinkedBlockingQueue<String> queue;//禮物的佇列
private final static int GET_QUEUE_GIFT = 0;//從佇列中獲取禮物
private Handler handler = new Handler() {
    @Override
    public void handleMessage(Message msg) {
        super.handleMessage(msg);
        switch (msg.what) {
            case GET_QUEUE_GIFT://如果是從佇列中獲取禮物實體的訊息
                if (!queue.isEmpty()) {
                    String vo = queue.poll();
                    if (vo != null) {//如果從佇列中獲取的禮物不為空,那麼就將禮物展示在介面上
                        Log.e("------", "------獲取的------" + vo);
                        handler.sendEmptyMessageDelayed(GET_QUEUE_GIFT, 1000);
                    }
                } else {//如果這次從佇列中獲取的訊息是禮物是空的,則一秒之後重新獲取
                    Log.e("------", "------獲取的------isEmpty");
                }
                break;
        }
    }
};

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    findViewById(R.id.addqueue).setOnClickListener(new View.OnClickListener() {
        @Override
        public void onClick(View v) {
            for (int i = 0; i < 6; i++) {
                queue.add("我是佇列中的-----第" + (i + 6) + "個");
            }
            handler.sendEmptyMessageDelayed(GET_QUEUE_GIFT, 1000);//輪詢佇列獲取禮物
        }
    });
    queue = new LinkedBlockingQueue<>();
    for (int i = 0; i < 6; i++) {
        queue.add("我是佇列中的第" + i + "個");
    }
    handler.sendEmptyMessageDelayed(GET_QUEUE_GIFT, 1000);//輪詢佇列獲取禮物
}

PriorityBlockingQueue:基於優先順序的阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定,也就是說傳入佇列的物件必須實現Comparable介面),在實現PriorityBlockingQueue時,內部控制執行緒同步的鎖採用的是公平鎖,他也是一個無界的佇列。add()並不進行排序操作,只有在取資料時才進行排序

public static PriorityBlockingQueue<User> queue = new PriorityBlockingQueue<User>();  
  
public static void main(String[] args) {  
    queue.add(new User(1,"wu"));  
    queue.add(new User(5,"wu5"));  
    queue.add(new User(23,"wu23"));  
    queue.add(new User(55,"wu55"));  
    queue.add(new User(9,"wu9"));  
    queue.add(new User(3,"wu3"));  
    for (User user : queue) {  
        try {  
            System.out.println(queue.take().name);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}  
  
//靜態內部類  
static class User implements Comparable<User>{  
  
    public User(int age,String name) {  
        this.age = age;  
        this.name = name;  
    }  
  
    int age;  
    String name;  
  
    @Override  
    public int compareTo(User o) {  
        return this.age > o.age ? -1 : 1;  
    }  
}  

DelayQueue:帶有延遲時間的queue,其中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue中的元素必須實現Delayed介面,DelayQueue是一個沒有大小限制的佇列,應用場景很多,比如對快取超時的資料進行移除、任務超時處理、空閒連線的關閉等等。

import java.util.concurrent.ArrayBlockingQueue;  
import java.util.concurrent.DelayQueue;  
import java.util.concurrent.Delayed;  
import java.util.concurrent.TimeUnit;  
  
  
public class DelayQueueTest {  
  
    private static DelayQueue delayQueue = new DelayQueue();  
  
    private static long count = 0L;  
  
    private static final int taskNum = 4;  
  
    public static void main(String[] args) throws InterruptedException {  
  
        Object num = new Object();  
  
        final DelayQueueTest delayQueueTest = new DelayQueueTest();  
        new Thread(new Runnable() {  
            public void run() {  
                try {  
                    delayQueueTest.producer();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }).start();  
        while(true) {  
            if(delayQueue.size()==taskNum) {  
                break;  
            }  
        }  
        new Thread(new Runnable() {  
            public void run() {  
                try {  
                    delayQueueTest.consumer();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }).start();  
  
  
        new Thread(new Runnable() {  
            public void run() {  
                try {  
                    delayQueueTest.count();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }).start();  
    }  
  
    private void count() throws InterruptedException {  
        while(true) {  
            Thread.sleep(1000);  
            count++;  
            System.out.println("時間值="+count);  
            if(taskNum==count) {  
                break;  
            }  
        }  
    }  
  
    private void producer() throws InterruptedException {  
        for(int i=0; i<taskNum; i++) {  
            DelayedItem temp = new DelayedItem(i+"",i,(i+1));  
            System.out.println("生產者="+temp);  
            delayQueue.put(temp);  
        }  
    }  
  
    private void consumer() throws InterruptedException {  
        while(true) {  
            System.out.println("消費者="+delayQueue.take());  
            count = 0;  
        }  
    }  
  
  
    static class DelayedItem<T> implements  Delayed{  
        private String key;  
        private T item;  
        private long liveTime;  
        private long removeTime;  
  
        public DelayedItem(String key,T item,long liveTime) {  
            this.key = key;  
            this.item = item;  
            this.liveTime = liveTime;  
            this.removeTime = liveTime;  
        }  
  
        /** 
         * 當返回值小於等於0時則快取時間到達,take將取出元素 
         * @param unit 
         * @return 
         */  
        public long getDelay(TimeUnit unit) {  
  
            return removeTime-count;  
        }  
  
        public int compareTo(Delayed o) {  
            if(o instanceof DelayedItem) {  
                //已經在佇列中存在的物件  
                DelayedItem<T> tmpDelayedItem = (DelayedItem<T>)o;  
                //System.out.println("比較物件==="+tmpDelayedItem.key+"==="+this.key);  
                //失效時間越長的排到隊尾  
                if(this.removeTime > tmpDelayedItem.removeTime) {  
                    return 1;  
                } else if(this.removeTime == tmpDelayedItem.removeTime) {  
                    return 0;  
                } else {  
                    return -1;  
                }  
            }  
            return -1;  
        }  
  
        @Override  
        public String toString() {  
            return "DelayedItem{" +  
                   "key='" + key + '\'' +  
                   ", item=" + item +  
                   ", liveTime=" + liveTime +  
                   ", removeTime=" + removeTime +  
                   '}';  
        }  
    }  
} 

執行結果:

生產者=DelayedItem{key='0', item=0, liveTime=1, removeTime=1}  
生產者=DelayedItem{key='1', item=1, liveTime=2, removeTime=2}  
生產者=DelayedItem{key='2', item=2, liveTime=3, removeTime=3}  
生產者=DelayedItem{key='3', item=3, liveTime=4, removeTime=4}  
時間值=1  
消費者=DelayedItem{key='0', item=0, liveTime=1, removeTime=1}  
時間值=1  
時間值=2  
消費者=DelayedItem{key='1', item=1, liveTime=2, removeTime=2}  
時間值=1  
時間值=2  
時間值=3  
消費者=DelayedItem{key='2', item=2, liveTime=3, removeTime=3}  
時間值=1  
時間值=2  
時間值=3  
時間值=4  
消費者=DelayedItem{key='3', item=3, liveTime=4, removeTime=4}  

SynchronousQueue:一種沒有緩衝的佇列,生產者產生的資料直接被消費者獲取並消費。一個沒有容量的併發佇列有什麼用了?或者說存在的意義是什麼?SynchronousQueue 的實現非常複雜,SynchronousQueue 內部沒有容量,但是由於一個插入操作總是對應一個移除操作,反過來同樣需要滿足。那麼一個元素就不會再SynchronousQueue 裡面長時間停留,一旦有了插入執行緒和移除執行緒,元素很快就從插入執行緒移交給移除執行緒。也就是說這更像是一種通道(管道),資源從一個方向快速傳遞到另一方 向。需要特別說明的是,儘管元素在SynchronousQueue 內部不會“停留”,但是並不意味之SynchronousQueue 內部沒有佇列。實際上SynchronousQueue 維護者執行緒佇列,也就是插入執行緒或者移除執行緒在不同時存在的時候就會有執行緒佇列。既然有佇列,同樣就有公平性和非公平性特性,公平性保證正在等待的插入線 程或者移除執行緒以FIFO的順序傳遞資源。顯然這是一種快速傳遞元素的方式,也就是說在這種情況下元素總是以最快的方式從插入著(生產者)傳遞給移除著(消費者),這在多工佇列中是最快處理任務的方式。線上程池的相關章節中還會更多的提到此特性。

它模擬的功能類似於生活中一手交錢一手交貨這種情形,像那種貨到付款或者先付款後發貨模型不適合使用SynchronousQueue。首先要知道SynchronousQueue沒有容納元素的能力,即它的isEmpty()方法總是返回true,但是給人的感覺卻像是隻能容納一個元素。

import java.util.Random;  
import java.util.concurrent.SynchronousQueue;  
import java.util.concurrent.TimeUnit;  
  
public class SynchronousQueueTest {  
    public static void main(String[] args) throws InterruptedException {  
        SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();  
      
        new Product(queue).start();  
        new Customer(queue).start();  
    }  
    static class Product extends Thread{  
        SynchronousQueue<Integer> queue;  
        public Product(SynchronousQueue<Integer> queue){  
            this.queue = queue;  
        }  
        @Override  
        public void run(){  
            while(true){  
                int rand = new Random().nextInt(1000);  
                System.out.println("生產了一個產品:"+rand);  
                System.out.println("等待三秒後運送出去...");  
                try {  
                    TimeUnit.SECONDS.sleep(3);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
                try {  
                    queue.put(rand);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
                  
                System.out.println(queue.isEmpty());  
            }  
        }  
    }  
    static class Customer extends Thread{  
        SynchronousQueue<Integer> queue;  
        public Customer(SynchronousQueue<Integer> queue){  
            this.queue = queue;  
        }  
        @Override  
        public void run(){  
            while(true){  
                try {  
                    System.out.println("消費了一個產品:"+queue.take());  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
                System.out.println("------------------------------------------");  
            }  
        }  
    }  
}

執行結果:

生產了一個產品:542  
等待三秒後運送出去...  
true  
消費了一個產品:542  
生產了一個產品:183  
等待三秒後運送出去...  
------------------------------------------  
true  
消費了一個產品:183  
------------------------------------------  
生產了一個產品:583  
等待三秒後運送出去...  
true  
消費了一個產品:583  
------------------------------------------