1. 程式人生 > >java並發容器(Map、List、BlockingQueue)具體解釋

java並發容器(Map、List、BlockingQueue)具體解釋

current 兩種 避免 由於 新的 rom 就會 family err

Java庫本身就有多種線程安全的容器和同步工具,當中同步容器包含兩部分:一個是VectorHashtable。另外還有JDK1.2中增加的同步包裝類。這些類都是由Collections.synchronizedXXX工廠方法。

同步容器都是線程安全的,可是對於復合操作。缺有些缺點:

① 叠代:在查覺到容器在叠代開始以後被改動,會拋出一個未檢查異常ConcurrentModificationException,為了避免這個異常,須要在叠代期間,持有一個容器鎖。可是鎖的缺點也非常明顯。就是對性能的影響。

② 隱藏叠代器:StringBuildertoString方法會通過叠代容器中的每一個元素,另外容器的

hashCodeequals方法也會間接地調用叠代。

類似地,contailAllremoveAllretainAll方法。以及容器作為參數的構造函數,都會對容器進行叠代。

③ 缺少即增加等一些復合操作

public static Object getLast(Vector list) {

int lastIndex = list.size() - 1;

return list.get(lastIndex);

}

public static void deleteLast(Vector list) {

int lastIndex = list.size() - 1;

list.remove(lastIndex);

}

getLastdeleteLast都是復合操作。由先前對原子性的分析能夠推斷,這依舊存在線程安全問題。有可能會拋出ArrayIndexOutOfBoundsException的異常,錯誤產生的邏輯例如以下所看到的:


解決的方法就是通過對這些復合操作加鎖

1 並發容器類

正是因為同步容器類有以上問題,導致這些類成了雞肋,於是Java 5推出了並發容器類,Map相應的有ConcurrentHashMapList相應的有CopyOnWriteArrayList。與同步容器類相比。它有下面特性:

1.1 ConcurrentHashMap

· 更加細化的鎖機制。同步容器直接把容器對象做為鎖,這樣就把全部操作串行化,事實上這是不是必需的,過於悲觀,而並發容器採用更細粒度的鎖機制。名叫分離鎖。保證一些不會發生並發問題的操作進行並行運行

· 附加了一些原子性的復合操作。比方putIfAbsent方法

· 叠代器的弱一致性。而非“及時失敗”。它在叠代過程中不再拋出Concurrentmodificationexception異常,而是弱一致性。

· 在並發高的情況下,有可能sizeisEmpty方法不準確。但真正在並發環境下這些方法也沒什麽作用。

· 另外。它另一些附加的原子操作。缺少即增加、相等便移除、相等便替換

putIfAbsent(K key, V value),缺少即增加(假設該鍵已經存在,則不增加)
假設指定鍵已經不再與某個值相關聯。則將它與給定值關聯。

類似於以下的操作

If(!map.containsKey(key)){

return map.put(key,value);

}else{

return map.get(key);

}

remove(Object key, Object value),相等便移除
僅僅有眼下將鍵的條目映射到給定值時,才移除該鍵的條目。

類似於以下的:

if(map.containsKey(key) && map.get(key).equals(value)){

Map.remove();

return true;

}else{

return false;

}

replace(K key, V value)

replace(K key, V oldValue, V newValue),相等便替換。
僅僅有眼下將鍵的條目映射到某一值時,才替換該鍵的條目。

上面提到的三個。都是原子的。在一些緩存應用中能夠考慮取代HashMap/Hashtable

1.2 CopyOnWriteArrayListCopyOnWriteArraySet

· CopyOnWriteArrayList採用寫入時復制的方式避開並發問題。這事實上是通過冗余和不可變性來解決並發問題,在性能上會有比較大的代價,但假設寫入的操作遠遠小於叠代和讀操作,那麽性能就區別不大了。

應用它們的場合通常在數組相對較小。而且遍歷操作的數量大大超過可變操作的數量時。這樣的場合應用它們很好。它們全部可變的操作都是先取得後臺數組的副本,對副本進行更改。然後替換副本。這樣能夠保證永遠不會拋出ConcurrentModificationException移除。

2 隊列

Java中的隊列接口就是Queue。它有會拋出異常的addremove方法。在隊尾插入元素以及對頭移除元素。還有不會拋出異常的,相應的offerpoll方法。

2.1 LinkedList

List實現了deque接口以及List接口,能夠將它看做是這兩種的不論什麽一種。

Queue queue=new LinkedList();

queue.offer("testone");

queue.offer("testtwo");

queue.offer("testthree");

queue.offer("testfour");

System.out.println(queue.poll());//testone

2.2 PriorityQueue

一個基於優先級堆(簡單的使用鏈表的話。可能插入的效率會比較低O(N)的無界優先級隊列。

優先級隊列的元素依照其自然順序進行排序,或者依據構造隊列時提供的 Comparator 進行排序,詳細取決於所使用的構造方法。

優先級隊列不同意使用 null 元素。依靠自然順序的優先級隊列還不同意插入不可比較的對象。

queue=new PriorityQueue();

queue.offer("testone");

queue.offer("testtwo");

queue.offer("testthree");

queue.offer("testfour");

System.out.println(queue.poll());//testfour

2.3 ConcurrentLinkedQueue

基於鏈節點的。線程安全的隊列。

並發訪問不須要同步。在隊列的尾部加入元素,並在頭部刪除他們。

所以僅僅要不須要知道隊列的大小。並發隊列就是比較好的選擇。

3 堵塞隊列

3.1 生產者和消費者模式

生產者和消費者模式,生產者不須要知道消費者的身份或者數量,甚至根本沒有消費者。他們僅僅負責把數據放入隊列。

類似地。消費者也不須要知道生產者是誰,以及是誰給他們安排的工作。

Java知道大家清楚這個模式的並發復雜性,於是乎提供了堵塞隊列(BlockingQueue)來滿足這個模式的需求。堵塞隊列說起來非常easy,就是當隊滿的時候寫線程會等待,直到隊列不滿的時候;當隊空的時候讀線程會等待。直到隊不空的時候。實現這樣的模式的方法非常多,其差別也就在於誰的消耗更低和等待的策略更優。以LinkedBlockingQueue的詳細實現為例,它的put源代碼例如以下:

public void put(E e) throws InterruptedException {

if (e == null) throw new NullPointerException();

int c = -1;

final ReentrantLock putLock = this.putLock;

final AtomicInteger count = this.count;

putLock.lockInterruptibly();

try {

try {

while (count.get() == capacity)

notFull.await();

} catch (InterruptedException ie) {

notFull.signal();

// propagate to a non-interrupted thread

throw ie;

}

insert(e);

c = count.getAndIncrement();

if (c + 1 < capacity)

notFull.signal();

} finally {

putLock.unlock();

}

if (c == 0)

signalNotEmpty();

}

撇開其鎖的詳細實現,其流程就是我們在操作系統課上學習到的標準生產者模式,看來那些枯燥的理論還是實用武之地的。當中,最核心的還是Java的鎖實現,有興趣的朋友能夠再進一步深究一下

堵塞隊列Blocking queue,提供了可堵塞的puttake方法,他們與可定時的offerpoll方法是等價。Put方法簡化了處理。假設是有界隊列。那麽當隊列滿的時候,生成者就會堵塞,從而改消費者很多其它的追趕速度。

3.2 ArrayBlockingQueueLinkedBlockingQueue

FIFO的隊列。與LinkedList(由鏈節點支持。無界)ArrayList(由數組支持。有界)相似(Linked有更好的插入和移除性能。Array有更好的查找性能,考慮到堵塞隊列的特性,移除頭部,增加尾部,兩個都差別不大)。可是卻擁有比同步List更好的並發性能。

另外,LinkedList永遠不會等待,由於他是無界的。

BlockingQueue<String> queue=new ArrayBlockingQueue<String>(5);

Producer p=new Producer(queue);

Consumer c1=new Consumer(queue);

Consumer c2=new Consumer(queue);

new Thread(p).start();

new Thread(c1).start();

new Thread(c2).start();

class Producer implements Runnable {

private final BlockingQueue queue;

Producer(BlockingQueue q) { queue = q; }

public void run() {

try {

for(int i=0;i<100;i++){

queue.put(produce());

}

} catch (InterruptedException ex) {}

}

String produce() {

String temp=""+(char)(‘A‘+(int)(Math.random()*26));

System.out.println("produce"+temp);

return temp;

}

}

class Consumer implements Runnable {

private final BlockingQueue queue;

Consumer(BlockingQueue q) { queue = q; }

public void run() {

try {

for(int i=0;i<100;i++){

consume(queue.take());

}

} catch (InterruptedException ex) {}

}

void consume(Object x) {

System.out.println("cousume"+x.toString());

}

}

輸出:

produceK

cousumeK

produceV

cousumeV

produceQ

cousumeQ

produceI

produceD

produceI

produceG

produceA

produceE

cousumeD

3.3 PriorityBlockingQueue

一個按優先級堆支持的無界優先級隊列隊列,假設不希望依照FIFO的順序進行處理,它很實用。

它能夠比較元素本身的自然順序。也能夠使用一個Comparator排序。

3.4 DelayQueue

一個優先級堆支持的。基於時間的調度隊列。加入到隊列中的元素必須實現新的Delayed接口(僅僅有一個方法。Long getDelay(java.util.concurrent.TimeUnit unit))。加入能夠理馬上返回。可是在延遲時間過去之前。不能從隊列中取出元素,假設多個元素的延遲時間已到。那麽最早失效鏈接/失效時間最長的元素將第一個取出。

static class NanoDelay implements Delayed{

long tigger;

NanoDelay(long i){

tigger=System.nanoTime()+i;

}

public boolean equals(Object other){

return ((NanoDelay)other).tigger==tigger;

}

public long getDelay(TimeUnit unit) {

long n=tigger-System.nanoTime();

return unit.convert(n, TimeUnit.NANOSECONDS);

}

public long getTriggerTime(){

return tigger;

}

public int compareTo(Delayed o) {

long i=tigger;

long j=((NanoDelay)o).tigger;

if(i<j){

return -1;

}

if(i>j)

return 1;

return 0;

}

}

public static void main(String[] args) throws InterruptedException{

Random random=new Random();

DelayQueue<NanoDelay> queue=new DelayQueue<NanoDelay>();

for(int i=0;i<5;i++){

queue.add(new NanoDelay(random.nextInt(1000)));

}

long last=0;

for(int i=0;i<5;i++){

NanoDelay delay=(NanoDelay)(queue.take());

long tt=delay.getTriggerTime();

System.out.println("Trigger time:"+tt);

if(i!=0){

System.out.println("Data: "+(tt-last));

}

last=tt;

}

}

3.5 SynchronousQueue

不是一個真正的隊列,由於它不會為隊列元素維護不論什麽存儲空間,只是它維護一個排隊的線程清單。這些線程等待把元素增加(enqueue)隊列或者移出(dequeue)隊列。

也就是說。它很直接的移交工作,降低了生產者和消費者之間移動數據的延遲時間,另外。也能夠更快的知道反饋信息,當移交被接受時,它就知道消費者已經得到了任務。

由於SynChronousQueue沒有存儲的能力。所以除非還有一個線程已經做好準備,否則puttake會一直阻止。它僅僅有在消費者比較充足的時候比較合適

4 雙端隊列(Deque

JAVA6中新增了兩個容器DequeBlockingDeque,他們分別擴展了QueueBlockingQueueDeque它是一個雙端隊列,同意高效的在頭和尾分別進行插入和刪除。它的實現各自是ArrayDequeLinkedBlockingQueue

雙端隊列使得他們可以工作在一種稱為“竊取工作”的模式上面。

5 最佳實踐

1..同步的(synchronized+HashMap,假設不存在,則計算,然後增加,該方法須要同步。

HashMap cache=new HashMap();

public synchronized V compute(A arg){

V result=cace.get(arg);

Ii(result==null){

result=c.compute(arg);

Cache.put(result);

}

Return result;

}

2.用ConcurrentHashMap取代HashMap+同步.。這種在getset的時候也基本能保證原子性。可是會帶來反復計算的問題.

Map<A,V> cache=new ConcurrentHashMap<A,V>();

public V compute(A arg){

V result=cace.get(arg);

Ii(result==null){

result=c.compute(arg);

Cache.put(result);

}

Return result;

}

3.採用FutureTask取代直接存儲值,這樣能夠在一開始創建的時候就將Task增加

Map<A,FutureTask<V>> cache=new ConcurrentHashMap<A,FutureTask<V>>();

public V compute(A arg){

FutureTask <T> f=cace.get(arg);

//檢查再執行的缺陷

Ii(f==null){

Callable<V> evel=new Callable(){

Public V call() throws ..{

return c.compute(arg);

}

};

FutureTask <T> ft=new FutureTask<T>(evel);

f=ft;

cache.put(arg,ft;

ft.run();

}

Try{

//堵塞,直到完畢

return f.get();

}cach(){

}

}

4.上面還有檢查再執行的缺陷,在高並發的情況下啊,兩方都沒發現FutureTask,而且都放入Map(後一個被前一個替代),都開始了計算。

這裏的解決方式在於。當他們都要放入Map的時候。假設能夠有原子方法。那麽已經有了以後,後一個FutureTask就增加,而且啟動。

public V compute(A arg){

FutureTask <T> f=cace.get(arg);

//檢查再執行的缺陷

Ii(f==null){

Callable<V> evel=new Callable(){

Public V call() throws ..{

return c.compute(arg);

}

};

FutureTask <T> ft=new FutureTask<T>(evel);

f=cache.putIfAbsent(args,ft); //假設已經存在。返回存在的值。否則返回null

if(f==null){f=ft;ft.run();} //曾經不存在。說明應該開始這個計算

else{ft=null;} //取消該計算

}

Try{

//堵塞,直到完畢

return f.get();

}cach(){

}

}

5.上面的程序上來看已經完美了。只是可能帶來緩存汙染的可能性。假設一個計算被取消或者失敗,那麽這個鍵以後的值永遠都是失敗了;一種解決方式是,發現取消或者失敗的task,就移除它,假設有Exception,也移除。

6.另外,假設考慮緩存過期的問題,能夠為每一個結果關聯一個過去時間。並周期性的掃描,清除過期的緩存

(過期時間能夠用Delayed接口實現,參考DelayQueue,給他一個大於當前時間XXX的時間,,而且不斷減去當前時間,直到返回負數,說明延遲時間已到了。

Java集合容器總結。


按數據結構主要有下面幾類


1,內置容器:數組
2,list容器:Vetor,Stack。ArrayList,LinkedList,
CopyOnWriteArrayList(1.5),AttributeList(1.5),RoleList(1.5),RoleUnresolvedList(1.5),
ConcurrentLinkedQueue(1.5),ArrayBlockingQueue(1.5),LinkedBlockingQueue(1.5)。
PriorityQueue(1.5)。PriorityBlockingQueue(1.5),SynchronousQueue(1.5)
3,set容器:HashSet(1.2),LinkedHashSet(1.4),TreeSet(1.2),
CopyOnWriteArraySet(1.5)。EnumSet(1.5),JobStateReasons。
4,map容器:Hashtable,HashMap(1.2),TreeMap(1.2)。LinkedHashMap(1.4),WeakHashMap(1.2)。
IdentityHashMap(1.4)。ConcurrentMap(1.5),concurrentHashMap(1.5)。
Set 接口繼承 Collection,但不同意反復,使用自己內部的一個排列機制。
List 接口繼承 Collection。同意反復。以元素安插的次序來放置元素,不會又一次排列。
Map接口是一組成對的鍵-值對象,即所持有的是key-value pairs。

Map中不能有反復的key。擁有自己的內部排列機制


按新舊主要有下面幾類
Java1.2前的容器:Vector,Stack,Hashtable。
Java1.2的容器:HashSet,TreeSet。HashMap。TreeMap,WeakHashMap
Java1.4的容器:LinkedHashSet,LinkedHashMap,IdentityHashMap,ConcurrentMap。concurrentHashMap
java1.5新增:CopyOnWriteArrayList,AttributeList,RoleList,RoleUnresolvedList,
ConcurrentLinkedQueue,ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue
ArrayBlockingQueue,CopyOnWriteArraySet。EnumSet。
未知:JobStateReasons
按線程安全主要有下面幾類
線程安全
一。使用鎖
全然不支持並發
list容器:Vetor,Stack。CopyOnWriteArrayList,ArrayBlockingQueue,
LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue
set容器:CopyOnWriteArraySet
map容器:Hashtable
部分支持並發
list容器:無
set容器:無
map容器:concurrentHashMap
使用非堵塞算法
list容器:ConcurrentLinkedQueue
set容器:無
map容器:無
二。非線程安全
list容器:ArrayList,LinkedList。AttributeList,RoleList,RoleUnresolvedList,PriorityQueue
set容器:HashSet。TreeSet,LinkedHashSet。EnumSet
map容器:HashMap,TreeMap,LinkedHashMap,WeakHashMap,IdentityHashMap,EnumMap
按遍歷安全主要有下面幾類
一,遍歷安全
可並發遍歷
list容器:CopyOnWriteArrayList。ConcurrentLinkedQueue
set容器:CopyOnWriteArraySet,EnumSet,EnumMap
map容器:無
不可並發遍歷
list容器:Vetor。Stack,Hashtable。ArrayBlockingQueue,
LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue
set容器:無
map容器:Hashtable,concurrentHashMap
註意1:concurrentHashMap叠代器它們不會拋出ConcurrentModificationException。

只是,叠代器被設計成每次僅由一個線程使用。
二,遍歷不安全
會拋異常ConcurrentModificationException:
list容器:ArrayList,LinkedList,AttributeList,RoleList,RoleUnresolvedList
set容器:HashSet,TreeSet,TreeSet,LinkedHashSet
map容器:HashMap。TreeMap,LinkedHashMap,WeakHashMap,IdentityHashMap
註意1:返回的叠代器是弱一致 的:它們不會拋出 ConcurrentModificationException,
也不一定顯示在叠代進行時發生的不論什麽映射改動的效果的容器有:
EnumSet,EnumMap
按遍歷是否有序性分類
存儲數據有序
list容器: ConcurrentLinkedQueue(1.5),ArrayBlockingQueue(1.5),LinkedBlockingQueue(1.5),
SynchronousQueue(1.5)
set容器:TreeSet(1.2).(他們實現了set接口),
CopyOnWriteArraySet(1.5),EnumSet(1.5),JobStateReasons。
map容器:TreeMap(1.2)。LinkedHashMap(1.4)
一定規則下存儲數據有序
list容器:Stack,Vetor,ArrayList,LinkedList,CopyOnWriteArrayList(1.5),AttributeList(1.5),RoleList(1.5),RoleUnresolvedList(1.5)
set容器:無
map容器:無
遍歷無序但移除有序
list容器:PriorityQueue(1.5)。PriorityBlockingQueue(1.5)
set容器:無
map容器:無
不管怎樣都無序
list容器:無
set容器:HashSet(1.2),LinkedHashSet(1.4)
map容器:Hashtable,HashMap(1.2),WeakHashMap(1.2)。IdentityHashMap(1.4)。
ConcurrentMap(1.5),concurrentHashMap(1.5)
能夠按自然順序(參見 Comparable)或比較器進行排序的有
list容器:PriorityQueue(1.5)。PriorityBlockingQueue
set容器:TreeSet(1.2)
map容器:TreeMap(1.2)
實現了RandomAccess接口的有
ArrayList, AttributeList, CopyOnWriteArrayList, RoleList, RoleUnresolvedList, Stack, Vector
RandomAccess接口是List 實現所使用的標記接口,用來表明其支持高速(一般是固定時間)隨機訪問。此接口的主要目的是同意一般的算法更改其行為,從而在將其應用到隨機或連續訪問列表時能提供良好的性能。


在對List特別的遍歷算法中。要盡量來推斷是屬於 RandomAccess(如ArrayList)還是SequenceAccess(如LinkedList),
由於適合RandomAccess List的遍歷算法,用在SequenceAccess List上就區別非常大。
即對於實現了RandomAccess接口的類實例而言,此循環
for (int i=0, i<list.size(); i++)
list.get(i);
的執行速度要快於下面循環:
for (Iterator i=list.iterator(); i.hasNext(); )
i.next();

java並發容器(Map、List、BlockingQueue)具體解釋