閉關修煉(三)多執行緒之間的通訊
技術標籤:java閉關修煉
文章目錄
多執行緒之間通訊
什麼是多執行緒之間的通訊?
目的是避免多個執行緒在操作同一個資源(共享資源)執行緒安全問題,其中多個執行緒操作的動作存在不同。
生產者與消費者例子
生產者執行緒對共享資源做寫操作,消費者執行緒讀共享資源,生產者表示釋出資源,消費者表示讀取資源,他們都同時對共享資源進行操作,但是操作的動作不同。
共享資源執行緒不安全演示程式碼:
package duoxiancheng;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
/**
* 共享資源實體類
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
class Res {
private String userName;
private String sex;
}
/**
* 生成者執行緒
*/
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
class Producer extends Thread {
private Res res;
public void run() {
int count = 0;
while (true) {
if (count == 0) {
res.setUserName("小紅");
res.setSex("女");
} else {
res.setUserName("小明");
res.setSex("男");
}
// 每次迴圈count依次取:0 1 0 1
count = (count + 1) % 2;
}
}
}
/**
* 消費者
*/
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
class Consumer extends Thread {
private Res res;
@Override
public void run() {
while (true){
System.out.println(res.toString());
}
}
}
public class ShengchanXiaofei {
public static void main(String[] args) {
// 共享資源res
Res res = new Res();
Producer producer = new Producer(res);
Consumer consumer = new Consumer(res);
producer.start();
consumer.start();
}
}
這段程式碼會產生執行緒安全問題,小紅性別出現了男,出現的這樣結果是因為消費者執行緒讀共享資源的時候,讀到一半剛好被暫停了,生產者執行緒又修改了資源,就導致了資料出錯的問題。
如何解決生成者和消費者問題?
生產者執行緒和消費者執行緒之間要進行同步!
是兩個執行緒都要進行同步,這樣才能保證兩個執行緒之間的資料的原子性問題。
如何新增同步?
用synchronized程式碼塊,並且兩個執行緒使用同一把鎖,故不能使用this鎖,因為這個兩個執行緒是不同類。 他們共享Res類資源,故可以用res物件作為鎖。
/**
* 生成者執行緒
*/
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
class Producer extends Thread {
private Res res;
public void run() {
int count = 0;
while (true) {
synchronized (res){
if (count == 0) {
res.setUserName("小紅");
res.setSex("女");
} else {
res.setUserName("小明");
res.setSex("男");
}
// 每次迴圈count依次取:0 1 0 1
count = (count + 1) % 2;
}
}
}
}
/**
* 消費者
*/
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
class Consumer extends Thread {
private Res res;
@Override
public void run() {
while (true){
synchronized (res){
System.out.println(res.toString());
}
}
}
}
但是這個程式碼還是有缺陷,和實際的生成應用還是有差別,消費者拿資料應該是一男一女交替,而不是一片連在一起。
wait和notify
對上面提到的問題的解決辦法?
寫完一個,讀一個,保證這樣的同步。生產者沒有任何生產,消費者不能讀。消費者沒有消費完生產者不能生產。某程度上等於沒有緩衝區的生產情況。
使用wait和notify方法可以解決。
wait和notify的作用?
wait讓當前執行緒從執行狀態變為休眠狀態,讓出cpu執行權,釋放當前鎖的資源,把資源給阻塞中的執行緒。
notify讓當前執行緒從休眠狀態變為執行狀態
wait和notify的使用條件?
- 要在同步中才能使用wait和notify,並且是鎖物件(共享資源)呼叫wait和notify。
- 使用同一把鎖,即都在synchronized程式碼塊中,使用鎖物件相同。
- wait和notify是成對出現的
使用程式碼例子
package duoxiancheng;
import lombok.*;
/**
* 共享資源實體類
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
class Res {
private String userName;
private String sex;
// false時生產者執行,true時消費者執行
private Boolean flag = false;
}
/**
* 生成者執行緒
*/
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
class Producer extends Thread {
private Res res;
@SneakyThrows
public void run() {
int count = 0;
while (true) {
synchronized (res) {
if (res.getFlag()) {
// flag為true時變成休眠狀態,並且釋放鎖的資源
res.wait();
}
if (count == 0) {
res.setUserName("小紅");
res.setSex("女");
} else {
res.setUserName("小明");
res.setSex("男");
}
// 每次迴圈count依次取:0 1 0 1
count = (count + 1) % 2;
// 寫操作完成,更改flag值,再進行生產者執行緒時將變為休眠狀態
res.setFlag(true);
// 喚醒當前被wait的執行緒
res.notify();
}
}
}
}
/**
* 消費者
*/
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
class Consumer extends Thread {
private Res res;
@SneakyThrows
@Override
public void run() {
while (true) {
synchronized (res) {
// flag為true時消費者執行,如果flag為false時執行緒應休眠,釋放物件鎖。
if(!res.getFlag()){
res.wait();
}
System.out.println(res.toString());
// 消費者執行完畢,更改flag值
res.setFlag(false);
// 喚醒當前被wait的執行緒
res.notify();
}
}
}
}
public class ShengchanXiaofei {
public static void main(String[] args) {
// 共享資源res
Res res = new Res();
Producer producer = new Producer(res);
Consumer consumer = new Consumer(res);
producer.start();
consumer.start();
}
}
兩點重要的點
一是wait讓執行緒讓出cpu執行權,釋放當前鎖
二是notify喚醒使用wait的執行緒,即通知正在等待控制權的執行緒可以繼續執行
notifyAll
喚醒所有等待
wait和sleep異同點在哪裡?
wait可以釋放鎖的資源
sleep不會釋放鎖的資源
都是做休眠
wait需要notify才能從休眠變為執行狀態
sleep只要時間到了就變為執行狀態
小試牛刀題
Object類中有哪些方法?
wait,notify,notifyAll,equals,hashCode,getClass,clone,toString,finalize
synchronized程式碼塊什麼時候開始上鎖?什麼時候開始釋放鎖?
程式碼開始和程式碼結束
synchronized有什麼缺點?
效率低,擴充套件性不高,不能自定義
你瞭解哪些java併發包?
Atomic中的原子類如AtomicInteger,AtomicIntegerArray,AtomicReference等,Lock鎖
多執行緒併發和網頁併發的區別?
多執行緒併發是多個Thread執行緒同時操作同一個資源,網頁併發是多個請求同時訪問一臺伺服器。
JDK1.5-Lock
什麼是Lock介面?
用來實現鎖的功能,Lock介面提供了與synchronized關鍵字類似的同步功能,但需要在使用時手動獲取鎖和釋放鎖。
Lock鎖寫法
lock鎖一般定義在共享資源中,多個執行緒使用同一個lock物件
Lock lock = new ReentrantLock();
lock.lock();
try{
}
finally {
lock.unlock();
}
演示程式碼
wait和notify不能在這裡使用,只能在synchronized程式碼塊中使用!
package duoxiancheng;
import com.sun.org.apache.xml.internal.security.signature.reference.ReferenceNodeSetData;
import lombok.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 共享資源實體類
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
class Res {
private String userName;
private String sex;
// false時生產者執行,true時消費者執行
private Boolean flag = false;
private Lock lock = new ReentrantLock();
}
/**
* 生成者執行緒
*/
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
class Producer2 extends Thread {
private Res res;
@SneakyThrows
public void run() {
int count = 0;
while (true) {
try {
// 開始上鎖
res.getLock().lock();
if (count == 0) {
res.setUserName("小紅");
res.setSex("女");
} else {
res.setUserName("小明");
res.setSex("男");
}
// 每次迴圈count依次取:0 1 0 1
count = (count + 1) % 2;
res.setFlag(true);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 釋放鎖
res.getLock().unlock();
}
}
}
}
/**
* 消費者
*/
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
class Consumer2 extends Thread {
private Res res;
@SneakyThrows
@Override
public void run() {
while (true) {
try {
res.getLock().lock();
System.out.println(res.toString());
res.setFlag(false);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 釋放鎖
res.getLock().unlock();
}
}
}
}
public class LockDemo {
public static void main(String[] args) {
// 共享資源res
Res res = new Res();
Producer2 producer = new Producer2(res);
Consumer2 consumer = new Consumer2(res);
producer.start();
consumer.start();
}
}
什麼是ReentrantLock鎖?
日後其他部落格裡再寫
Condition類解決wait和notify無法使用問題
Condition一般和Lock鎖一起使用。
Condition的功能類似於傳統技術中的Object.wait和notify。
線上程中例項化Condition
Condition condition = res.getLock().newCondition();
使用await和signal方法等待和喚醒
示例程式碼:
package duoxiancheng;
import lombok.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Data
@AllArgsConstructor
@NoArgsConstructor
class Res {
public String userName;
public String sex;
public Boolean flag = false;
Lock lock = new ReentrantLock();
}
class InputThread extends Thread {
private Res res;
Condition newCondition;
public InputThread(Res res, Condition newCondition) {
this.res = res;
this.newCondition = newCondition;
}
@Override
public void run() {
int count = 0;
while (true) {
try {
res.lock.lock();
if (res.flag) {
try {
newCondition.await();
} catch (Exception e) {
// TODO: handle exception
}
}
if (count == 0) {
res.userName = "小明";
res.sex = "男";
} else {
res.userName = "小紅";
res.sex = "女";
}
count = (count + 1) % 2;
res.flag = true;
newCondition.signal();
} catch (Exception e) {
// TODO: handle exception
} finally {
res.lock.unlock();
}
}
}
}
class OutThrad extends Thread {
private Res res;
private Condition newCondition;
public OutThrad(Res res, Condition newCondition) {
this.res = res;
this.newCondition = newCondition;
}
@Override
public void run() {
while (true) {
try {
res.lock.lock();
if (!res.flag) {
try {
newCondition.await();
} catch (Exception e) {
// TODO: handle exception
}
}
System.out.println(res.userName + "," + res.sex);
res.flag = false;
newCondition.signal();
} catch (Exception e) {
// TODO: handle exception
} finally {
res.lock.unlock();
}
}
}
}
public class LockDemo {
public static void main(String[] args) {
Res res = new Res();
Condition newCondition = res.lock.newCondition();
InputThread inputThread = new InputThread(res, newCondition);
OutThrad outThrad = new OutThrad(res, newCondition);
inputThread.start();
outThrad.start();
}
}
小試牛刀題
說說lock鎖和synchronized(同步)的區別?
- Lock介面可以嘗試非阻塞地獲取鎖,當前執行緒嘗試獲取鎖,如果這一時刻如果沒有被其他執行緒獲取到,則成功獲取並持有鎖。
- Lock鎖能被中斷地獲取鎖,synchronized是程式碼塊執行完畢或者報異常才釋放鎖
- synchronized自動上鎖和釋放鎖,而lock鎖需要自己上鎖和釋放鎖,lock鎖靈活性更強。
- lock鎖運用比synchronized要多
如何停止執行緒
用stop方法為什麼不好?
程式執行到一半的時候,直接把程式stop,程式中斷,後面一半程式沒有執行完,先前執行的程式碼也不會回滾,即程式不會恢復,不安全。
說說設計停止執行緒的思路?
根據stop的缺點進行優化,程式碼執行完畢再停止。無非讓迴圈停止。
程式碼設計:
package ch3;
import lombok.SneakyThrows;
class MyThread extends Thread {
private volatile Boolean flag = true;
@Override
public void run() {
System.out.println("start loop");
while (flag) {
}
System.out.println("stop loop");
}
public void stopThread() {
this.flag = false;
}
}
public class StopDemo {
@SneakyThrows
public static void main(String[] args) {
MyThread myThread = new MyThread();
myThread.start();
for (int i = 0; i < 2; i++) {
System.out.println("main thread, waiting");
Thread.sleep(1000);
}
myThread.stopThread();
for (int i = 0; i < 5; i++) {
System.out.println("main thread, waiting");
Thread.sleep(1000);
}
}
}
但是這樣的設計還有什麼問題呢?如果while迴圈中存在wait等待,則不會到while判斷為false退出迴圈,直到執行緒被notify喚醒,然而這樣不是我們希望的,因此正確的用法是使用interrupt方法,interrupt方法會丟擲異常,在異常處理中處理停止執行緒。
package ch3;
import lombok.SneakyThrows;
class MyThread extends Thread {
private volatile Boolean flag = true;
@Override
public synchronized void run() {
try{
System.out.println("start loop");
while (flag) {
wait();
}
}catch (Exception e){
stopThread();
System.out.println("stop loop");
}
}
public void stopThread() {
this.flag = false;
}
}
public class StopDemo {
@SneakyThrows
public static void main(String[] args) {
MyThread myThread = new MyThread();
myThread.start();
for (int i = 0; i < 2; i++) {
System.out.println("main thread, waiting");
Thread.sleep(1000);
}
myThread.interrupt();
for (int i = 0; i < 5; i++) {
System.out.println("main thread, waiting");
Thread.sleep(1000);
}
}
}
ThreadLocal類
談談什麼是ThreadLocal?
是本地執行緒, 為每一個執行緒提供一個區域性變數。
執行緒t1、t2共享主記憶體中的資料,使用ThreadLoacl後,在本地執行緒提供區域性變數,將共享的資料變成本地的變數。
ThreadLocal例子
package ch3;
import lombok.AllArgsConstructor;
class ResNumber {
public int count = 0;
public static ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
// threadLocal起始值賦值為0
return 0;
}
};
public String getNumber() {
count = threadLocal.get() + 1;
threadLocal.set(count);
return String.valueOf(count);
}
}
@AllArgsConstructor
class LocalThread extends Thread {
public ResNumber resNumber;
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println(getName() + ":" + resNumber.getNumber());
}
}
}
public class ThreadLoca {
public static void main(String[] args) {
ResNumber resNumber = new ResNumber();
LocalThread localThread1 = new LocalThread(resNumber);
LocalThread localThread2 = new LocalThread(resNumber);
LocalThread localThread3 = new LocalThread(resNumber);
localThread1.start();
localThread2.start();
localThread3.start();
}
}
ThreadLocal的原理?
Map<Thread,Object>集合,Thread為當前執行緒,Object為要local的資料
底層程式碼
set方法:
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
get方法:
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}