Java多執行緒與併發(三)
- Condition等待和喚醒
在我們的並行程式中,避免不了某些寫成要預先規定好的順序執行,例如:先新增後修改,先買後賣,先進後出,對於這些場景,使用JUC的Conditon物件再合適不過了。
JUC中提供了Condition物件,用於讓指定執行緒等待與喚醒,按預期順序執行。它必須和ReentrantLock重入鎖配合使用
Condition用於代替wait()/notify()方法
wait和notify是屬於Object的,可以對執行緒喚醒(notify只能隨機喚醒等待的執行緒,而Condition可以喚醒指定執行緒,這有利於更好的控制併發程式)
Condition核心方法:
await():阻塞當前執行緒,直到signal喚醒
signal():喚醒被await的執行緒,從中斷處繼續執行
signalAll():喚醒所有被await阻塞的執行緒(不太常用)
通過使用ReentrantLock和Condition的使用讓執行緒有順序的執行(有規劃的、等待、喚醒的過程)
程式碼案例:
public class ConditionSample {
public static void main(String[] args) {
final ReentrantLock lock = new ReentrantLock();//Condition必須配合ReentrantLock來使用
final Condition c1 = lock.newCondition();//建立Condition
final Condition c2 = lock .newCondition();
final Condition c3 = lock.newCondition();
new Thread(new Runnable() {
public void run() {
lock.lock();//加鎖
try {
c1.await();//阻塞當前執行緒,只有呼叫c1.signal的時候,執行緒繼續啟用執行
Thread.sleep(1000);
System.out .println("粒粒皆幸苦");
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();//釋放
}
}
}).start();
new Thread(new Runnable() {
public void run() {
lock.lock();//加鎖
try {
c2.await();//阻塞當前執行緒,只有呼叫c1.signal的時候,執行緒繼續啟用執行
Thread.sleep(1000);
System.out.println("誰知盤中餐");
c1.signal();//執行緒t1喚醒繼續執行
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();//釋放
}
}
}).start();
new Thread(new Runnable() {
public void run() {
lock.lock();//加鎖
try {
c3.await();//阻塞當前執行緒,只有呼叫c1.signal的時候,執行緒繼續啟用執行
Thread.sleep(1000);
System.out.println("汗滴禾下土");
c2.signal();//執行緒t2喚醒繼續執行
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();//釋放
}
}
}).start();
new Thread(new Runnable() {
public void run() {
lock.lock();//加鎖
try {
Thread.sleep(1000);
System.out.println("鋤禾日當午");
c3.signal();//t3執行緒繼續執行
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();//釋放
}
}
}).start();
}
}
- JUC之Callable和Future
Callable是一個介面,它和Runnable一樣代表著任務,區別在於Callable有返回值並且可以丟擲異常
Future也是一個藉口,它用於非同步計算的結果,提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果,接受Callable返回的方法
案例:輸出1000以內的質數
public class FutureCallableSample {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(10);
//建立執行緒池
for(int i = 2;i<=10000 ;i++){
Computer c = new Computer();
c.setNum(i);
//Future是對用於計算的執行緒的監聽,因為計算是在其它執行緒中進行的,所以這個返回結果是非同步的
Future<Boolean> fu = threadPool.submit(c);//將c物件提交給執行緒池,如果有空閒執行緒立即執行call方法
try {
Boolean result = fu.get();//用於獲取返回值,如果執行緒內部的call沒有計算完畢,則進入等待狀態,直到計算完成
if(result){
System.out.println(c.getNum());
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
threadPool.shutdown();
}
}
- JUC之併發容器
如何保證既能執行緒安全,又可以有一定效率
執行緒安全—-併發容器
ArrayList —-CopyOnWriteArrayList —寫複製列表
HashSet —–CopyOnWriteArraySet —寫複製集合
HashMap —-ConcurrentHashMap —分段鎖對映
public class CopyOnWriteArrayListSample {
public static void main(String[] args) {
List list = new ArrayList();
for(int i =0; i<1000;i++){
list.add(i);
}
Iterator it = list.iterator();
while(it.hasNext()){
Integer i = (Integer)it.next();
list.remove(i);
}
System.out.println(list);
}
}
程式碼會報錯:
Exception in thread "main" java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(Unknown Source)
at java.util.ArrayList$Itr.next(Unknown Source)
at com.wanghaoxin.threadpool.CopyOnWriteArrayListSample.main(CopyOnWriteArrayListSample.java:15)
對於ArrayList預設採用連續儲存,但是會有併發問題,邊讀取邊刪除產生異常
CopyOnWriteArrayList採用即可解決這個問題:
public class CopyOnWriteArrayListSample {
public static void main(String[] args) {
/* List list = new ArrayList();
for(int i =0; i<1000;i++){
list.add(i);
}
Iterator it = list.iterator();
while(it.hasNext()){
Integer i = (Integer)it.next();
list.remove(i);
}
System.out.println(list);*/
//寫複製列表
List list = new CopyOnWriteArrayList();
for(int i =0; i<1000;i++){
list.add(i);
}
Iterator it = list.iterator();
while(it.hasNext()){
Integer i = (Integer)it.next();
list.remove(i);
}
System.out.println(list);
}
}
寫複製列表:CopyOnWriteArrayList併發原理:
它通過副本解決併發問題
多個執行緒各執一份副本,採用副本的方式,再複製的過程中,不同執行緒是同步的
檢視原始碼得知:採用可重入鎖
public void add(int index, E element) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
if (index > len || index < 0)
throw new IndexOutOfBoundsException("Index: "+index+
", Size: "+len);
Object[] newElements;
int numMoved = len - index;
if (numMoved == 0)
newElements = Arrays.copyOf(elements, len + 1);
else {
newElements = new Object[len + 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index, newElements, index + 1,
numMoved);
}
newElements[index] = element;
setArray(newElements);
} finally {
lock.unlock();
}
}
ConcurrentHashMap
Segment——分段鎖,提高了批量同步的效能
HashMap執行緒不安全的,輸出值總是小於5000,但是如果程式碼中修改為HashTable則輸出值為5000(HashTable中所有方法都是synchronized 只允許同一時間同一執行緒修改)
為了解決效率低下的問題,產生了ConcurrentHashMap,輸出值始終為5000—表示執行緒安全
public class ConcurrentHashMapSample {
private static int user = 100;//同時模擬的併發使用者訪問數量 --為1的話看不出來效果
//private static int user = 10;
private static int dowloadCounts = 5000;//使用者的真實下載數
private static HashMap count= new HashMap();//計數器
public static void main(String[] args) {
//排程器,jdk1.5之後引入current對於併發的支援
ExecutorService executor = Executors.newCachedThreadPool();
//訊號量 ,用於模擬併發使用者數
final Semaphore semaphore = new Semaphore(user);
for(int i =0;i<dowloadCounts ;i++){
final int index = i;
//通過多執行緒模擬多個使用者訪問的下載次數
executor.execute(new Runnable() {
@Override
public void run() {
try{
semaphore.acquire();
count.put(index, index);
semaphore.release();
}catch(Exception e){
e.printStackTrace();
}
}
});
}
try {
//延遲主執行緒結束--讓for迴圈中程式碼執行完畢
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(count.size());
}
}
ConcurrentHashMap實現原理:
採用分段鎖Segment機制
HashTable:所有操作方法都是同步的,其他執行緒必須等待
這樣效率很低
ConcurrentHashMap把區域分為一個個的很小區域,segment,不同執行緒訪問不同的資料,只要不是同一段內都可以操作,但是如果操作的資料在同一個段內,這樣需要執行緒排隊
這樣效率很快,(極限)16倍,把原來的儲存空間進行分段加鎖處理,段的長度都是2的n次方
- *JUC之Atomic餘CAS演算法(樂觀鎖)
原子性:是指一個操作或多個操作要麼全部執行,且執行的過程不會被任何因素打斷,要麼就都不執行
Atomic包是java.util.concurrent下的另一個專門為執行緒安全設計的java包,包含多個原子操作類
Atomic包:
程式碼:
public class AtomicIntegerSample {
private static int user = 10;//同時模擬的併發使用者訪問數量
//private static int user = 10;
private static int dowloadCounts = 5000;//使用者的真實下載數
private static AtomicInteger count= new AtomicInteger(0);
public static void main(String[] args) {
//排程器,jdk1.5之後引入current對於併發的支援
ExecutorService executor = Executors.newCachedThreadPool();
//訊號量 ,用於模擬併發使用者數
final Semaphore semaphore = new Semaphore(user);
for(int i =0;i<dowloadCounts ;i++){
//通過多執行緒模擬多個使用者訪問的下載次數
executor.execute(new Runnable() {
@Override
public void run() {
try{
semaphore.acquire();
add();
semaphore.release();
}catch(Exception e){
e.printStackTrace();
}
}
});
}
try {
//延遲主執行緒結束--讓for迴圈中程式碼執行完畢
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(count);
}
/* private synchronized static void add(){
count++;
}*/
private static void add(){
count.getAndIncrement();//count++
}
//此處並沒有用synchronized和鎖機制
}
CAS演算法:
鎖是用來做併發的最簡單的機制,當然其代價也是很高的,獨佔鎖是一種悲觀鎖,synchronized就是一種獨佔鎖,它假設最壞的情況,並且只有在確保其它執行緒不會造成干擾的情況下執行,會導致所有需要鎖的執行緒颳起,等待持有鎖的執行緒釋放鎖
所謂樂觀鎖就是,每次不加鎖而是假設沒有衝突而去完成某項操作,如果因為衝突失敗就重試,直到成功為止。
其中CAS(比較與交換 Compare And Swap ),是一種有名的無鎖演算法
比較的是期望值與實際操作的結果
Atomic的應用場景:
雖然基於CAS的執行緒安全機制很好很搞笑,但是要說的是,並非所有執行緒安全都可以用這樣的方法來實現,這隻適合於一些鎖粒度比較小型:例如計數器這樣的需求用起來才更有效果,否則也不會有鎖的存在了
對於大量資料操作反而會損耗效能,因為做了很多次
- 總結