1. 程式人生 > >阻塞佇列BlokingQueue與雙向佇列Deque

阻塞佇列BlokingQueue與雙向佇列Deque

Queue: 基本上,一個佇列就是一個先入先出(FIFO)的資料結構

Queue介面與List、Set同一級別,都是繼承了Collection介面。LinkedList實現了Deque接 口。

Queue的實現

1、沒有實現的阻塞介面的LinkedList: 實現了java.util.Queue介面和java.util.AbstractQueue介面   內建的不阻塞佇列: PriorityQueue 和 ConcurrentLinkedQueue   PriorityQueue 和 ConcurrentLinkedQueue 類在 Collection Framework 中加入兩個具體集合實現。    PriorityQueue 類實質上維護了一個有序列表。加入到 Queue 中的元素根據它們的天然排序(通過其 java.util.Comparable 實現)或者根據傳遞給建構函式的 java.util.Comparator 實現來定位。   ConcurrentLinkedQueue 是基於連結節點的、執行緒安全的佇列。併發訪問不需要同步。因為它在佇列的尾部新增元素並從頭部刪除它們,所以只要不需要知道佇列的大 小,          ConcurrentLinkedQueue 對公共集合的共享訪問就可以工作得很好。收集關於佇列大小的資訊會很慢,需要遍歷佇列。

2)實現阻塞介面的:   java.util.concurrent 中加入了 BlockingQueue 介面和五個阻塞佇列類。它實質上就是一種帶有一點扭曲的 FIFO 資料結構。不是立即從佇列中新增或者刪除元素,執行緒執行操作阻塞,直到有空間或者元素可用。 五個佇列所提供的各有不同:   * ArrayBlockingQueue :一個由陣列支援的有界佇列。   * LinkedBlockingQueue :一個由連結節點支援的可選有界佇列。   * PriorityBlockingQueue :一個由優先順序堆支援的無界優先順序佇列。   * DelayQueue :一個由優先順序堆支援的、基於時間的排程佇列。   * SynchronousQueue :一個利用 BlockingQueue 介面的簡單聚集(rendezvous)機制。

下表顯示了jdk1.5中的阻塞佇列的操作:

  add        增加一個元索                     如果佇列已滿,則丟擲一個IIIegaISlabEepeplian異常  remove   移除並返回佇列頭部的元素    如果佇列為空,則丟擲一個NoSuchElementException異常  element  返回佇列頭部的元素             如果佇列為空,則丟擲一個NoSuchElementException異常  offer       新增一個元素並返回true       如果佇列已滿,則返回false  poll         移除並返問佇列頭部的元素    如果佇列為空,則返回null  peek

       返回佇列頭部的元素             如果佇列為空,則返回null  put         新增一個元素                      如果佇列滿,則阻塞  take        移除並返回佇列頭部的元素     如果佇列為空,則阻塞

remove、element、offer 、poll、peek 其實是屬於Queue介面。 

阻塞佇列的操作可以根據它們的響應方式分為以下三類:aad、removee和element操作在你試圖為一個已滿的佇列增加元素或從空佇列取得元素時 丟擲異常。當然,在多執行緒程式中,佇列在任何時間都可能變成滿的或空的,所以你可能想使用offer、poll、peek方法。這些方法在無法完成任務時 只是給出一個出錯示而不會丟擲異常。

注意:poll和peek方法出錯進返回null。因此,向佇列中插入null值是不合法的

最後,我們有阻塞操作put和take。put方法在佇列滿時阻塞,take方法在佇列空時阻塞。

LinkedBlockingQueue的容量是沒有上限的(說的不準確,在不指定時容量為Integer.MAX_VALUE,不要然的話在put時怎麼會受阻呢),但是也可以選擇指定其最大容量,它是基於連結串列的佇列,此佇列按 FIFO(先進先出)排序元素。

ArrayBlockingQueue在構造時需要指定容量, 並可以選擇是否需要公平性,如果公平引數被設定true,等待時間最長的執行緒會優先得到處理(其實就是通過將ReentrantLock設定為true來 達到這種公平性的:即等待時間最長的執行緒會先操作)。通常,公平性會使你在效能上付出代價,只有在的確非常需要的時候再使用它。它是基於陣列的阻塞迴圈隊 列,此佇列按 FIFO(先進先出)原則對元素進行排序。

PriorityBlockingQueue是一個帶優先順序的 佇列,而不是先進先出佇列。元素按優先順序順序被移除,該佇列也沒有上限(看了一下原始碼,PriorityBlockingQueue是對 PriorityQueue的再次包裝,是基於堆資料結構的,而PriorityQueue是沒有容量限制的,與ArrayList一樣,所以在優先阻塞 佇列上put時是不會受阻的。雖然此佇列邏輯上是無界的,但是由於資源被耗盡,所以試圖執行新增操作可能會導致 OutOfMemoryError),但是如果佇列為空,那麼取元素的操作take就會阻塞,所以它的檢索操作take是受阻的。另外,往入該佇列中的元 素要具有比較能力。

DelayQueue(基於PriorityQueue來實現的)是一個存放Delayed 元素的無界阻塞佇列,只有在延遲期滿時才能從中提取元素。該佇列的頭部是延遲期滿後儲存時間最長的 Delayed 元素。如果延遲都還沒有期滿,則佇列沒有頭部,並且poll將返回null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於或等於零的值時,則出現期滿,poll就以移除這個元素了。此佇列不允許使用 null 元素。

package com.lx.bale;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

import org.junit.Test;

public class QueueTest {
	public static void main(String[] args) throws InterruptedException {
		final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(3);
		queue.add("11");
		queue.add("22");
		queue.add("33");
		new Thread(new Runnable() {
			public void run() {
				try {
					Thread.sleep(3000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(queue.poll());
			}
		}).start();
		queue.put("44");
		System.out.println(queue.poll());
		System.out.println(queue.poll());
		System.out.println(queue.poll());

	}
	
	@Test
	public void fun01() throws InterruptedException{
		/**
		 * ArrayBlockingQueue 等待時間長的執行緒優先加入,但LinkedBlokingQueue貌似也是一樣 等待長的先加入
		 */
		final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
		queue.offer("baseData");
		
		//執行緒1
		final Thread t1 = new Thread(new Runnable() {
			@Override
			public void run() {
				int count = 0;
				while (true) {
					try {
						Thread.sleep(1000);
						count ++;
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					if (count > 8) {
						//System.out.println("跳出迴圈");
						break;
					}
				}
			}
		});
		t1.start();
		// 啟動執行緒2
		final Thread t2 = new Thread(new Runnable() {
			@Override
			public void run() {
				int count = 0;
				while (true) {
					try {
						Thread.sleep(1000);
						count ++;
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					if (count > 8) {
						//System.out.println("跳出迴圈");
						break;
					}
				}
			}
		});
		t2.start();
		
		
		//插入t1 阻塞
		Thread t1Insert = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("t1插入阻塞");
					queue.put(t1);
					System.out.println("t1插入成功");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
		t1Insert.start();
		Thread.sleep(1000);
		//插入t2 阻塞
		Thread t2Insert = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("t2插入阻塞");
					queue.put(t2);
					System.out.println("t2插入成功");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
		t2Insert.start();
		
		
		
		Thread.sleep(3000);
		System.out.println("取出原始物件"+queue.poll());
		
		/*while (true) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			if (t1.isAlive()) {
				System.out.println("執行緒存活");
			}else{
				System.out.println("執行緒完畢");
			}
		}*/
		
	}
	@Test
	public void fun02(){
		PriorityBlockingQueue<Object> queue = new PriorityBlockingQueue<Object>();
		queue.put(new String());
	}
}

雙向佇列Deque

一.概述

  Deque是Queue的子介面,我們知道Queue是一種佇列形式,而Deque則是雙向佇列,它支援從兩個端點方向檢索和插入元素,因此Deque既可以支援LIFO形式也可以支援LIFO形式.Deque介面是一種比Stack和Vector更為豐富的抽象資料形式,因為它同時實現了以上兩者.

二.主要方法

修飾符和返回值 方法名 描述
新增功能
void push(E) 向佇列頭部插入一個元素,失敗時丟擲異常 
void addFirst(E) 向佇列頭部插入一個元素,失敗時丟擲異常
void  addLast(E) 向佇列尾部插入一個元素,失敗時丟擲異常
boolean  offerFirst(E) 向佇列頭部加入一個元素,失敗時返回false
boolean  offerLast(E) 向佇列尾部加入一個元素,失敗時返回false
獲取功能
E getFirst() 獲取佇列頭部元素,佇列為空時丟擲異常
getLast() 獲取佇列尾部元素,佇列為空時丟擲異常
peekFirst() 獲取佇列頭部元素,佇列為空時返回null
peekLast() 獲取佇列尾部元素,佇列為空時返回null
刪除功能
boolean removeFirstOccurrence(Object) 刪除第一次出現的指定元素,不存在時返回false
boolean  removeLastOccurrence(Object) 刪除最後一次出現的指定元素,不存在時返回false
彈出功能
E pop() 彈出佇列頭部元素,佇列為空時丟擲異常
E removeFirst() 彈出佇列頭部元素,佇列為空時丟擲異常
removeLast() 彈出佇列尾部元素,佇列為空時丟擲異常
pollFirst() 彈出佇列頭部元素,佇列為空時返回null 
pollLast() 彈出佇列尾部元素,佇列為空時返回null 
迭代器
Iterator<E> descendingIterator() 返回佇列反向迭代器

可以看出Deque在Queue的方法上新添了對佇列頭尾元素的操作,add,remove,get形式的方法會在有界佇列滿員和空佇列時丟擲異常,offer,poll,peek形式的方法則會返回false或null.

此外方法表中需要注意push = addFirst,pop = removeFirst,只是使用了不同的方法名體現隊列表示棧結構時的特點.

三.實現

  同Queue一樣Deque的實現也可以劃分成通用實現和併發實現.

  通用實現主要有兩個實現類ArrayDeque和LinkedList.

  ArrayDeque是個可變陣列,它是在Java 6之後新新增的,而LinkedList是一種連結串列結構的list.LinkedList要比ArrayDeque更加靈活,因為它也實現了List介面的所有操作,並且可以插入null元素,這在ArrayDeque中是不允許的.

  從效率來看,ArrayDeque要比LinkedList在兩端增刪元素上更為高效,因為沒有在節點建立刪除上的開銷.最適合使用LinkedList的情況是迭代佇列時刪除當前迭代的元素.此外LinkedList可能是在遍歷元素時最差的資料結構,並且也LinkedList佔用更多的記憶體,因為LinkedList是通過連結串列連線其整個佇列,它的元素在記憶體中是隨機分佈的,需要通過每個節點包含的前後節點的記憶體地址去訪問前後元素.

  總體ArrayDeque要比LinkedList更優越,在大佇列的測試上有3倍與LinkedList的效能,最好的是給ArrayDeque一個較大的初始化大小,以避免底層陣列擴容時資料拷貝的開銷.

  LinkedBlockingDeque是Deque的併發實現,在佇列為空的時候,它的takeFirst,takeLast會阻塞等待佇列處於可用狀態