Java多執行緒併發程式設計一覽筆錄
知識體系圖:
1、執行緒是什麼?
執行緒是程序中獨立執行的子任務。
2、建立執行緒的方式
方式一:將類宣告為 Thread 的子類。該子類應重寫 Thread 類的 run 方法
方式二:宣告實現 Runnable 介面的類。該類然後實現 run 方法
推薦方式二,因為介面方式比繼承方式更靈活,也減少程式間的耦合。
3、獲取當前執行緒資訊?
Thread.currentThread()
4、執行緒的分類
執行緒分為守護執行緒、使用者執行緒。執行緒初始化預設為使用者執行緒。
setDaemon(true) 將該執行緒標記為守護執行緒或使用者執行緒。
特性:設定守護執行緒,會作為程序的守護者,如果程序內沒有其他非守護執行緒,那麼守護執行緒也會被銷燬,即使可能執行緒內沒有執行結束。
5、執行緒間的關係?
某執行緒a 中啟動另外一個執行緒 t,那麼我們稱 執行緒 t是 執行緒a 的一個子執行緒,而 執行緒a 是 執行緒t 的 父執行緒。
最典型的就是我們在main方法中 啟動 一個 執行緒去執行。其中main方法隱含的main執行緒為父執行緒。
6、執行緒API一覽:如何啟動、停止、暫停、恢復執行緒?
(1)start() 使執行緒處於就緒狀態,Java虛擬機器會呼叫該執行緒的run方法;
(2)stop() 停止執行緒,已過時,存在不安全性:
一是可能請理性的工作得不得完成;
二是可能對鎖定的物件進行“解鎖”,導致資料不同步不一致的情況。
推薦 使用 interrupt() +拋異常 中斷執行緒。
(3)suspend() 暫停執行緒,已過時。
resume() 恢復執行緒,已過時。
suspend 與resume 不建議使用,存在缺陷:
一是可能獨佔同步物件;
二是導致資料不一致。
(4)yield() 放棄當前執行緒的CPU資源。放棄時間不確認,也有可能剛剛放棄又獲得CPU資源。
(5)t.join() 等待該執行緒t 銷燬終止。
7、synchronized關鍵字用法
一 原子性(互斥性):實現多執行緒的同步機制,使得鎖內程式碼的執行必需先獲得對應的鎖,執行完後自動釋放對應的鎖。
二 記憶體可見性:在同一鎖情況下,synchronized鎖內程式碼保證變數的可見性。
三 可重入性:當一個執行緒獲取一個物件的鎖,再次請求該物件的鎖時是可以再次獲取該物件的鎖的。
如果在synchronized鎖內發生異常,鎖會被釋放。
總結:
(1)synchronized方法 與 synchronized(this) 程式碼塊 鎖定的都是當前物件,不同的只是同步程式碼的範圍
(2)synchronized (非this物件x) 將物件x本身作為“物件監視器”:
a、多個執行緒同時執行 synchronized(x) 程式碼塊,呈現同步效果。
b、當其他執行緒同時執行物件x裡面的 synchronized方法時,呈現同步效果。
c、當其他執行緒同時執行物件x裡面的 synchronized(this)方法時,呈現同步效果。
(3)靜態synchronized方法 與 synchronized(calss)程式碼塊 鎖定的都是Class鎖。Class 鎖與 物件鎖 不是同一個鎖,兩者同時使用情況可能呈非同步效果。
(4)儘量不使用 synchronized(string),是因為string的實際鎖為string的常量池物件,多個值相同的string物件可能持有同一個鎖。
8、volatile關鍵字用法
一 記憶體可見性:保證變數的可見性,執行緒在每次使用變數的時候,都會讀取變數修改後的最的值。
二 不保證原子性。
9、執行緒間的通訊方式
執行緒間通訊的方式主要為共享記憶體、執行緒同步。
執行緒同步除了synchronized互斥同步外,也可以使用wait/notify實現等待、通知的機制。
(1)wait/notify屬於Object類的方法,但wait和notify方法呼叫,必須獲取物件的物件級別鎖,即synchronized同步方法或同步塊中使用。
(2)wait()方法:在其他執行緒呼叫此物件的 notify() 方法或 notifyAll() 方法前,或者其他某個執行緒中斷當前執行緒,導致當前執行緒一直阻塞等待。等同wait(0)方法。
wait(long timeout) 在其他執行緒呼叫此物件的 notify() 方法或 notifyAll() 方法,或者其他某個執行緒中斷當前執行緒,或者已超過某個實際時間量前,導致當前執行緒等待。 單位為毫秒。
void wait(long timeout, int nanos) 與 wait(long timeout) 不同的是增加了額外的納秒級別,更精細的等待時間控制。
(3)notfiy方法:喚醒在此物件監視器上等待的單個執行緒。選擇是任意性的,並在對實現做出決定時發生。執行緒通過呼叫其中一個 wait 方法,在物件的監視器上等待。
(4)notifyAll方法:喚醒在此物件監視器上等待的所有執行緒。
需要:wait被執行後,會自動釋放鎖,而notify被執行後,鎖沒有立刻釋放,由synchronized同步塊結束時釋放。
應用場景:簡單的生產、消費問題。
synchronized (lock) {//獲取到物件鎖lock
try {
lock.wait();//等待通訊訊號, 釋放物件鎖lock
} catch (InterruptedException e) {
e.printStackTrace();
}
//接到通訊訊號
}
synchronized (lock) {//獲取到物件鎖lock
lock.notify();//通知並喚醒某個正等待的執行緒
//其他操作
}
//釋放物件鎖lock
10、ThreadLocal與InheritableThreadLocal
讓每個執行緒都有自己獨立的共享變數,有兩種方式:
一 該例項變數封存線上程類內部;如果該例項變數(非static)是引用型別,存在可能逸出的情況。
二 就是使用ThreadLocal在任意地方構建變數,即使是靜態的(static)。具有很好的隔離性。
(1)重寫initialValue()方法: 初始化ThreadLocal變數,解決get()返回null問題(
(2)InheritableThreadLocal 子執行緒可以讀取父執行緒的值,但反之不行
11、ReentrantLock的使用
一個簡單的示例:
private java.util.concurrent.locks.Lock lock = new ReentrantLock();
public void method() {
try {
lock.lock();
//獲取到鎖lock,同步塊
} finally {
lock.unlock();//釋放鎖lock
}
}
ReentrantLock 比 synchronized 功能更強大,主要體現:
(1)ReentrantLock 具有公平策略的選擇。
(2)ReentrantLock 可以在獲取鎖的時候,可有條件性地獲取,可以設定等待時間,很有效地避免死鎖。
如 tryLock() 和 tryLock(long timeout, TimeUnit unit)
(3)ReentrantLock 可以獲取鎖的各種資訊,用於監控鎖的各種狀態。
(4)ReentrantLock 可以靈活實現多路通知,即Condition的運用。
--------------------------------------------------------------------------------------
一、公平鎖與非公平鎖
ReentrantLock 預設是非公平鎖,允許執行緒“搶佔插隊”獲取鎖。公平鎖則是執行緒依照請求的順序獲取鎖,近似FIFO的策略方式。
二、鎖的使用:
(1)lock() 阻塞式地獲取鎖,只有在獲取到鎖後才處理interrupt資訊
(2)lockInterruptibly() 阻塞式地獲取鎖,立即處理interrupt資訊,並丟擲異常
(3)tryLock() 嘗試獲取鎖,不管成功失敗,都立即返回true、false,注意的是即使已將此鎖設定為使用公平排序策略,tryLock()仍然可以開啟公平性去插隊搶佔。如果希望遵守此鎖的公平設定,則使用 tryLock(0, TimeUnit.SECONDS),它幾乎是等效的(也檢測中斷)。
(4)tryLock(long timeout, TimeUnit unit)在timeout時間內阻塞式地獲取鎖,成功返回true,超時返回false,同時立即處理interrupt資訊,並丟擲異常。
如果想使用一個允許闖入公平鎖的定時 tryLock,那麼可以將定時形式和不定時形式組合在一起:
if (lock.tryLock() || lock.tryLock(timeout, unit) ) { ... }
private java.util.concurrent.locks.ReentrantLock lock = new ReentrantLock();
public void testMethod() {
try {
if (lock.tryLock(1, TimeUnit.SECONDS)) {
//獲取到鎖lock,同步塊
} else {
//沒有獲取到鎖lock
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock.isHeldByCurrentThread())//如果當前執行緒持有鎖lock,則釋放鎖lock
lock.unlock();
}
}
}
三、條件Condition的使用
條件Condition可以由鎖lock來建立,實現多路通知的機制。
具有await、signal、signalAll的方法,與wait/notify類似,需要在獲取鎖後方能呼叫。
private final java.util.concurrent.locks.Lock lock = new ReentrantLock();
private final java.util.concurrent.locks.Condition condition = lock.newCondition();
public void await() {
try {
lock.lock();
//獲取到鎖lock
condition.await();//等待condition通訊訊號,釋放condition鎖
//接到condition通訊
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();//釋放物件鎖lock
}
}
12、ReentrantReadWriteLock的使用
ReentrantReadWriteLock是對ReentrantLock 更進一步的擴充套件,實現了讀鎖readLock()(共享鎖)和寫鎖writeLock()(獨佔鎖),實現讀寫分離。讀和讀之間不會互斥,讀和寫、寫和讀、寫和寫之間才會互斥,提升了讀寫的效能。
讀鎖示例:
private final java.util.concurrent.locks.ReadWriteLock lock = new ReentrantReadWriteLock();
public void method() {
try {
lock.readLock().lock();
//獲取到讀鎖readLock,同步塊
} finally {
lock.readLock().unlock();//釋放讀鎖readLock
}
}
寫鎖示例:
private final java.util.concurrent.locks.ReadWriteLock lock = new ReentrantReadWriteLock();
public void method() {
try {
lock.writeLock().lock();
//獲取到寫鎖writeLock,同步塊
} finally {
lock.writeLock().unlock();//釋放寫鎖writeLock
}
}
13、同步容器與非同步容器概覽
(1)同步容器
包括兩部分:
一個是早期JDK的Vector、Hashtable;
一個是它們的同系容器,JDK1.2加入的同步包裝類,使用Collections.synchronizedXxx工廠方法建立。
Map<String, Integer> hashmapSync = Collections.synchronizedMap(new HashMap<String, Integer>());
同步容器都是執行緒安全的,一次只有一個執行緒訪問容器的狀態。
但在某些場景下可能需要加鎖來保護複合操作。
複合類操作如:新增、刪除、迭代、跳轉以及條件運算。
這些複合操作在多執行緒併發的修改容器時,可能會表現出意外的行為,
最經典的便是ConcurrentModificationException,
原因是當容器迭代的過程中,被併發的修改了內容,這是由於早期迭代器設計的時候並沒有考慮併發修改的問題。
其底層的機制無非就是用傳統的synchronized關鍵字對每個公用的方法都進行同步,使得每次只能有一個執行緒訪問容器的狀態。這很明顯不滿足我們今天網際網路時代高併發的需求,在保證執行緒安全的同時,也必須有足夠好的效能。
(2)併發容器
與Collections.synchronizedXxx()同步容器等相比,util.concurrent中引入的併發容器主要解決了兩個問題:
1)根據具體場景進行設計,儘量避免synchronized,提供併發性。
2)定義了一些併發安全的複合操作,並且保證併發環境下的迭代操作不會出錯。
util.concurrent中容器在迭代時,可以不封裝在synchronized中,可以保證不拋異常,但是未必每次看到的都是"最新的、當前的"資料。
Map<String, Integer> concurrentHashMap = new ConcurrentHashMap<String, Integer>()
ConcurrentHashMap 替代同步的Map即(Collections.synchronized(new HashMap()))。眾所周知,HashMap是根據雜湊值分段儲存的,同步Map在同步的時候會鎖住整個Map,而ConcurrentHashMap在設計儲存的時候引入了段落Segment定義,同步的時候只需要鎖住根據雜湊值鎖住了雜湊值所在的段落即可,大幅度提升了效能。ConcurrentHashMap也增加了對常用複合操作的支援,比如"若沒有則新增":putIfAbsent(),替換:replace()。這2個操作都是原子操作。注意的是ConcurrentHashMap 弱化了size()和isEmpty()方法,併發情況儘量少用,避免導致可能的加鎖(當然也可能不加鎖獲得值,如果map數量沒有變化的話)。
CopyOnWriteArrayList和CopyOnWriteArraySet分別代替List和Set,主要是在遍歷操作為主的情況下來代替同步的List和同步的Set,這也就是上面所述的思路:迭代過程要保證不出錯,除了加鎖,另外一種方法就是"克隆"容器物件。---缺點也明顯,佔有記憶體,且資料最終一致,但資料實時不一定一致,一般用於讀多寫少的併發場景。
ConcurrentSkipListMap可以在高效併發中替代SoredMap(例如用Collections.synchronzedMap包裝的TreeMap)。
ConcurrentSkipListSet可以在高效併發中替代SoredSet(例如用Collections.synchronzedSet包裝的TreeMap)。
ConcurrentLinkedQuerue是一個先進先出的佇列。它是非阻塞佇列。注意儘量用isEmpty,而不是size();
14、CountDownLatch閉鎖的使用
CountDownLatch是一個同步輔助類。
通常運用場景:
(1)作為啟動訊號:將計數 1 初始化的 CountDownLatch 用作一個簡單的開/關鎖存器,或入口。
通俗描述:田徑賽跑運動員等待(每位運動員為一個執行緒,都在await())的"發令槍",當發令槍countDown(),喊0的時候,所有運動員跳過await()起跑線併發跑起來了。
(2)作為結束訊號:在通過呼叫 countDown() 的執行緒開啟入口前,所有呼叫 await 的執行緒都一直在入口處等待。用 N 初始化的 CountDownLatch 可以使一個執行緒在 N 個執行緒完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待。
通俗描述:某裁判,在終點等待所有運動員都跑完,每個運動員跑完就計數一次(countDown())當為0時,就可以往下繼續統計第一人到最後一個撞線的時間。
public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
/**
*一個啟動訊號,在 driver 為繼續執行 worker 做好準備之前,它會阻止所有的 worker 繼續執行。
*/
final CountDownLatch startSignal = new CountDownLatch(1);
/**
* 一個完成訊號,它允許 driver 在完成所有 worker 之前一直等待。
*/
final CountDownLatch doneSignal = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startSignal.await();/** 阻塞於此,一直到startSignal計數為0,再往下執行 */
try {
task.run();
} finally {
doneSignal.countDown();/** doneSignal 計數減一,直到最後一個執行緒結束 */
}
} catch (InterruptedException ignored) {
}
}
};
t.start();
}
long start = System.currentTimeMillis();
startSignal.countDown();/** doneSignal 計數減一,為0,所有task開始併發執行run */
doneSignal.await();/** 阻塞於此,一直到doneSignal計數為0,再往下執行 */
long end = System.currentTimeMillis();
return end - start;
}
public static void main(String[] args) throws InterruptedException {
final Runnable task = new Runnable() {
@Override
public void run() {
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " end");
}
};
long time = new CountDownLatchTest().timeTasks(10, task);
System.out.println("耗時:" + time + "ms");
}
更多的api:
boolean await(long timeout, TimeUnit unit) 使當前執行緒在鎖存器倒計數至零之前一直等待,除非執行緒被中斷或超出了指定的等待時間。
15、CyclicBarrier關卡的使用
CyclicBarrier是一個同步輔助類。
CyclicBarrier讓一個執行緒達到屏障時被阻塞,直到最後一個執行緒達到屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續執行
CyclicBarrier(int parties, Runnable barrierAction)建構函式,用於在所有執行緒都到達屏障後優先執行barrierAction的run()方法
使用場景:
可以用於多執行緒計算以後,最後使用合併計算結果的場景;
通俗描述:某裁判,在終點(await()阻塞處)等待所有運動員都跑完,所有人都跑完就可以做吃炸雞啤酒(barrierAction),但是隻要一個人沒跑完就都不能吃炸雞啤酒,當然也沒規定他們同時跑(當然也可以,一起使用CountDownLatch)。
--------------------------------------------------------------------------------------
CyclicBarrier與CountDownLatch的區別:
CountDownLatch強調的是一個執行緒等待多個執行緒完成某件事,只能用一次,無法重置;
CyclicBarrier強調的是多個執行緒互相等待完成,才去做某個事情,可以重置。
public static class WorkerThread implements Runnable {
private final CyclicBarrier cyclicBarrier;
public WorkerThread(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " pre-working");
/**
* 執行緒在這裡等待,直到所有執行緒都到達barrier。
*/
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " working");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int THREAD_NUM = 5;
final CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_NUM, new Runnable() {
/**
* 當所有執行緒到達barrier時執行
*/
@Override
public void run() {
System.out.println("--------------Inside Barrier--------------");
}
});
for (int i = 0; i < THREAD_NUM; i++) {
new Thread(new WorkerThread(cyclicBarrier)).start();
}
}
更多api:
int await(long timeout, TimeUnit unit) 在所有參與者都已經在此屏障上呼叫 await 方法之前將一直等待,或者超出了指定的等待時間。
16、Semaphore訊號量的使用
Semaphore訊號量是一個計數訊號量。
可以認為,Semaphore維護一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然後再獲取該許可。每個 release() 新增一個許可,從而可能釋放一個正在阻塞的獲取者。
通俗描述:某個車庫只有N個車位,車主們要泊車,請向車庫保安處阻塞 acquire()等待獲取許可證,當獲得許可證,車主們才可以去泊車。當某個車主離開車位的時候,交還許可證release() ,從而其他阻塞等待的車主有機會獲得許可證。
另外:
Semaphore 預設是非公平策略,允許執行緒“搶佔插隊”獲取許可證。公平策略則是執行緒依照請求的順序獲取許可證,近似FIFO的策略方式。
17、Executors框架(執行緒池)的使用
(1)執行緒池是什麼?
執行緒池是一種多執行緒的處理方式,利用已有執行緒物件繼續服務新的任務(按照一定的執行策略),而不是頻繁地建立銷燬執行緒物件,由此提供服務的吞吐能力,減少CPU的閒置時間。具體組成部分包括:
a、執行緒池管理器(ThreadPool)用於建立和管理執行緒池,包括建立執行緒池、銷燬執行緒池,新增新任務。
b、工作執行緒(Worker)執行緒池中的執行緒,閒置的時候處於等待狀態,可以迴圈回收利用。
c、任務介面(Task)每個任務必須實現的介面類,為工作執行緒提供呼叫,主要規定了任務的入口、任務完成的收尾工作、任務的狀態。
d、等待佇列(Queue)存放等待處理的任務,提供緩衝機制。
(2)Executors框架常見的執行策略
Executors框架提供了一些便利的執行策略。
java.util.concurrent.ExecutorService service = java.util.concurrent.Executors.newFixedThreadPool(100);
- newSingleThreadExecutor:建立一個單執行緒的執行緒池。 這個執行緒池只有一個執行緒在工作,也就是相當於單執行緒序列執行所有任務。如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它。此執行緒池保證所有任務的執行順序按照任務的提交順序執行。 - newFixedThreadPool:建立固定大小的執行緒池。 每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大大小。執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。 - newCachedThreadPool:建立一個可快取的執行緒池。 如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。 - newScheduledThreadPool:建立一個大小無限的執行緒池。 此執行緒池支援定時以及週期性執行任務的需求。 - newSingleThreadScheduledExecutor:建立一個單執行緒的執行緒池。 此執行緒池支援定時以及週期性執行任務的需求。
(3)ExecutorService執行緒池管理
ExecutorService的生命週期有3個狀態:執行、關閉(shutting down)、停止。
提交任務submit(xxx)擴充套件了基本方法 Executor.execute(java.lang.Runnable)。
<T> Future<T> submit(Callable<T> task) 提交一個返回值的任務用於執行,返回一個表示任務的未決結果的 Future。
Future<?> submit(Runnable task) 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。
<T> Future<T> submit(Runnable task, T result) 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。
shutdown() 啟動一次順序關閉,執行以前提交的任務,但不接受新任務。
List<Runnable> shutdownNow() 試圖停止所有正在執行的活動任務,暫停處理正在等待的任務,並返回等待執行的任務列表。
一個簡單的示例:
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("哈哈");
}
});
}
/**
* 如果不再需要新任務,請適當關閉executorService並拒絕新任務
*/
executorService.shutdown();
}
(3)ThreadPoolExecutor機制
ThreadPoolExecutor為Executors的執行緒池內部實現類。
建構函式詳解:
引數名 |
作用 |
---|---|
corePoolSize |
核心執行緒池大小 |
maximumPoolSize |
最大執行緒池大小 |
keepAliveTime |
執行緒池中超過corePoolSize數目的空閒執行緒最大存活時間; 可以allowCoreThreadTimeOut(true)使得核心執行緒有效時間 |
TimeUnit |
keepAliveTime時間單位 |
workQueue |
阻塞任務佇列 |
threadFactory |
新建執行緒工廠 |
RejectedExecutionHandler |
當提交任務數超過maxmumPoolSize+workQueue之和時, 任務會交給RejectedExecutionHandler來處理 |
ThreadPoolExecutor執行緒池管理機制:
1.當執行緒池小於corePoolSize時,新提交任務將建立一個新執行緒執行任務,即使此時執行緒池中存在空閒執行緒。
2.當執行緒池達到corePoolSize時,新提交任務將被放入workQueue中,等待執行緒池中任務排程執行
3.當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會建立新執行緒執行任務
4.當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理
5.當執行緒池中超過corePoolSize執行緒,空閒時間達到keepAliveTime時,關閉空閒執行緒
6.當設定allowCoreThreadTimeOut(true)時,執行緒池中corePoolSize執行緒空閒時間達到keepAliveTime也將關閉
一個簡單的示例:
public static void main(String[] args) {
java.util.concurrent.ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(10, //corePoolSize 核心執行緒數
100, //maximumPoolSize 最大執行緒數
30, //keepAliveTime 執行緒池中超過corePoolSize數目的空閒執行緒最大存活時間;
// TimeUnit keepAliveTime時間單位
TimeUnit.SECONDS,
//workQueue 阻塞任務佇列
new LinkedBlockingQueue<Runnable>(1000),
//threadFactory 新建執行緒的工廠
Executors.defaultThreadFactory(),
//RejectedExecutionHandler當提交任務數超過maxmumPoolSize+workQueue之和時,
// 任務會交給RejectedExecutionHandler來處理
new ThreadPoolExecutor.AbortPolicy()
);
for (int i = 0; i < 100; i++) {
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
System.out.println("哈哈");
}
});
}
/**
* 如果不再需要新任務,請適當關閉threadPoolExecutor並拒絕新任務
*/
threadPoolExecutor.shutdown();
}
18、可攜帶結果的任務Callable 和 Future / FutureTask
(1)為解決Runnable介面不能返回一個值或受檢查的異常,可以採用Callable介面實現一個任務。
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
(2)Future表示非同步計算的結果,可以對於具體的Runnable或者Callable任務進行查詢是否完成,查詢是否取消,獲取執行結果,取消任務等操作。
V get() throws InterruptedException, ExecutionException 如有必要,等待計算完成,然後獲取其結果。
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 如有必要,最多等待為使計算完成所給定的時間之後,獲取其結果(如果結果可用)。
(3)FutureTask
FutureTask則是一個RunnableFuture<V>,而RunnableFuture實現了Runnbale又實現了Futrue<V>這兩個介面。
簡單示例一:
public static void main(String[] args) throws InterruptedException {
FutureTask<Integer> future = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
// 返回一個值或受檢查的異常
//throw new Exception();
return new Random().nextInt(100);
}
});
new Thread(future).start();;
/**
* 模擬其他業務邏輯
*/
Thread.sleep(1000);
//Integer result = future.get(0, TimeUnit.SECONDS);
Integer result = null;
try {
result = future.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("result========" + result);
}
簡單示例二,採用Executors:
public static void main(String[] args) throws InterruptedException {
java.util.concurrent.ExecutorService threadPoolExecutor =
java.util.concurrent.Executors.newCachedThreadPool();
Future<Integer> future = threadPoolExecutor.submit(
new Callable<Integer>() {
@Override
public Integer call() throws Exception {
// 返回一個值或受檢查的異常
//throw new Exception();
return new Random().nextInt(100);
}
});
/**
* 如果不再需要新任務,請適當關閉threadPoolExecutor並拒絕新任務
*/
threadPoolExecutor.shutdown();
/**
* 模擬其他業務邏輯
*/
Thread.sleep(1000);
//Integer result = future.get(0, TimeUnit.SECONDS);
Integer result = null;
try {
result = future.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("result========" + result);
}
簡單示例三,採用Executors+CompletionService:
static class MyCallable implements Callable<Integer> {
private final int i;
public MyCallable(int i) {
super();
this.i = i;
}
@Override
public Integer call() throws Exception {
// 返回一個值或受檢查的異常
//throw new Exception();
return new Integer(i);
}
}
public static void main(String[] args) throws InterruptedException {
java.util.concurrent.ExecutorService threadPoolExecutor =
java.util.concurrent.Executors.newCachedThreadPool();
java.util.concurrent.CompletionService<Integer> completionService =
new java.util.concurrent.ExecutorCompletionService<Integer>(threadPoolExecutor);
final int threadNum = 10;
for (int i = 0; i < threadNum; i++) {
completionService.submit(new MyCallable(i + 1));
}
/**
* 如果不再需要新任務,請適當關閉threadPoolExecutor並拒絕新任務
*/
threadPoolExecutor.shutdown();
/**
* 模擬其他業務邏輯
*/
Thread.sleep(2000);
for (int i = 0; i < threadNum; i++) {
try {
System.out.println("result========" + completionService.take().get());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
注意的是提交到CompletionService中的Future是按照完成的順序排列的,而不是按照新增的順序排列的。
19、Atomic系列-原子變數類
其基本的特性就是在多執行緒環境下,當有多個執行緒同時執行這些類的例項包含的方法時,具有排他性,即當某個執行緒進入方法,執行其中的指令時,不會被其他執行緒打斷,而別的執行緒就像自旋鎖一樣,一直等到該方法執行完成,才由JVM從等待佇列中選擇一個另一個執行緒進入,這只是一種邏輯上的理解。實際上是藉助硬體的相關指令來實現的,不會阻塞執行緒(或者說只是在硬體級別上阻塞了)。其中的類可以分成4組
基本類:AtomicInteger、AtomicLong、AtomicBoolean;
引用型別:AtomicReference、AtomicStampedRerence、AtomicMarkableReference;--AtomicStampedReference 或者 AtomicMarkableReference 解決執行緒併發中,導致的ABA問題
陣列型別:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray ---陣列長度固定不可變,但保證陣列上每個元素的操作絕對安全的
屬性原子修改器(Updater):AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
Updater使用限制:
限制1:操作的目標不能是static型別,前面說到unsafe的已經可以猜測到它提取的是非static型別的屬性偏移量,如果是static型別在獲取時如果沒有使用對應的方法是會報錯的,而這個Updater並沒有使用對應的方法。
限制2:操作的目標不能是final型別的,因為final根本沒法修改。
限制3:必須是volatile型別的資料,也就是資料本身是讀一致的。
限制4:屬性必須對當前的Updater所在的區域是可見的,也就是private如果不是當前類肯定是不可見的,protected如果不存在父子關係也是不可見的,default如果不是在同一個package下也是不可見的。
簡單示例:
static class A {
volatile int intValue = 100;
}
private AtomicIntegerFieldUpdater<A> atomicIntegerFieldUpdater
= AtomicIntegerFieldUpdater.newUpdater(A.class, "intValue");
20、總結
什麼叫執行緒安全?
執行緒安全就是每次執行結果和單執行緒執行的結果是一樣的,而且其他的變數的值也和預期的是一樣的。
執行緒安全就是說多執行緒訪問同一程式碼,不會產生不確定的結果。
執行緒安全問題多是由全域性變數和靜態變數引起的,當多個執行緒對共享資料只執行讀操作,不執行寫操作時,一般是執行緒安全的;當多個執行緒都執行寫操作時,需要考慮執行緒同步來解決執行緒安全問題。
什麼叫執行緒同步?
多個執行緒操作一個資源的情況下,導致資源資料前後不一致。這樣就需要協調執行緒的排程,即執行緒同步。 解決多個執行緒使用共通資源的方法是:執行緒操作資源時獨佔資源,其他執行緒不能訪問資源。使用鎖可以保證在某一程式碼段上只有一條執行緒訪問共用資源。
有兩種方式實現執行緒同步:
1、synchronized
2、同步鎖(Lock)
什麼叫執行緒通訊?
有時候執行緒之間需要協作和通訊。
有兩種方式實現執行緒通訊:
1、synchronized 實現記憶體可見性,滿足執行緒共享變數
2、wait/notifynotifyAll(synchronized同步方法或同步塊中使用) 實現記憶體可見性,及生產消費模式的相互喚醒機制
3、同步鎖(Lock)的Condition(awaitsignalsignalAll)
4、管道,實現資料的共享,滿足讀寫模式
更多Demo:https://git.oschina.net/svenaugustus/MyJavaMultithreadingLab