Java併發程式設計實戰:併發基礎構建模組
一、同步容器類
同步容器類包括Vector和Hashtable以及一些功能相似的類,這些同步的封裝器類是由Collections.synchronizedXxx等工廠方法建立的。這些類實現執行緒安全的方式是:將它們的狀態封裝起來(即設為私有,使得外部無法直接訪問,只能通過公有方法來訪問),並對每個公有方法都進行同步,使得每次只有一個執行緒能訪問容器的狀態。
同步容器類的問題:
同步容器類都是執行緒安全的,但在某些情況下可能需要額外的客戶端加鎖來保證複合操作。如下程式碼:
考慮這種情況:執行緒A在包含1個元素的Vector上呼叫getLast並執行到(1)語句,同時執行緒B在同一個Vector上呼叫了deleteLast並已經執行完(4)語句,這是執行緒A在往下執行(2)語句時就會丟擲異常。public static Object getLast(Vector list){ int lastIndex = list.size() - 1; //(1) return list.get(lastIndex); //(2) } public static void deleteLast(Vector list){ int lastIndex = list.size() - 1; //(3) list.remove(lastIndex); //(4) }
同步容器類通過其自身的鎖來保護它的每個方法,通過獲得容器類的鎖,我們可以使getLast和deleteLast成為原子操作,並確保Vector的大小在呼叫size和get之間不會發生變化,如下程式碼所示:
而在進行list的迭代時,也要加鎖。public static Object getLast(Vector list){ synchronized(list){ int lastIndex = list.size() - 1; return list.get(lastIndex); } } public static void deleteLast(Vector list){ synchronized(list){ int lastIndex = list.size() - 1; list.remove(lastIndex); } }
二、併發容器
同步容器將所有對容器狀態的訪問都序列化,以實現它們的執行緒安全性。這種方法嚴重降低併發性。Java5.0提供了多種併發容器類來改進同步容器的效能。1、ConcurrentHashMap
用來替代同步且基於雜湊的Map。在Hashtable和synchronizedMap中,獲得Map的鎖能防止其他執行緒訪問這個Map。ConcurrentHashMap並不是每個方法都在同一個鎖上同步並使得每次只有一個執行緒訪問容器,而是使用一種粒度更細的加鎖機制來實現更大程度的共享,這種機制稱為分段鎖。在這種機制中,任意數量的讀取執行緒可以併發地訪問Map,執行讀取操作的執行緒和執行寫入操作的執行緒可以併發地訪問Map,並且一定數量的寫入執行緒可以併發地修改Map。(ConcurrentHashMap詳細介紹參照部落格)ConcurrentHashMap提供的迭代器不會丟擲ConcurrentModificationException,因此不需要在迭代過程中對容器加鎖。
在ConcurrentMap中還聲明瞭一下複合的操作,並保證了這些操作的原子性,如下程式碼所示:
public interface ConcurrentMap<K,V> extends Map<K,V>{ //僅當K沒有相應的對映值時才插入 V putIfAbsent(K key,V value); //僅當K被對映到V時才移除 boolean putIfAbsent(K key,V value); //僅當K被對映到oldValue時才替換為newValue boolean replace(K key,V oldValue,V newValue); //僅當K被對映到某個值時才替換為newValue V replace(K key,V newValue); }
2、CopyOnWriteArrayList/CopyOnWriteArraySet
CopyOnWriteArrayList用於在遍歷操作為主要操作的情況下代替同步的List。CopyOnWriteArraySet用於替代同步Set。CopyOnWrite容器即寫時複製的容器。當我們往一個容器新增元素時,不直接往當前容器新增,而是先將當前容器進行copy,複製出一個新的容器,然後往新的容器裡新增元素,新增完元素之後,再將原容器的引用指向新的容器。這樣做的好處是我們可以對CopyOnWrite容器進行併發的讀,而不需要加鎖,因為讀的容器時舊的容器,而我們操作的
容器是新複製的容器。所以CopyOnWrite容器也是一種讀寫分離的思想,讀和寫的是不同的容器。
以下程式碼是向CopyOnWriteArrayList中add元素的方法實現,可以發現在新增的時候是需要加鎖的,否則多執行緒寫的時候會copy出N個副本。
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
CopyOnWrite併發容器用於讀多寫少的併發場景,也有很明顯的缺點:(1)記憶體佔用問題:在進行寫操作的時候,記憶體裡會同時有兩個物件的空間佔用,舊的物件和新寫入的物件。如果這些物件佔用的記憶體比較大,那這個時候很有可能造成頻繁的Yong GC和Full GC。針對記憶體佔用問題,可以通過壓縮容器中的元素的方法來減少大物件的記憶體消耗。
(2)資料一致性問題:CopyOnWrite只能保證資料的最終一致性,不能保證資料的實時一致性。
三、阻塞佇列
阻塞佇列提供了可阻塞的put和take方法,以及支援定時的offer和poll方法。如果佇列已經滿了,那麼put方法將阻塞直到有空間可用;如果佇列為空,那麼take方法將會阻塞直到有元素可用。佇列可以是有界的也可以是無界的,無界佇列永遠都不會充滿,因此無界佇列上的put方法也永遠不會阻塞。
在類庫中包含了BlockingQueue的多種實現,其中,LinkedBlockingQueue和ArrayBlockingQueue是FIFO佇列,二者分別與LinkedList和ArrayList類似,但比同步List擁有更好的併發效能。
PriorityBlockingQueue是一個按優先順序排序的佇列,當希望按照某種順序而不是FIFO來處理元素時,這個佇列將非常有用。PriorityBlockingQueue既可以根據元素的自然順序來比較元素(如果它們實現了Comparable方法),也可以使用Comparator來比較。
SynchronousQueue也實現了BlockingQueue,與其他佇列不同的是,它維護一組執行緒,這些執行緒在等待著把元素加入或移除佇列。SynchronousQueue是一個沒有資料緩衝的BlockingQueue,生產者執行緒對其的插入操作put必須等待消費者的移除操作take,反過來也一樣。不能在同步佇列上進行peek,因為僅在試圖要取得元素時,該元素才存在;除非另一個執行緒試圖移除某個 元素,否則也不能(使用任何方法)新增元素;也不能迭代佇列,因為其中沒有元素可用於迭代。佇列的頭是嘗試新增到佇列中的首個已排隊執行緒元素;如果沒有已排隊執行緒,則不新增元素並且頭為null。Java6增加了兩種容器型別,Deque和BlockingDeque,它們分別對Queue和BlockingQueue進行了擴充套件。Deque是一個雙端佇列,實現了在佇列頭和佇列尾的高效插入和移除。具體實現包括ArrayDeque和LinkedBlockingDeque。阻塞佇列適用於生產者-消費者設計,而雙端佇列適用於工作密取模式。所謂的工作密取就是每個消費者都有自己的雙端佇列,如果一個消費者完成了自己雙端佇列中的全部工作,那麼它可以從其他消費者雙端佇列末尾祕密地獲取工作。工作密取非常適用於即使消費者也是生產者的問題。
四、同步工具類
同步工具類可以是任何一個物件,只要它根據其自身的狀態來協調執行緒的控制流。阻塞佇列可以作為同步工具類,其他型別的同步工具類還包括訊號量(Semaphore)、柵欄( Barrier)一集閉鎖(Latch),還可以建立自己的同步工具類。 所有的同步工具類都包含一些特定的結構化屬性:它們封裝了一些狀態,這些狀態將決定執行同步工具類的執行緒是繼續執行還是等待,此外還提供了以下方法對狀態進行操作,以及另一些方法用於高效地等待同步工具類進入到預期狀態。1、閉鎖
閉鎖是一種同步工具類,可以延遲執行緒的進度直到其達到終止狀態,閉鎖可以用來確保某些活動直到其他活動都完成後才繼續執行。(1)CountDownLatch是一種閉鎖的實現,它可以使一個或多個執行緒等待一組事件發生。閉鎖狀態包括一個計數器,該計數器被初始化為一個正數,表示需要等待的事件數量。countDown方法遞減計數器,表示有一個事件已經發生,而await方法等待計數器達到零,這表示所有需要等待的事件都已經發生。如果計數器的值非零,那麼await會一直阻塞直到計數器為0,或者等待中的執行緒中斷,或者等待超時。如下程式碼演示了CountDownLatch的用法
import java.util.concurrent.CountDownLatch;
class Task implements Runnable{
@Override
public void run() {
long count = 0;
// TODO Auto-generated method stub
for(int i = 0; i < Integer.MAX_VALUE; i++){
count += i;
}
System.out.println(count);
}
}
public class TestClass{
public void timeTasks(int nThreads,final Runnable task) throws InterruptedException{
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for(int i = 0;i < nThreads; i++){
Thread t = new Thread(){
public void run(){
try{
startGate.await();
try{
task.run();
}finally{
endGate.countDown();
}
}catch(InterruptedException ignored){}
}
};
t.start();
}
long start = System.currentTimeMillis();
startGate.countDown();
endGate.await();
long end = System.currentTimeMillis();
System.out.println("共運行了"+(end-start)+"毫秒");
}
public static void main(String[] args) throws InterruptedException{
Task task = new Task();
TestClass test = new TestClass();
test.timeTasks(10, task);
}
}
(2)FutureTask也可以用做閉鎖。FutureTask表示的計算是通過Callable來實現的,相當於一種可生成結果的Runnable。
Future.get的行為取決於任務的狀態。如果任務已經完成,那麼get會立即返回結果,否則get將阻塞直到任務進入完成狀態,然後返回結果或者丟擲異常。FutrureTask將計算結果從執行計算的執行緒傳遞到獲取這個結果的執行緒。以下程式碼是FutureTask的用法:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
class Task implements Callable<Long>{
@Override
public Long call() throws Exception {
long count = 0;
// TODO Auto-generated method stub
for(int i = 0; i < Integer.MAX_VALUE; i++){
count += i;
}
return count;
}
}
public class TestClass{
public static void main(String[] args) throws InterruptedException{
Task task = new Task();
FutureTask<Long> future = new FutureTask<Long>(task);
Thread thread = new Thread(future);
thread.start();
try {
long result = future.get();
System.out.println(result);
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
2、訊號量
計數訊號量用來控制同時訪問某個特定資源的運算元量,或者同時執行某個指定操作的數量。計數訊號量還可以用來實現某種資源池,或者對容器施加邊界。 Semaphore中管理著一組虛擬的許可(permit),許可的初始數量可通過建構函式來指定。在執行操作時可以首先獲得許可(只要還有剩餘的許可),並在使用以後釋放許可。如果沒有許可,那麼acquire將阻塞直到有許可(或者直接被中斷或者操作超時)。release方法將返回一個許可給訊號量。計算訊號量是一種簡化形式是二值訊號量,即初始值為1的Semaphore。二值訊號量可以做互斥體,並具備不可重入的加鎖語義:誰擁有這個唯一的許可,誰就擁有了互斥鎖。 Semaphore可以將任何一種容器變成有界阻塞容器,如下程式碼可以根據給定的bound來建立可以盛放bound個元素的容器:class BoundedHashSet<T>{
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound){
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}
public boolean add(T o){
boolean wasAdded = false;
try {
sem.acquire();
try{
wasAdded = set.add(o);
}
finally{
if(!wasAdded)
sem.release();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return wasAdded;
}
public boolean remove(Object o){
boolean wasRemoved = set.remove(o);
if(wasRemoved)
sem.release();
return wasRemoved;
}
public int getSize(){
return set.size();
}
}
3、柵欄
它允許一組執行緒互相等待,直到到達某個公共屏障點。利用柵欄,可以使執行緒互相等待,直到所有的執行緒都達到某一點,然後柵欄將開啟,所有執行緒將通過柵欄繼續執行。CyclicBarrier支援一個可選的Runnable引數,當執行緒通過柵欄時,runnable物件將被呼叫。建構函式CyclicBarrier(int parties,Runnable barrierAction),當執行緒在CyclicBarrier物件上呼叫await()方法時,柵欄的計數器將增加1,當計數器為parties時,柵欄將開啟。
柵欄和閉鎖的區別是:閉鎖用於所有執行緒等待一個外部事件的發生;柵欄則是所有執行緒互相等待,直到所有執行緒都到達某一點時才打開柵欄,然後執行緒可以繼續執行。
如下是和閉鎖程式對應的用柵欄實現的程式碼:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
class Task implements Runnable{
private CyclicBarrier barrier;
public Task(CyclicBarrier barrier){
this.barrier = barrier;
}
@Override
public void run() {
try {
long count = 0;
// TODO Auto-generated method stub
for(int i = 0; i < Integer.MAX_VALUE; i++){
count += i;
}
System.out.println(count);
barrier.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public class TestClass{
public static void main(String[] args) throws InterruptedException{
final long start = System.currentTimeMillis();
CyclicBarrier barrier = new CyclicBarrier(10,new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
long end = System.currentTimeMillis();
System.out.println("共運行了" + (end-start) + "毫秒");
}
});
for(int i = 0; i < 10; i++){
Thread t = new Thread(new Task(barrier));
t.start();
}
}
}
另一種形式的柵欄是Exchanger,它是一種兩方柵欄,各方在柵欄位置上交換資料。當兩方執行不對稱的操作時,Exchange會非常有用,例如當一個執行緒想緩衝區寫入資料,而另一個執行緒從緩衝區讀取資料時,這些執行緒可以使用Exchanger來匯合,並將滿的緩衝區與空的緩衝區交換。如下程式碼生產者開啟了10個執行緒生產資料,一旦達到7個數據就開始和消費者交換資料:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Exchanger;
class Worker implements Runnable{
private CyclicBarrier barrier;
List<Long> list;
public Worker(CyclicBarrier barrier,List<Long> list){
this.barrier = barrier;
this.list = list;
}
@Override
public void run() {
// TODO Auto-generated method stub
try {
Thread.sleep((long) (Math.random() * 5000));
long count = 0;
// TODO Auto-generated method stub
for(int i = 0; i < 1000; i++){
count += Math.random() * 1000;
}
list.add(count);
barrier.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
class Producer extends Thread{
List<Long> list = new ArrayList<Long>();
Exchanger<List<Long>> exchanger = null;
public Producer(Exchanger<List<Long>> exchanger){
this.exchanger = exchanger;
}
@Override
public void run() {
CyclicBarrier barrier = new CyclicBarrier(7,new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
while(true){
try {
list = exchanger.exchange(list);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
for(int i = 0; i < 10; i++){
new Thread(new Worker(barrier, list)).start();
}
}
}
class Consumer extends Thread{
List<Long> list = new ArrayList<Long>();
Exchanger<List<Long>> exchanger = null;
public Consumer(Exchanger<List<Long>> exchanger){
this.exchanger = exchanger;
}
public void run(){
try {
list = exchanger.exchange(list);
for(int i = 0; i < list.size(); i++){
System.out.println(list.get(i));
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public class TestClass{
public static void main(String[] args) throws InterruptedException{
Exchanger<List<Long>> exchanger = new Exchanger<>();
new Producer(exchanger).start();
new Consumer(exchanger).start();
}
}
相關推薦
Java併發程式設計實戰:併發基礎構建模組
一、同步容器類 同步容器類包括Vector和Hashtable以及一些功能相似的類,這些同步的封裝器類是由Collections.synchronizedXxx等工廠方法建立的。這些類實現執行緒安全的方式是:將它們的狀態封裝起來(即設為私有,使得外部無法直接訪問,只能通過公
java併發程式設計實戰:基礎構建模組筆記
同步容器類 同步容器類包括兩個部分:一個是Vector一個是hashtable。 jdk 1.2引入同步包裝類。 Collections.sychronizedxxx 在利用迭代器foreach運用interator過程中,可能會丟擲ConcurrentModificatio
Java併發程式設計實戰:閉鎖CountDownLatch,柵欄CyclicBarrier與訊號量Semaphore
整體上對三個概念進行一個說明: CountDownLatch和CyclicBarrier都能夠實現執行緒之間的等待,只不過它們側重點不同: CountDownLatch是閉鎖,相當於一扇門:在閉鎖達到結束狀態之前,這扇門一直是關閉的,並且沒有任何執行緒能夠通過,當到達結束
java併發程式設計實戰:避免活躍性危險筆記
死鎖 鎖順序死鎖 簡單的鎖順序死鎖示例: public class LeftRightDeadlock { private final Object left = new Object(); private final Object right = ne
java併發程式設計實戰:取消與關閉筆記
在Java中無法搶佔式地停止一個任務的執行,而是通過中斷機制實現了一種協作式的方式來取消任務的執行。 設定取消標誌 public class MyTask implements Runnable { private final ArrayList<BigIntege
java併發程式設計實戰:組合物件筆記
設計執行緒安全的類 設計一個執行緒安全的類要報案下面三個要素: 1. 找出構成物件的狀態的所有變數。 - 物件的所有域構成了物件的狀態。如果物件的域是基本變數構成,那麼這些域構成了物件的全部狀態。如果物件的域中引用了其他物件,那麼物件的狀態也包含其引用物件的域。如ArrayLis
java併發程式設計實戰:物件的共享筆記
可見性 volatile關鍵字: http://www.cnblogs.com/paddix/p/5428507.html volatile關鍵字的兩層語義: - 保證了不同執行緒對這個變數進行操作時的可見性,即一個執行緒修改了某個變數的值,這新值對其他執行緒來說是立即可見的
java併發程式設計實戰:執行緒安全性筆記
執行緒安全性 可以在多個執行緒中呼叫,並且線上程之間不會出現錯誤的互動。 原子性 原子性:即一個操作或者多個操作 要麼全部執行並且執行的過程不會被任何因素打斷,要麼就都不執行。 i++ 和 ++i就不是原子性。 ++i 讀取值,將值加1,將值寫入i.”讀取,修改,寫入
Java併發程式設計實戰:執行緒池的應用
一、任務與執行策略間的隱形耦合 1、執行緒飢餓死鎖 當任務都是同類、獨立的時候,執行緒池才會有最好的工作表現。如果將耗時的短期的任務混合在一起,除非執行緒池很大,否則會有“塞車”的風險;如果提交的任務要依賴其他的任務,除非池是無限的,否則有產生死鎖的風險。如下程式碼所示,對
併發實戰 之「 基礎構建模組」
委託是建立執行緒安全類的一個最有效的策略:只需讓現有的執行緒安全類管理所有的狀態即可。在本篇博文中,主要介紹一些比較有用的併發構建模組,特別是在 Java 5.0 和 Java 6.0 中引入的一些新模組,以及在使用這些模組來構造應用程式時的一些常用模式。 同步容器類 最早出現的
Java併發程式設計三:併發(Concurrent)與並行(Parallel)的區別(一)
併發(Concurrent)與並行(Parallel)是一個大家比較容易混淆的概念。大家在解釋併發與並行的時候一般這樣說: 多執行緒是併發執行的; 多核CPU是並行執行的,單核CPU是不可以不行執行的; 以上說法也是可以理解的,大家都是基於場景來描述的。
Java併發程式設計實戰 01併發程式設計的Bug源頭
摘要 編寫正確的併發程式對我來說是一件極其困難的事情,由於知識不足,只知道synchronized這個修飾符進行同步。 本文為學習極客時間:Java併發程式設計實戰 01的總結,文章取圖也是來自於該文章 併發Bug源頭 在計算機系統中,程式的執行速度為:CPU > 記憶體 > I/O裝置 ,為了平
Java併發程式設計實戰筆記3:基礎構建模組
在上文已經說明,委託是構造執行緒安全類的一個最有效策略,也就是讓現有的執行緒安全類管理所有的狀態即可。以下將介紹這些基礎構建模組。 同步容器類 同步容器類包括Vector和Hashtable以及由Collections.synchronizedXxx等工廠方法建立的同步封裝器類。這些類實現執行緒安全的方式
《JAVA併發程式設計實戰》基礎構建模組
文章目錄 同步容器類 同步容器類的問題 迭代器和ConcurrentModificationException 隱藏迭代器 併發容器 ConcurrentHashMap 額外的原子Map操作
《Java併發程式設計實戰》讀書筆記-第5章 基礎構建模組
第五章,基礎構建模組 1,同步容器類。 Vector、HashTable此類的容器是同步容器。但也有一些問題,例如,一個執行緒在使用Vector的size()方法進行迴圈每一個元素的時候,而另一個執行緒對Vector的元素進行刪除時,可能會發生ArrayIn
《Java 併發程式設計實戰》讀書筆記之一:可重入內建鎖
每個Java物件都可以用做一個實現同步的鎖,這些鎖被稱為內建鎖或監視器鎖。執行緒在進入同步程式碼塊之前會自動獲取鎖,並且在退出同步程式碼塊時會自動釋放鎖。獲得內建鎖的唯一途徑就是進入由這個鎖保護的同步程式碼塊或方法。 當某個執行緒請求一個由其他執行緒持有的鎖時,發出請求的
【Java併發程式設計實戰】—– AQS(四):CLH同步佇列
在【Java併發程式設計實戰】—–“J.U.C”:CLH佇列鎖提過,AQS裡面的CLH佇列是CLH同步鎖的一種變形。其主要從兩方面進行了改造:節點的結構與節點等待機制。在結構上引入了頭結點和尾節點,他們
Java併發程式設計規則:構建執行緒安全的共享物件
構建執行緒安全的共享物件,使其在多執行緒環境下能夠提供安全的訪問。編寫正確的併發程式關鍵在於控制共享、可變的狀態進行訪問管理。synchornized關鍵字既可以阻塞程式,也可以維護操作的原子性,它是一個執行緒安全與非執行緒安全的臨界區標識,通過它我們可以控制物件的記憶體可
[Java併發程式設計實戰] 基礎知識
什麼是執行緒安全性? 定義執行緒安全性:當多個執行緒訪問某個類時,這個類始終都能表現正確的行為,那麼就稱這個類是執行緒安全的。 單執行緒:所見及所知(we know it when we see
《java併發程式設計實戰》:執行緒同步輔助類之訊號量(semaphore)
1.訊號量的概念: 訊號量是一種計數器,用來保護一個或者多個共享資源的訪問,它是併發程式設計的一種基礎工具,大多數程式語言都提供了這個機制。 2、訊號量控制執行緒訪問流程: 如果執行緒要訪問一個共享資源,它必須先獲得訊號量。如果訊號量的內部計數器大於0,訊號量將減1,然後