同步類容器和並發類容器
一 同步類容器
同步類容器都是線程安全的,但在某些場景中可能需要加鎖來保證復合操作。
符合操作如:叠代(反復訪問元素,遍歷完容器中所有元素)、跳轉(根據指定的順序找到當前元素的下一個元素)、條件運算。這些復合操作在多線程並發地修改容器時,可能會表現出意外的行為,最經典的ConcurrentModificationException,原因是當前容器叠代的過程中,被並發的修改了內容,這是由於早期叠代器設計的時候沒有考慮到並發修改。
同步類容器:如古老的Vector、HashTable。這些容器的同步功能其實都是有JDK的Collections.synchronized***等工廠方法去創建實現的。其底層的機制無非就是用傳統的synchronized關鍵字對每個共用方法都進行同步,使得每次只有一個線程訪問容器的狀態。這不符合今天互聯網高並發的需求。
二 並發類容器
jdk1.5之後,提供了多種並發類容器來替代同步類容器,從而改善性能。同步類容器都是串行化的,它們雖然實現了線程安全,但是嚴重降低了並發性,在多線程環境時,嚴重降低了性能。
並發類容器是專門針對並發設計的,使用ConcurrentHashMap來替代給予散列的傳統的HashTable,而且在ConcurrentHashMap中,添加了一些常見復合操作的支持。以及使用CopyOnWriteArrayList替代Vector,並發的CopyOnWriteArraySet,以及並發的Queue,ConcurrentLinkedQueue和LinkedBlockingQueue,前者是高性能隊列,後者是以阻塞形式的隊列,具體實現Queue還有很多,例如ArrayBlockingQueue、SynchronousQueue、PriorityBlockingQueue等。
2.1 ConcurrentMap
ConcurrentMap接口下兩個重要實現類:ConcurrentHashMap、ConcurrentSkipListMap(支持並發排序功能,彌補ConcurrentHashMap)
ConcurrentHashMap:內部使用段(Segment)來表示這些不同的部分,每個段其實就是一個小的HashTable,他們有自己的鎖。只要多個修改操作發生在不同的段上,它們就可以並發進行。把一個整體分成了16個段,也就是最高支持16個線程的並發修改操作。這也是在多線程場景時減小鎖的粒度從而降低鎖競爭的一種方案,並且代碼中大多共享變量使用volatile關鍵字聲明,目的是第一時間獲取修改的內容,性能非常好。
2.2 CopyOnWrite容器
Copy-On-Write簡稱COW,是一種用於程序設計中的優化策略。
JDK中的COW容器有兩種:CopyOnWriteArrayList、CopyOnWriteArraySet
CopyOnWrite容器即寫時復制的容器。通俗的理解是當我們往一個容器添加元素的時候,不直接往當前容器添加,而是先將當前容器進行Copy,復制出一個新的容器,然後新的容器裏添加元素,添加完後,再將原容器的引用指向新的容器。這樣做的好處是我們可以對CopyOnWrite容器進行並發的讀,而且不需要加鎖,因為當前容器不會添加任何元素。所以CopyOnWrite容器也是一種讀寫分離的思想,讀和寫不同容器。場景:讀多寫少
三 並發Queue
並發==隊列先進先出==,有兩套實現:ConcurrentLinkedQueue接口高性能隊列、BlockingQueue接口為代表的阻塞隊列
3.1 ConcurrentLinkedQueue類
ConcurrentLinkedQueue:是一個適用於高並發場景下的隊列,通過無鎖的方式,實現了高並發狀態下的高性能,通常ConcurrentLinkedQueue性能好於BlockingQueue,它是一個基於鏈接節點的無界線程安全隊列。該隊列的元素遵循先進先出的原則,頭是最先加入的,尾是最近加入的,該隊列不允許null元素。
ConcurrentLinkedQueue隊列的重要方法:
1. add()和offer():都是加入元素的方法
2. poll()、take()和peek():都是取頭元素節點,區別在於前者會刪除元素,後者不會
3.1 BlockingQueue接口
1. ArrayBlockingQueue:基於數組實現的阻塞隊列實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,其內部==沒實現讀寫分離==,也就意味著生產和消費不能完全同步並行,長度是需要定義的,可以指定先進先出或者後進後出,也叫==有界隊列==,在很多場合非常適用
2. LinkedBlockingQueue:基於鏈表的阻塞隊列,與ArrayBlockingQueue類似,其內部也維持著一個數據緩沖隊列(該隊列由一個鏈表構成),LinkedBlockingQueue之所以能夠高效的處理並發數據,是因為其內部實現采用分離鎖(==讀寫分離==兩個鎖),從而實現生產者和消費者並行,是一個==無界隊列==
3. SynchronousQueue:一種==沒有緩沖==的隊列,生產者生產的數據直接會被消費者獲取並消費。不能添加,元素先take()取出阻塞,當有元素添加,後才取出
``` BlockingQueue<String> synchronousQueue = new SynchronousQueue<>(); synchronousQueue.add("a"); ```
> 運行結果:
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at com.zys.test.test.StringTest.main(StringTest.java:13)
``` public class SynQueue { public static void main(String[] args) { SynchronousQueue<String> queue = new SynchronousQueue<>(); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { System.out.println("-----" + queue.take() + "\n"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } },"t1"); t1.start(); Thread t2 = new Thread(new Runnable() { @Override public void run() { queue.add("aaaa");//先take()取出阻塞,當有元素添加,後才取出 queue.add("bbbb"); } },"t2"); t2.start(); } } ```
> 運行結果:
-----aaaa
Exception in thread "t2"
java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at Queue.SynQueue$2.run(SynQueue.java:25)
at java.lang.Thread.run(Thread.java:745)
4. PriorityBlockingQueue:基於優先級的阻塞隊列(優先級的判斷通過構造函數傳入的Compator對象來決定,也就是說傳入隊列的對象必須實現Comparable接口),在實現PriorityBlockingQueue時,內部控制線程同步的鎖采用的公平鎖,也是一個無界隊列
public class Task implements Comparable<Task>{ private int id; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public int compareTo(Task o) { return this.id > o.id ? 1 : (this.id < o.id ? -1 : 0); } @Override public String toString() { return this.id + "," + this.name; } } public class UsePriorityBlockingQueue { public static void main(String[] args) { PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(); Task t1 = new Task(); t1.setId(3); t1.setName("task1"); Task t2 = new Task(); t2.setId(6); t2.setName("task2"); Task t3 = new Task(); t3.setId(1); t3.setName("task3"); queue.add(t1); queue.add(t2); queue.add(t3); System.out.println(queue); try { System.out.println(queue.peek());//peek和element取出但不移除 System.out.println(queue.take());//take、poll和remove取出且移除 //優先級排序,並返回優先級最高的元素,並在隊列中刪除取出的元素 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(queue); for (Iterator<Task> iter = queue.iterator() ; iter.hasNext();) { Task task = iter.next(); System.out.println(task.getName()); } } }
> 運行結果:
[1,task3, 6,task2, 3,task1]
1,task3
1,task3
[3,task1, 6,task2]
task1
task2
特點:添加元素時不排序,在取值時根據優先級排序
5. DelayQueue:帶有延遲時間的Queue,其中的元素只有當其指定的==延遲時間==到了,才能夠從隊列中獲取到該元素。DelayQueue中的元素必須實現Delayed接口,DelayQueue是一個沒有大小限制的隊列,應用場景很多,比如對緩存超時的數據進行移除、任務超時處理、空閑連接的關閉等等。
public class Wangmin implements Delayed{ private String id; private String name; private long endTime; private TimeUnit timeUnit = TimeUnit.SECONDS; //設定單位為秒 public Wangmin(String id, String name, long endTime) { super(); this.id = id; this.name = name; this.endTime = endTime; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public long getEndTime() { return endTime; } public void setEndTime(long endTime) { this.endTime = endTime; } /** * 上網時間相互比較,確定排序 */ @Override public int compareTo(Delayed o) { Wangmin min = (Wangmin)o; return this.getDelay(this.timeUnit) - min.getDelay(this.timeUnit) > 0 ? 1 : 0; } /** * 返回延遲時間 */ @Override public long getDelay(TimeUnit unit) { return this.endTime - System.currentTimeMillis(); } } public class WangBa implements Runnable{ private DelayQueue<Wangmin> delayQueue = new DelayQueue<Wangmin>(); public boolean yinye = true; public void shangji(String name, String id, int money) { Wangmin min = new Wangmin(id, name, money * 1000 + System.currentTimeMillis()); System.out.println("網民:" + min.getName() + ",身份證號碼為:" + min.getId() + ",網費:" + money + "\t開始上網,上網結束時間為:" + min.getEndTime()); this.delayQueue.add(min); } public void xiaji(Wangmin min) { System.out.println("網民:" + min.getName() + "下機時間到:" + System.currentTimeMillis()); } @Override public void run() { while(yinye) { try { Wangmin min = delayQueue.take(); xiaji(min); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public static void main(String[] args) { System.out.println("網吧開始營業..."); WangBa wangBa = new WangBa(); Thread shangwang = new Thread(wangBa); shangwang.start(); wangBa.shangji("路人甲", "111", 1); wangBa.shangji("路人乙", "222", 10); wangBa.shangji("路人丙", "333", 5); } }
> 運行結果:
網吧開始營業...
網民:路人甲,身份證號碼為:111,網費:1 開始上網,上網結束時間為:1532587709166
網民:路人乙,身份證號碼為:222,網費:10 開始上網,上網結束時間為:1532587718167
網民:路人丙,身份證號碼為:333,網費:5 開始上網,上網結束時間為:1532587713168
網民:路人甲下機時間到:1532587709166
網民:路人丙下機時間到:1532587713168
網民:路人乙下機時間到:1532587718167
同步類容器和並發類容器