黑馬程式設計師_基礎加強_Java執行緒通訊和執行緒併發庫
阿新 • • 發佈:2019-02-17
------- android培訓、java培訓、期待與您交流! ----------
java5的執行緒鎖技術
Lock&Condition實現執行緒同步通訊Lock比傳統的synchronized方式更加面向物件,兩個執行緒執行的程式碼塊要實現同步互斥,必須持有同一個Lock物件。
ReadWriteLock,多個讀鎖不互斥,讀鎖與寫鎖互斥。如果需要多執行緒同時讀,但不能同時寫,加讀鎖;如果程式碼修改資料,為改程式碼加寫鎖,寫鎖是執行緒獨佔的。
在等待 Condition 時,允許發生“虛假喚醒”,Condition 應該總是在一個迴圈中被等待,並測試正被等待的狀態。
Condition condition = lock.newCondition();
使用讀寫鎖的快取功能
- class CachedData {
- Object data;
- volatile boolean cacheValid;
- ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
- void processCachedData() {
- rwl.readLock().lock();
- if (!cacheValid) {//如果快取中沒有data
- // 在使用寫鎖前必須釋放讀鎖
- rwl.readLock().unlock();
- rwl.writeLock().lock();
- // 獲取寫鎖後在檢查
- if (!cacheValid) {
- data = ...//寫入data
- cacheValid = true;
- }
- // 寫入資料後,上讀鎖,在釋放寫鎖
- rwl.readLock().lock();
- rwl.writeLock().unlock();
- }
- use(data);
- rwl.readLock().unlock();
- }
- }
class CachedData { Object data; volatile boolean cacheValid; ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) {//如果快取中沒有data // 在使用寫鎖前必須釋放讀鎖 rwl.readLock().unlock(); rwl.writeLock().lock(); // 獲取寫鎖後在檢查 if (!cacheValid) { data = ...//寫入data cacheValid = true; } // 寫入資料後,上讀鎖,在釋放寫鎖 rwl.readLock().lock(); rwl.writeLock().unlock(); } use(data); rwl.readLock().unlock(); } }
Semaphore 計數訊號燈
限制可以訪問某些資源的執行緒數目。可進入同一段程式碼的執行緒數目。Semaphore(int permits) permits允許的併發執行緒數.
acquire() 從此訊號量獲取一個許可.
void release() 釋放一個許可。
單個訊號量的Semaphore物件可以實現互斥鎖的功能,並且可以是由一個執行緒獲得了“鎖”,再由另一個執行緒釋放“鎖”。
[java] view plaincopyprint?
- public class SemaphoreTest {
- public static void main(String[] args) {
- ExecutorService service = Executors.newCachedThreadPool();
- final Semaphore sp = new Semaphore(3);//建立一個可以允許3個執行緒併發訪問的訊號燈
- for(int i=0;i<10;i++){
- Runnable runnable = new Runnable(){
- public void run(){
- try {
- sp.acquire();
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- System.out.println("執行緒" + Thread.currentThread().getName() +
- "進入,當前已有" + (3-sp.availablePermits()) + "個併發");
- try {
- Thread.sleep((long)(Math.random()*10000));
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("執行緒" + Thread.currentThread().getName() +
- "即將離開");
- sp.release();
- //下面程式碼有時候執行不準確,因為其沒有和上面的程式碼合成原子單元
- System.out.println("執行緒" + Thread.currentThread().getName() +
- "已離開,當前已有" + (3-sp.availablePermits()) + "個併發");
- }
- };
- service.execute(runnable); //將任務交給執行緒池
- }
- }
- }
public class SemaphoreTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Semaphore sp = new Semaphore(3);//建立一個可以允許3個執行緒併發訪問的訊號燈
for(int i=0;i<10;i++){
Runnable runnable = new Runnable(){
public void run(){
try {
sp.acquire();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("執行緒" + Thread.currentThread().getName() +
"進入,當前已有" + (3-sp.availablePermits()) + "個併發");
try {
Thread.sleep((long)(Math.random()*10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("執行緒" + Thread.currentThread().getName() +
"即將離開");
sp.release();
//下面程式碼有時候執行不準確,因為其沒有和上面的程式碼合成原子單元
System.out.println("執行緒" + Thread.currentThread().getName() +
"已離開,當前已有" + (3-sp.availablePermits()) + "個併發");
}
};
service.execute(runnable); //將任務交給執行緒池
}
}
}
CyclicBarrier
一個同步輔助類,它允許一組執行緒互相等待,直到到達某個公共屏障點。[java] view plaincopyprint?
- public class CyclicBarrierTest {
- public static void main(String[] args) {
- ExecutorService service = Executors.newCachedThreadPool();//建立一個執行緒池
- final CyclicBarrier cb = new CyclicBarrier(3);//建立一個迴圈barrier,執行緒數目為3
- for(int i=0;i<3;i++){//新建3個執行緒,交給執行緒池執行
- Runnable runnable = new Runnable(){
- public void run(){
- try {
- Thread.sleep((long)(Math.random()*10000));
- System.out.println("執行緒" + Thread.currentThread().getName() +
- "即將到達集合地點1,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
- cb.await();//等待其他執行緒
- Thread.sleep((long)(Math.random()*10000));
- System.out.println("執行緒" + Thread.currentThread().getName() +
- "即將到達集合地點2,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
- cb.await();
- Thread.sleep((long)(Math.random()*10000));
- System.out.println("執行緒" + Thread.currentThread().getName() +
- "即將到達集合地點3,當前已有" + (cb.getNumberWaiting() + 1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
- cb.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- };
- service.execute(runnable);
- }
- service.shutdown();
- }
- }
public class CyclicBarrierTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();//建立一個執行緒池
final CyclicBarrier cb = new CyclicBarrier(3);//建立一個迴圈barrier,執行緒數目為3
for(int i=0;i<3;i++){//新建3個執行緒,交給執行緒池執行
Runnable runnable = new Runnable(){
public void run(){
try {
Thread.sleep((long)(Math.random()*10000));
System.out.println("執行緒" + Thread.currentThread().getName() +
"即將到達集合地點1,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
cb.await();//等待其他執行緒
Thread.sleep((long)(Math.random()*10000));
System.out.println("執行緒" + Thread.currentThread().getName() +
"即將到達集合地點2,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
cb.await();
Thread.sleep((long)(Math.random()*10000));
System.out.println("執行緒" + Thread.currentThread().getName() +
"即將到達集合地點3,當前已有" + (cb.getNumberWaiting() + 1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
cb.await();
} catch (Exception e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
service.shutdown();
}
}
CountDownLatch
CountDownLatch(int count) 構造一個用給定計數初始化的 CountDownLatch。呼叫countDown方法就將計數器減1,當計數到達0時,則所有等待者或單個等待者開始執行。
[java] view plaincopyprint?
- public class CountdownLatchTest {
- public static void main(String[] args) {
- ExecutorService service = Executors.newCachedThreadPool();
- final CountDownLatch cdOrder = new CountDownLatch(1);
- final CountDownLatch cdAnswer = new CountDownLatch(3);
- for(int i=0;i<3;i++){
- Runnable runnable = new Runnable(){//建立3個執行緒
- public void run(){
- try {
- System.out.println("執行緒" + Thread.currentThread().getName() +
- "正準備接受命令");
- cdOrder.await();//等待主執行緒控制cdOrder開始任務
- System.out.println("執行緒" + Thread.currentThread().getName() +
- "已接受命令");
- Thread.sleep((long)(Math.random()*10000));
- System.out.println("執行緒" + Thread.currentThread().getName() +
- "迴應命令處理結果");
- cdAnswer.countDown();//任務完成通知主執行緒
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- };
- service.execute(runnable);
- }
- try {
- Thread.sleep((long)(Math.random()*10000));
- System.out.println("執行緒" + Thread.currentThread().getName() +
- "即將釋出命令");
- cdOrder.countDown();//將count歸0,開始執行任務
- System.out.println("執行緒" + Thread.currentThread().getName() +
- "已傳送命令,正在等待結果");
- cdAnswer.await();//等待子執行緒完成
- System.out.println("執行緒" + Thread.currentThread().getName() +
- "已收到所有響應結果");
- } catch (Exception e) {
- e.printStackTrace();
- }
- service.shutdown();
- }
- }
public class CountdownLatchTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CountDownLatch cdOrder = new CountDownLatch(1);
final CountDownLatch cdAnswer = new CountDownLatch(3);
for(int i=0;i<3;i++){
Runnable runnable = new Runnable(){//建立3個執行緒
public void run(){
try {
System.out.println("執行緒" + Thread.currentThread().getName() +
"正準備接受命令");
cdOrder.await();//等待主執行緒控制cdOrder開始任務
System.out.println("執行緒" + Thread.currentThread().getName() +
"已接受命令");
Thread.sleep((long)(Math.random()*10000));
System.out.println("執行緒" + Thread.currentThread().getName() +
"迴應命令處理結果");
cdAnswer.countDown();//任務完成通知主執行緒
} catch (Exception e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
try {
Thread.sleep((long)(Math.random()*10000));
System.out.println("執行緒" + Thread.currentThread().getName() +
"即將釋出命令");
cdOrder.countDown();//將count歸0,開始執行任務
System.out.println("執行緒" + Thread.currentThread().getName() +
"已傳送命令,正在等待結果");
cdAnswer.await();//等待子執行緒完成
System.out.println("執行緒" + Thread.currentThread().getName() +
"已收到所有響應結果");
} catch (Exception e) {
e.printStackTrace();
}
service.shutdown();
}
}
Exchanger
實現持有同一Exchanger物件的兩個執行緒間的資料交換。V exchange(V x) 方法交換資料。
可阻塞佇列
public class ArrayBlockingQueue<E>extends AbstractQueue<E>implements BlockingQueue<E>, Serializable按 FIFO(先進先出)原則對元素進行排序。
生產者使用put方法放入資料,消費者使用take方法取出資料。
新元素插入到佇列的尾部,佇列獲取操作則是從佇列頭部開始獲得元素。
size() 返回此佇列中元素的數量。
可以由兩個只有一個元素的同步佇列物件實現執行緒通訊功能。
空中網題目:
Test類中的程式碼在不斷地產生資料,然後交給TestDo.doSome()方法去處理,就好像生產者在不斷地產生資料,消費者在不斷消費資料。程式有10個執行緒來消費生成者產生的資料,這些消費者都呼叫TestDo.doSome()方法去進行處理,故每個消費者都需要一秒才能處理完,程式應保證這些消費者執行緒依次有序地消費資料,只有上一個消費者消費完後,下一個消費者才能消費資料,下一個消費者是誰都可以,但要保證這些消費者執行緒拿到的資料是有順序的。
[java] view plaincopyprint?
- public class Test {
- public static void main(String[] args) {
- final Semaphore semaphore = new Semaphore(1);
- final SynchronousQueue<String> queue = new SynchronousQueue<String>();
- for(int i=0;i<10;i++){
- new Thread(new Runnable(){
- @Override
- public void run() {
- try {
- semaphore.acquire();
- String input = queue.take();//保證按順序取出資料
- String output = TestDo.doSome(input);
- System.out.println(Thread.currentThread().getName()+ ":" + output);
- semaphore.release();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).start();
- }
- System.out.println("begin:"+(System.currentTimeMillis()/1000));
- for(int i=0;i<10;i++){ //這行不能改動
- String input = i+""; //這行不能改動
- try {
- queue.put(input);//主執行緒的迴圈產生資料,並將資料放入同步佇列中,由子執行緒對佇列進行操作
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- //不能改動此TestDo類
- class TestDo {
- public static String doSome(String input){
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- String output = input + ":"+ (System.currentTimeMillis() / 1000);
- return output;
- }
- }
public class Test {
public static void main(String[] args) {
final Semaphore semaphore = new Semaphore(1);
final SynchronousQueue<String> queue = new SynchronousQueue<String>();
for(int i=0;i<10;i++){
new Thread(new Runnable(){
@Override
public void run() {
try {
semaphore.acquire();
String input = queue.take();//保證按順序取出資料
String output = TestDo.doSome(input);
System.out.println(Thread.currentThread().getName()+ ":" + output);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
System.out.println("begin:"+(System.currentTimeMillis()/1000));
for(int i=0;i<10;i++){ //這行不能改動
String input = i+""; //這行不能改動
try {
queue.put(input);//主執行緒的迴圈產生資料,並將資料放入同步佇列中,由子執行緒對佇列進行操作
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//不能改動此TestDo類
class TestDo {
public static String doSome(String input){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String output = input + ":"+ (System.currentTimeMillis() / 1000);
return output;
}
}
同步集合
傳統方式下的Collection在迭代集合時,不允許對集合進行修改。同步集合類:ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、CopyOnWriteArrayList 和 CopyOnWriteArraySet。當期望許多執行緒訪問一個給定 collection 時,ConcurrentHashMap 通常優於同步的 HashMap,ConcurrentSkipListMap 通常優於同步的 TreeMap。當期望的讀數和遍歷遠遠大於列表的更新數時,CopyOnWriteArrayList 優於同步的 ArrayList。
空中網面試題:
如果有幾個執行緒呼叫TestDo.doSome(key, value)方法時,傳遞進去的key相等(equals比較為true),則這幾個執行緒應互斥排隊輸出結果,即當有兩個執行緒的key都是"1"時,它們中的一個要比另外其他執行緒晚1秒輸出結果,如下所示:4:4:1258199615
1:1:1258199615
3:3:1258199615
1:2:1258199616
[java] view plaincopyprint?
- import java.util.Iterator;
- import java.util.concurrent.CopyOnWriteArrayList;
- //不能改動此Test類
- public class Test extends Thread{
- private TestDo testDo;
- private String key;
- private String value;
- public Test(String key,String key2,String value){
- this.testDo = TestDo.getInstance();
- /*常量"1"和"1"是同一個物件,下面這行程式碼就是要用"1"+""的方式產生新的物件,
- 以實現內容沒有改變,仍然相等(都還為"1"),但物件卻不再是同一個的效果*/
- this.key = key+key2;
- this.value = value;
- }
- public static void main(String[] args) throws InterruptedException{
- Test a = new Test("1","","1");
- Test b = new Test("1","","2");
- Test c = new Test("3","","3");
- Test d = new Test("4","","4");
- System.out.println("begin:"+(System.currentTimeMillis()/1000));
- a.start();
- b.start();
- c.start();
- d.start();
- }
- public void run(){
- testDo.doSome(key, value);
- }
- }
- class TestDo {
- private TestDo() {}
- private static TestDo _instance = new TestDo();
- public static TestDo getInstance() {
- return _instance;
- }
- //因為在執行doSome時,需要對key進行判斷和新增,需要使用同步集合
- private CopyOnWriteArrayList keys = new CopyOnWriteArrayList();
- public void doSome(Object key, String value) {
- Object o = key;//以key做同步互斥鎖物件
- if(!keys.contains(o)){
- keys.add(o);
- }else{
- for(Iterator iter=keys.iterator();iter.hasNext();){
- try {
- Thread.sleep(20);//測試程式碼,讓對集合操作的動作多執行一會
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- Object oo = iter.next();
- if(oo.equals(o)){
- o = oo;
- break;
- }
- }
- }
- synchronized(o)
- // 以大括號內的是需要區域性同步的程式碼,不能改動!
- {
- try {
- Thread.sleep(1000);
- System.out.println(key+":"+value + ":"
- + (System.currentTimeMillis() / 1000));
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
//不能改動此Test類
public class Test extends Thread{
private TestDo testDo;
private String key;
private String value;
public Test(String key,String key2,String value){
this.testDo = TestDo.getInstance();
/*常量"1"和"1"是同一個物件,下面這行程式碼就是要用"1"+""的方式產生新的物件,
以實現內容沒有改變,仍然相等(都還為"1"),但物件卻不再是同一個的效果*/
this.key = key+key2;
this.value = value;
}
public static void main(String[] args) throws InterruptedException{
Test a = new Test("1","","1");
Test b = new Test("1","","2");
Test c = new Test("3","","3");
Test d = new Test("4","","4");
System.out.println("begin:"+(System.currentTimeMillis()/1000));
a.start();
b.start();
c.start();
d.start();
}
public void run(){
testDo.doSome(key, value);
}
}
class TestDo {
private TestDo() {}
private static TestDo _instance = new TestDo();
public static TestDo getInstance() {
return _instance;
}
//因為在執行doSome時,需要對key進行判斷和新增,需要使用同步集合
private CopyOnWriteArrayList keys = new CopyOnWriteArrayList();
public void doSome(Object key, String value) {
Object o = key;//以key做同步互斥鎖物件
if(!keys.contains(o)){
keys.add(o);
}else{
for(Iterator iter=keys.iterator();iter.hasNext();){
try {
Thread.sleep(20);//測試程式碼,讓對集合操作的動作多執行一會
} catch (InterruptedException e) {
e.printStackTrace();
}
Object oo = iter.next();
if(oo.equals(o)){
o = oo;
break;
}
}
}
synchronized(o)
// 以大括號內的是需要區域性同步的程式碼,不能改動!
{
try {
Thread.sleep(1000);
System.out.println(key+":"+value + ":"
+ (System.currentTimeMillis() / 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}