java多線程系列(三)
阿新 • • 發佈:2017-08-02
htm cep 開始 管道 線程間通信 base 並發編程 nds 打印
等待通知機制
前言:本系列將從零開始講解java多線程相關的技術,內容參考於《java多線程核心技術》與《java並發編程實戰》等相關資料,希望站在巨人的肩膀上,再通過我的理解能讓知識更加簡單易懂。
目錄
- 認識cpu、核心與線程
- java多線程系列(一)之java多線程技能
- java多線程系列(二)之對象變量的並發訪問
- java多線程系列(三)之等待通知機制
- java多線程系列(四)之ReentrantLock的使用
非等待通知
public void run() { try { for (int i = 0; i < 10; i++) { list.add(); System.out.println("添加了" + (i + 1) + "個元素"); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } public void run() { try { while (true) { if (list.size() == 5) { System.out.println("==5了,線程b要退出了!"); throw new InterruptedException(); } } } catch (InterruptedException e) { e.printStackTrace(); } }
- 兩個線程實現了通信,但list大小為5的時候,線程B退出了,但是線程B不停地輪詢是否為5,這個時候是很占資源的
- 如果輪詢的時間間隔小,這個時候更加浪費資源
- 如果輪詢的時間間隔大,那麽還可能錯過了想要的數據,比如可能錯過了5
- 這裏共享了list,所以實現了通信,但是因為不知道什麽時候通信,所以不停地輪詢,這種通信有缺點,一是浪費cpu資源,二是可能讀取到錯誤的數據
什麽是等待通知機制
- 線程A要等待線程B發出通知才執行,這個時候線程A可以執行wait方法,等待線程B執行notify方法喚醒線程A
等待通知機制實現
public void run() { try { synchronized (lock) { if (MyList.size() != 5) { System.out.println("wait begin " + System.currentTimeMillis()); lock.wait(); System.out.println("wait end " + System.currentTimeMillis()); } } } catch (InterruptedException e) { e.printStackTrace(); } } public void run() { try { synchronized (lock) { for (int i = 0; i < 10; i++) { MyList.add(); if (MyList.size() == 5) { lock.notify(); System.out.println("已發出通知!"); } System.out.println("添加了" + (i + 1) + "個元素!"); Thread.sleep(1000); } } } catch (InterruptedException e) { e.printStackTrace(); } }
- 將上面的代碼進行更改,當大小不等於5的時候,線程A處於wait狀態,直到線程B發出通知,喚醒線程A,通過等待通知機制,避免了線程A不停輪詢造成的資源浪費
消息通知機制註意點
- wait和notify必須是在同步方法和同步代碼塊裏面調用,要不然會拋出異常
- notify方法是繼承自Object類,可以喚醒在此對象監視器等待的線程,也就是說喚醒的是同一個鎖的線程
- notify方法調用之後,不會馬上釋放鎖,而是運行完該同步方法或者是運行完該同步代碼塊的代碼
- 調用notify後隨機喚醒的是一個線程
- 調用wait方法後會將鎖釋放
- wait狀態下中斷線程會拋出異常
- wait(long),超過設置的時間後會自動喚醒,還沒超過該時間也可以通過其他線程喚醒
- notifyAll可以喚醒同一鎖的所有線程
- 如果線程還沒有處於等待狀態,其他線程進行喚醒,那麽不會起作用,此時會打亂程序的正常邏輯
案例:生產者消費者模式
一個生產者,一個消費者
public void setValue() {
try {
synchronized (lock) {
if (!ValueObject.value.equals("")) {
lock.wait();
}
String value = System.currentTimeMillis() + "_"
+ System.nanoTime();
System.out.println("set"+ value);
ValueObject.value = value;
lock.notify();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void getValue() {
try {
synchronized (lock) {
if (ValueObject.value.equals("")) {
lock.wait();
}
System.out.println("get"+ ValueObject.value);
ValueObject.value = "";
lock.notify();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void run() {
while (true) {
r.getValue();
}
}
public void run() {
while (true) {
p.setValue();
}
}
- 如果我們創建一個生產線程,一個消費線程,那麽這個時候會交替運行
多個生產者,多個消費者
public void getValue() {
try {
synchronized (lock) {
while (ValueObject.value.equals("")) {
System.out.println("消費者 "
+ Thread.currentThread().getName() + " WAITING了☆");
lock.wait();
}
System.out.println("消費者 " + Thread.currentThread().getName()
+ " RUNNABLE了");
ValueObject.value = "";
lock.notify();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void run() {
while (true) {
r.getValue();
}
}
public void setValue() {
try {
synchronized (lock) {
while (!ValueObject.value.equals("")) {
System.out.println("生產者 "
+ Thread.currentThread().getName() + " WAITING了★");
lock.wait();
}
System.out.println("生產者 " + Thread.currentThread().getName()
+ " RUNNABLE了");
String value = System.currentTimeMillis() + "_"
+ System.nanoTime();
ValueObject.value = value;
lock.notify();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void run() {
while (true) {
p.setValue();
}
}
- 如果這個時候創建多個生產者,多個消費者,如果連續喚醒的是同類線程,那麽會出現假死狀態,就是線程都處於waiting狀態,因為notify隨機喚醒一個線程,如果喚醒的同類的,那麽就浪費了一次喚醒,如果這個時候無法再喚醒異類線程,那麽就會假死。這種情況把notify改成notifyAll()就行了。
消息通知機制需要註意的地方
- 是否線程喚醒的是同類線程會造成影響
- 生產者消費模式,判斷條件if和while應該使用哪一個
通過管道進行線程間通信
public class ThreadWrite extends Thread {
private WriteData write;
private PipedOutputStream out;
public ThreadWrite(WriteData write, PipedOutputStream out) {
super();
this.write = write;
this.out = out;
}
@Override
public void run() {
write.writeMethod(out);
}
}
public class ThreadRead extends Thread {
private ReadData read;
private PipedInputStream input;
public ThreadRead(ReadData read, PipedInputStream input) {
super();
this.read = read;
this.input = input;
}
@Override
public void run() {
read.readMethod(input);
}
}
public class Run {
public static void main(String[] args) {
try {
WriteData writeData = new WriteData();
ReadData readData = new ReadData();
PipedInputStream inputStream = new PipedInputStream();
PipedOutputStream outputStream = new PipedOutputStream();
// inputStream.connect(outputStream);
outputStream.connect(inputStream);//關鍵
ThreadRead threadRead = new ThreadRead(readData, inputStream);
threadRead.start();
Thread.sleep(2000);
ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
threadWrite.start();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- PipedInputStream和PiepedOutputStream(對應字符流PipedReader和PipedOutputWriter)這幾個類可以實現線程間流的通信,將管道輸出流和輸出流連接,實現一個線程往管道發送數據,一個線程從管道讀取數據
join方法
public static void main(String[] args) {
try {
MyThread threadTest = new MyThread();
threadTest.start();
threadTest.join();
System.out.println("threadTest對象執行完,我再執行");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- 當前線程阻塞(main線程),調用線程(threadTest)正常執行,執行完後當前線程(main)繼續執行
public class ThreadB extends Thread {
@Override
public void run() {
try {
ThreadA a = new ThreadA();
a.start();
a.join();
System.out.println("線程B在run end處打印了");
} catch (InterruptedException e) {
System.out.println("線程B在catch處打印了");
e.printStackTrace();
}
}
}
- 如果線程B執行完了join方法,此時線程B被中斷,那麽這個時候拋出異常,但是線程A正常運行
join(long)和sleep(long)的區別
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
- 從join方法的源代碼可以發現,他的核心方法是wait,在前面已經提到wait方法會釋放鎖,說明join方法也會釋放鎖,但是sleep是不會釋放鎖的。
- join方法是非靜態的,而sleep是靜態的
ThreadLocal
- 解決變量在各個線程的隔離性,每個線程綁定自己的值
public void run() {
try {
for (int i = 0; i < 100; i++) {
if (Tools.tl.get() == null) {
Tools.tl.set("ThreadA" + (i + 1));
} else {
System.out.println("ThreadA get Value=" + Tools.tl.get());
}
Thread.sleep(200);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void run() {
try {
for (int i = 0; i < 100; i++) {
if (Tools.tl.get() == null) {
Tools.tl.set("ThreadB" + (i + 1));
} else {
System.out.println("ThreadB get Value=" + Tools.tl.get());
}
Thread.sleep(200);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public class Tools {
public static ThreadLocal tl = new ThreadLocal();
}
- 每個線程都設置了值,但是得到的值卻是自己的,互相隔離
-
如果不開始不設置值,那麽得到的值都是null,可以通過繼承ThreadLocal,重載initalValue方法,設置初始值
public class ThreadLocalExt extends ThreadLocal { @Override protected Object initialValue() { return new Date().getTime(); } }
-
InheritableThreadLocal,子線程可以繼承父線程的值
public class InheritableThreadLocalExt extends InheritableThreadLocal { @Override protected Object initialValue() { return new Date().getTime(); } } public static void main(String[] args) { try { for (int i = 0; i < 10; i++) { System.out.println(" 在Main線程中取值=" + Tools.tl.get()); Thread.sleep(100); } Thread.sleep(5000); ThreadA a = new ThreadA(); a.start(); } catch (InterruptedException e) { e.printStackTrace(); } } //main線程和A線程輸出的一樣
-
在上面代碼的基礎上,重寫childValue方法可以設置子線程的值
我覺得分享是一種精神,分享是我的樂趣所在,不是說我覺得我講得一定是對的,我講得可能很多是不對的,但是我希望我講的東西是我人生的體驗和思考,是給很多人反思,也許給你一秒鐘、半秒鐘,哪怕說一句話有點道理,引發自己內心的感觸,這就是我最大的價值。(這是我喜歡的一句話,也是我寫博客的初衷)
java多線程系列(三)