【java】-- java並發包總結
1、同步容器類
1.1、Vector與ArrayList異同
1、Arraylist和Vector都是采用數組方式存儲數據,都允許直接序號索引元素,所以查找速度快,但是插入數據等操作涉及到數組元素移動等內存操作,所以插入數據慢
2、 Vector的方法都是同步的(Synchronized),是線程安全的(thread-safe),而ArrayList的方法不是,由於線程的同步必然要影響性能,因此,ArrayList的性能比Vector好。
3、當Vector或ArrayList中的元素超過它的初始大小時,Vector會將它的容量翻倍,而ArrayList只增加50%的大小,這樣,ArrayList就有利於節約內存空間。
1.2、HashTable與HashMap
1、hashTable是線程安全的,hashMap不是。它們的性能方面的比較類似 Vector和ArrayList
2、HashMap允許將null作為一個entry的key或者value,而Hashtable不允許。HashMap把Hashtable的contains方法去掉了,改成containsvalue和containsKey。
1.3、synchronizedMap
Collections.synchronized*(m) 將線程不安全額集合變為線程安全集合
1.4、ConcurrentHashMap
ConcurrentMap接口下有倆個重要的實現 :
ConcurrentskipListMap (支持並發排序功能。彌補ConcurrentHas hMa p)
ConcurrentHashMap內部使用段(Segment)來表示這些不同的部分,每個段其實就是一個
小的HashTable,它們有自己的鎖。只要多個修改操作發生在不同的段上,它們就可以並
發進行。把一個整體分成了16個段(Segment.也就是最高支持16個線程的並發修改操作。
這也是在重線程場景時減小鎖的粒度從而降低鎖競爭的一種方案。並且代碼中大多共享變
量使用volatile關鍵字聲明,目的是第一時間獲取修改的內容,性能非常好。
1.5、CountDownLatch
作用:CountDownLatch類位於java.util.concurrent包下,利用它可以實現類似計數器的功能。計數器的初始值為線程的數量。每當一個線程完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然後在閉鎖上等待的線程就可以恢復執行任務。
如何工作:
CountDownLatch構造器中的計數值(count參數)實際上就是閉鎖需要等待的線程數量。這個值只能被設置一次,而且CountDownLatch沒有提供任何機制去重新設置這個計數值。
與CountDownLatch的第一次交互是主線程等待其他線程。
主線程必須在啟動其他線程後立即調用CountDownLatch.await()方法。這樣主線程的操作就會在這個方法上阻塞,直到其他線程完成各自的任務。
每當一個線程完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然後在閉鎖上等待的線程就可以恢復執行任務。
應用舉例:有一個任務A(主線程),它要等待其他2個任務執行完畢之後才能執行,此時就可以利用CountDownLatch來實現這種功能了。
publicclass Test002 { publicstaticvoid main(String[] args) throws InterruptedException { System.out.println("等待子線程執行完畢..."); CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(new Runnable() { @Override publicvoid run() { System.out.println("子線程," + Thread.currentThread().getName() + "開始執行..."); countDownLatch.countDown();// 每次減去1 System.out.println("子線程," + Thread.currentThread().getName() + "結束執行..."); } }).start(); new Thread(new Runnable() { @Override publicvoid run() { System.out.println("子線程," + Thread.currentThread().getName() + "開始執行..."); countDownLatch.countDown(); System.out.println("子線程," + Thread.currentThread().getName() + "結束執行..."); } }).start(); countDownLatch.await();// 調用當前方法主線程阻塞 countDown結果為0, 阻塞變為運行狀態 System.out.println("兩個子線程執行完畢...."); System.out.println("繼續主線程執行.."); } }
使用場景:
- 實現最大的並行性:有時我們想同時啟動多個線程,實現最大程度的並行性。例如,我們想測試一個單例類。如果我們創建一個初始計數為1的CountDownLatch,並讓所有線程都在這個鎖上等待,那麽我們可以很輕松地完成測試。我們只需調用 一次countDown()方法就可以讓所有的等待線程同時恢復執行。
- 開始執行前等待n個線程完成各自任務:例如應用程序啟動類要確保在處理用戶請求前,所有N個外部系統已經啟動和運行了。
- 死鎖檢測:一個非常方便的使用場景是,你可以使用n個線程訪問共享資源,在每次測試階段的線程數目是不同的,並嘗試產生死鎖。
1.6、CyclicBarrier
CyclicBarrier初始化時規定一個數目,然後計算調用了CyclicBarrier.await()進入等待的線程數。當線程數達到了這個數目時,所有進入等待狀態的線程被喚醒並繼續。
CyclicBarrier就象它名字的意思一樣,可看成是個障礙, 所有的線程必須到齊後才能一起通過這個障礙。
CyclicBarrier初始時還可帶一個Runnable的參數, 此Runnable任務在CyclicBarrier的數目達到後,所有其它線程被喚醒前被執行。
class Writer extends Thread { private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier){ this.cyclicBarrier=cyclicBarrier; } @Override public void run() { System.out.println("線程" + Thread.currentThread().getName() + ",正在寫入數據"); try { Thread.sleep(3000); } catch (Exception e) { // TODO: handle exception } System.out.println("線程" + Thread.currentThread().getName() + ",寫入數據成功.....");
try { cyclicBarrier.await(); } catch (Exception e) { } System.out.println("所有線程執行完畢.........."); }
}
public class Test001 {
public static void main(String[] args) { CyclicBarrier cyclicBarrier=new CyclicBarrier(5); for (int i = 0; i < 5; i++) { Writer writer = new Writer(cyclicBarrier); writer.start(); } }
}
|
Semaphore
Semaphore是一種基於計數的信號量。它可以設定一個閾值,基於此,多個線程競爭獲取許可信號,做自己的申請後歸還,超過閾值後,線程申請許可信號將會被阻塞。Semaphore可以用來構建一些對象池,資源池之類的,比如數據庫連接池,我們也可以創建計數為1的Semaphore,將其作為一種類似互斥鎖的機制,這也叫二元信號量,表示兩種互斥狀態。它的用法如下:
availablePermits函數用來獲取當前可用的資源數量
wc.acquire(); //申請資源
wc.release();// 釋放資源
// 創建一個計數閾值為5的信號量對象 // 只能5個線程同時訪問 Semaphore semp = new Semaphore(5);
try { // 申請許可 semp.acquire(); try { // 業務邏輯 } catch (Exception e) {
} finally { // 釋放許可 semp.release(); } } catch (InterruptedException e) {
} |
案例:
需求: 一個廁所只有3個坑位,但是有10個人來上廁所,那怎麽辦?假設10的人的編號分別為1-10,並且1號先到廁所,10號最後到廁所。那麽1-3號來的時候必然有可用坑位,順利如廁,4號來的時候需要看看前面3人是否有人出來了,如果有人出來,進去,否則等待。同樣的道理,4-10號也需要等待正在上廁所的人出來後才能進去,並且誰先進去這得看等待的人是否有素質,是否能遵守先來先上的規則。
代碼:
class Parent implements Runnable { private String name; private Semaphore wc; public Parent(String name,Semaphore wc){ this.name=name; this.wc=wc; } @Override public void run() { try { // 剩下的資源(剩下的茅坑) int availablePermits = wc.availablePermits(); if (availablePermits > 0) { System.out.println(name+"天助我也,終於有茅坑了..."); } else { System.out.println(name+"怎麽沒有茅坑了..."); } //申請茅坑 如果資源達到3次,就等待 wc.acquire(); System.out.println(name+"終於輪我上廁所了..爽啊"); Thread.sleep(new Random().nextInt(1000)); // 模擬上廁所時間。 System.out.println(name+"廁所上完了..."); wc.release();
} catch (Exception e) {
} } } public class TestSemaphore02 { public static void main(String[] args) { // 一個廁所只有3個坑位,但是有10個人來上廁所,那怎麽辦?假設10的人的編號分別為1-10,並且1號先到廁所,10號最後到廁所。那麽1-3號來的時候必然有可用坑位,順利如廁,4號來的時候需要看看前面3人是否有人出來了,如果有人出來,進去,否則等待。同樣的道理,4-10號也需要等待正在上廁所的人出來後才能進去,並且誰先進去這得看等待的人是否有素質,是否能遵守先來先上的規則。 Semaphore semaphore = new Semaphore(3); for (int i = 1; i <=10; i++) { Parent parent = new Parent("第"+i+"個人,",semaphore); new Thread(parent).start(); } } }
|
並發隊列
在並發隊列上JDK提供了兩套實現,一個是以ConcurrentLinkedQueue為代表的高性能隊
列,一個是以BlockingQueue接口為代表的阻塞隊列,無論哪種都繼承自Queue。
ConcurrentLinkedDeque
ConcurrentLinkedQueue : 是一個適用於高並發場景下的隊列,通過無鎖的方式,實現
了高並發狀態下的高性能,通常ConcurrentLinkedQueue性能好於BlockingQueue.它
是一個基於鏈接節點的無界線程安全隊列。該隊列的元素遵循先進先出的原則。頭是最先
加入的,尾是最近加入的,該隊列不允許null元素。
ConcurrentLinkedQueue重要方法:
add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中這倆個方法沒有任何區別)
poll() 和peek() 都是取頭元素節點,區別在於前者會刪除元素,後者不會。
ConcurrentLinkedDeque q = new ConcurrentLinkedDeque(); q.offer("余勝軍"); q.offer("碼雲"); q.offer("螞蟻課堂"); q.offer("張傑"); q.offer("艾姐"); //從頭獲取元素,刪除該元素 System.out.println(q.poll()); //從頭獲取元素,不刪除該元素 System.out.println(q.peek()); //獲取總長度 System.out.println(q.size()); |
BlockingQueue
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:
在隊列為空時,獲取元素的線程會等待隊列變為非空。
當隊列滿時,存儲元素的線程會等待隊列可用。
阻塞隊列常用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。
BlockingQueue即阻塞隊列,從阻塞這個詞可以看出,在某些情況下對阻塞隊列的訪問可能會造成阻塞。被阻塞的情況主要有如下兩種:
1. 當隊列滿了的時候進行入隊列操作
2. 當隊列空了的時候進行出隊列操作
因此,當一個線程試圖對一個已經滿了的隊列進行入隊列操作時,它將會被阻塞,除非有另一個線程做了出隊列操作;同樣,當一個線程試圖對一個空隊列進行出隊列操作時,它將會被阻塞,除非有另一個線程進行了入隊列操作。
在Java中,BlockingQueue的接口位於java.util.concurrent 包中(在Java5版本開始提供),由上面介紹的阻塞隊列的特性可知,阻塞隊列是線程安全的。
在新增的Concurrent包中,BlockingQueue很好的解決了多線程中,如何高效安全“傳輸”數據的問題。通過這些高效並且線程安全的隊列類,為我們快速搭建高質量的多線程程序帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見使用場景。
認識BlockingQueue
阻塞隊列,顧名思義,首先它是一個隊列,而一個隊列在數據結構中所起的作用大致如下圖所示:
從上圖我們可以很清楚看到,通過一個共享的隊列,可以使得數據由隊列的一端輸入,從另外一端輸出;
常用的隊列主要有以下兩種:(當然通過不同的實現方式,還可以延伸出很多不同類型的隊列,DelayQueue就是其中的一種)
先進先出(FIFO):先插入的隊列的元素也最先出隊列,類似於排隊的功能。從某種程度上來說這種隊列也體現了一種公平性。
後進先出(LIFO):後插入隊列的元素最先出隊列,這種隊列優先處理最近發生的事件。
多線程環境中,通過隊列可以很容易實現數據共享,比如經典的“生產者”和“消費者”模型中,通過隊列可以很便利地實現兩者之間的數據共享。假設我們有若幹生產者線程,另外又有若幹個消費者線程。如果生產者線程需要把準備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就可以很方便地解決他們之間的數據共享問題。但如果生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的情況呢?理想情況下,如果生產者產出數據的速度大於消費者消費的速度,並且當生產出來的數據累積到一定程度的時候,那麽生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。然而,在concurrent包發布以前,在多線程環境下,我們每個程序員都必須去自己控制這些細節,尤其還要兼顧效率和線程安全,而這會給我們的程序帶來不小的復雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多線程領域:所謂阻塞,在某些情況下會掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會自動被喚醒)
下面兩幅圖演示了BlockingQueue的兩個常見阻塞場景:
ArrayBlockingQueue
ArrayBlockingQueue是一個有邊界的阻塞隊列,它的內部實現是一個數組。有邊界的意思是它的容量是有限的,我們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。
ArrayBlockingQueue是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。下面
是一個初始化和使用ArrayBlockingQueue的例子:
ArrayBlockingQueue<String> arrays = new ArrayBlockingQueue<String>(3); arrays.add("李四"); arrays.add("張軍"); arrays.add("張軍"); // 添加阻塞隊列 arrays.offer("張三", 1, TimeUnit.SECONDS); |
LinkedBlockingQueue
LinkedBlockingQueue阻塞隊列大小的配置是可選的,如果我們初始化時指定一個大小,它就是有邊界的,如果不指定,它就是無邊界的。說是無邊界,其實是采用了默認大小為Integer.MAX_VALUE的容量 。它的內部實現是一個鏈表。
和ArrayBlockingQueue一樣,LinkedBlockingQueue 也是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。下面是一個初始化和使LinkedBlockingQueue的例子:
LinkedBlockingQueuelinkedBlockingQueue = new LinkedBlockingQueue(3); linkedBlockingQueue.add("張三"); linkedBlockingQueue.add("李四"); linkedBlockingQueue.add("李四"); System.out.println(linkedBlockingQueue.size()); |
PriorityBlockingQueue
PriorityBlockingQueue是一個沒有邊界的隊列,它的排序規則和 java.util.PriorityQueue一樣。需要註
意,PriorityBlockingQueue中允許插入null對象。
所有插入PriorityBlockingQueue的對象必須實現 java.lang.Comparable接口,隊列優先級的排序規則就
是按照我們對這個接口的實現來定義的。
另外,我們可以從PriorityBlockingQueue獲得一個叠代器Iterator,但這個叠代器並不保證按照優先級順
序進行叠代。
下面我們舉個例子來說明一下,首先我們定義一個對象類型,這個對象需要實現Comparable接口:
SynchronousQueue
SynchronousQueue隊列內部僅允許容納一個元素。當一個線程插入一個元素後會被阻塞,除非這個元素被另一個線程消費。
使用BlockingQueue模擬生產者與消費者
class ProducerThread implements Runnable { private BlockingQueue queue; private volatile boolean flag = true; private static AtomicInteger count = new AtomicInteger(); public ProducerThread(BlockingQueue queue) { this.queue = queue; }
@Override public void run() { try { System.out.println("生產線程啟動..."); while (flag) { System.out.println("正在生產數據...."); String data = count.incrementAndGet()+""; // 將數據存入隊列中 boolean offer = queue.offer(data, 2, TimeUnit.SECONDS); if (offer) { System.out.println("生產者,存入" + data + "到隊列中,成功."); } else { System.out.println("生產者,存入" + data + "到隊列中,失敗."); } Thread.sleep(1000); } } catch (Exception e) {
} finally { System.out.println("生產者退出線程"); }
}
public void stop() { this.flag = false; } }
class ConsumerThread implements Runnable { private BlockingQueue<String> queue; private volatile boolean flag = true;
public ConsumerThread(BlockingQueue<String> queue) { this.queue = queue;
}
@Override public void run() { System.out.println("消費線程啟動..."); try { while (flag) { System.out.println("消費者,正在從隊列中獲取數據.."); String data = queue.poll(2, TimeUnit.SECONDS); if (data != null) { System.out.println("消費者,拿到隊列中的數據data:" + data); Thread.sleep(1000); } else { System.out.println("消費者,超過2秒未獲取到數據.."); flag = false; }
} } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("消費者退出線程..."); }
}
}
public class ProducerAndConsumer { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); ProducerThread producerThread1 = new ProducerThread(queue); ProducerThread producerThread2 = new ProducerThread(queue); ConsumerThread consumerThread1 = new ConsumerThread(queue); Thread t1 = new Thread(producerThread1); Thread t2 = new Thread(producerThread2); Thread c1 = new Thread(consumerThread1); t1.start(); t2.start(); c1.start();
// 執行10s Thread.sleep(10 * 1000); producerThread1.stop(); producerThread2.stop();
} }
|
【java】-- java並發包總結