1. 程式人生 > 其它 >閉關修煉(三)多執行緒之間的通訊

閉關修煉(三)多執行緒之間的通訊

技術標籤:java閉關修煉

文章目錄


多執行緒之間通訊

什麼是多執行緒之間的通訊?

目的是避免多個執行緒在操作同一個資源(共享資源)執行緒安全問題,其中多個執行緒操作的動作存在不同。

生產者與消費者例子

生產者執行緒對共享資源做寫操作,消費者執行緒讀共享資源,生產者表示釋出資源,消費者表示讀取資源,他們都同時對共享資源進行操作,但是操作的動作不同。

共享資源執行緒不安全演示程式碼:

package duoxiancheng;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; /** * 共享資源實體類 */ @Data @AllArgsConstructor @NoArgsConstructor class Res { private String userName; private String sex; } /** * 生成者執行緒 */ @EqualsAndHashCode(callSuper = true) @Data @NoArgsConstructor @AllArgsConstructor class
Producer extends Thread { private Res res; public void run() { int count = 0; while (true) { if (count == 0) { res.setUserName("小紅"); res.setSex("女"); } else { res.setUserName("小明"); res.setSex("男"); } // 每次迴圈count依次取:0 1 0 1 count = (count + 1) % 2; } } } /** * 消費者 */ @EqualsAndHashCode(callSuper = true) @Data @AllArgsConstructor @NoArgsConstructor class Consumer extends Thread { private Res res; @Override public void run() { while (true){ System.out.println(res.toString()); } } } public class ShengchanXiaofei { public static void main(String[] args) { // 共享資源res Res res = new Res(); Producer producer = new Producer(res); Consumer consumer = new Consumer(res); producer.start(); consumer.start(); } }

這段程式碼會產生執行緒安全問題,小紅性別出現了男,出現的這樣結果是因為消費者執行緒讀共享資源的時候,讀到一半剛好被暫停了,生產者執行緒又修改了資源,就導致了資料出錯的問題。
在這裡插入圖片描述

如何解決生成者和消費者問題?

生產者執行緒和消費者執行緒之間要進行同步!
是兩個執行緒都要進行同步,這樣才能保證兩個執行緒之間的資料的原子性問題。

如何新增同步?

用synchronized程式碼塊,並且兩個執行緒使用同一把鎖,故不能使用this鎖,因為這個兩個執行緒是不同類。 他們共享Res類資源,故可以用res物件作為鎖。

/**
 * 生成者執行緒
 */
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
class Producer extends Thread {
    private Res res;

    public void run() {
        int count = 0;
        while (true) {
            synchronized (res){
                if (count == 0) {
                    res.setUserName("小紅");
                    res.setSex("女");
                } else {
                    res.setUserName("小明");
                    res.setSex("男");
                }
                // 每次迴圈count依次取:0 1 0 1
                count = (count + 1) % 2;
            }

        }
    }
}

/**
 * 消費者
 */
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
class Consumer extends Thread {
    private Res res;

    @Override
    public void run() {
        while (true){
            synchronized (res){
                System.out.println(res.toString());
            }
        }
    }
}

但是這個程式碼還是有缺陷,和實際的生成應用還是有差別,消費者拿資料應該是一男一女交替,而不是一片連在一起。

wait和notify

對上面提到的問題的解決辦法?

寫完一個,讀一個,保證這樣的同步。生產者沒有任何生產,消費者不能讀。消費者沒有消費完生產者不能生產。某程度上等於沒有緩衝區的生產情況。

使用wait和notify方法可以解決。

wait和notify的作用?

wait讓當前執行緒從執行狀態變為休眠狀態,讓出cpu執行權,釋放當前鎖的資源,把資源給阻塞中的執行緒。

notify讓當前執行緒從休眠狀態變為執行狀態

wait和notify的使用條件?

  1. 要在同步中才能使用wait和notify,並且是鎖物件(共享資源)呼叫wait和notify。
  2. 使用同一把鎖,即都在synchronized程式碼塊中,使用鎖物件相同。
  3. wait和notify是成對出現的

使用程式碼例子

package duoxiancheng;

import lombok.*;

/**
 * 共享資源實體類
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
class Res {

    private String userName;
    private String sex;
    // false時生產者執行,true時消費者執行
    private Boolean flag = false;
}


/**
 * 生成者執行緒
 */
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
class Producer extends Thread {
    private Res res;

    @SneakyThrows
    public void run() {
        int count = 0;
        while (true) {
            synchronized (res) {
                if (res.getFlag()) {
                    // flag為true時變成休眠狀態,並且釋放鎖的資源
                    res.wait();
                }
                if (count == 0) {
                    res.setUserName("小紅");
                    res.setSex("女");
                } else {
                    res.setUserName("小明");
                    res.setSex("男");
                }
                // 每次迴圈count依次取:0 1 0 1
                count = (count + 1) % 2;
                // 寫操作完成,更改flag值,再進行生產者執行緒時將變為休眠狀態
                res.setFlag(true);
                // 喚醒當前被wait的執行緒
                res.notify();
            }

        }
    }
}

/**
 * 消費者
 */
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
class Consumer extends Thread {
    private Res res;

    @SneakyThrows
    @Override
    public void run() {
        while (true) {
            synchronized (res) {
                // flag為true時消費者執行,如果flag為false時執行緒應休眠,釋放物件鎖。
                if(!res.getFlag()){
                    res.wait();
                }
                System.out.println(res.toString());
                // 消費者執行完畢,更改flag值
                res.setFlag(false);
                // 喚醒當前被wait的執行緒
                res.notify();
            }
        }
    }
}

public class ShengchanXiaofei {
    public static void main(String[] args) {
        // 共享資源res
        Res res = new Res();
        Producer producer = new Producer(res);
        Consumer consumer = new Consumer(res);
        producer.start();
        consumer.start();
    }
}

兩點重要的點
一是wait讓執行緒讓出cpu執行權,釋放當前鎖
二是notify喚醒使用wait的執行緒,即通知正在等待控制權的執行緒可以繼續執行

notifyAll

喚醒所有等待

wait和sleep異同點在哪裡?

wait可以釋放鎖的資源
sleep不會釋放鎖的資源

都是做休眠

wait需要notify才能從休眠變為執行狀態
sleep只要時間到了就變為執行狀態

小試牛刀題

Object類中有哪些方法?
wait,notify,notifyAll,equals,hashCode,getClass,clone,toString,finalize

synchronized程式碼塊什麼時候開始上鎖?什麼時候開始釋放鎖?
程式碼開始和程式碼結束

synchronized有什麼缺點?
效率低,擴充套件性不高,不能自定義

你瞭解哪些java併發包?
Atomic中的原子類如AtomicInteger,AtomicIntegerArray,AtomicReference等,Lock鎖

多執行緒併發和網頁併發的區別?
多執行緒併發是多個Thread執行緒同時操作同一個資源,網頁併發是多個請求同時訪問一臺伺服器。

JDK1.5-Lock

什麼是Lock介面?

用來實現鎖的功能,Lock介面提供了與synchronized關鍵字類似的同步功能,但需要在使用時手動獲取鎖和釋放鎖。

Lock鎖寫法

lock鎖一般定義在共享資源中,多個執行緒使用同一個lock物件

Lock lock = new ReentrantLock();
lock.lock();
try{
    
}
finally {
	lock.unlock();
}

演示程式碼

wait和notify不能在這裡使用,只能在synchronized程式碼塊中使用!

package duoxiancheng;

import com.sun.org.apache.xml.internal.security.signature.reference.ReferenceNodeSetData;
import lombok.*;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 共享資源實體類
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
class Res {

    private String userName;
    private String sex;
    // false時生產者執行,true時消費者執行
    private Boolean flag = false;
    private Lock lock = new ReentrantLock();
}


/**
 * 生成者執行緒
 */
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
class Producer2 extends Thread {
    private Res res;

    @SneakyThrows
    public void run() {
        int count = 0;
        while (true) {

            try {
                // 開始上鎖
                res.getLock().lock();
                if (count == 0) {
                    res.setUserName("小紅");
                    res.setSex("女");
                } else {
                    res.setUserName("小明");
                    res.setSex("男");
                }
                // 每次迴圈count依次取:0 1 0 1
                count = (count + 1) % 2;
                res.setFlag(true);
            } catch (Exception e) {
                e.printStackTrace();

            } finally {
                // 釋放鎖
                res.getLock().unlock();
            }

        }

    }
}


/**
 * 消費者
 */
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
class Consumer2 extends Thread {
    private Res res;

    @SneakyThrows
    @Override
    public void run() {
        while (true) {
            try {
                res.getLock().lock();
                System.out.println(res.toString());
                res.setFlag(false);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // 釋放鎖
                res.getLock().unlock();
            }

        }
    }
}

public class LockDemo {
    public static void main(String[] args) {

        // 共享資源res
        Res res = new Res();
        Producer2 producer = new Producer2(res);
        Consumer2 consumer = new Consumer2(res);
        producer.start();
        consumer.start();
    }
}

什麼是ReentrantLock鎖?

日後其他部落格裡再寫

Condition類解決wait和notify無法使用問題

Condition一般和Lock鎖一起使用。

Condition的功能類似於傳統技術中的Object.wait和notify。

線上程中例項化Condition

Condition condition = res.getLock().newCondition();

使用await和signal方法等待和喚醒

示例程式碼:

package duoxiancheng;

import lombok.*;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Data
@AllArgsConstructor
@NoArgsConstructor
class Res {
    public String userName;
    public String sex;
    public Boolean flag = false;
    Lock lock = new ReentrantLock();
}

class InputThread extends Thread {
    private Res res;
    Condition newCondition;

    public InputThread(Res res, Condition newCondition) {
        this.res = res;
        this.newCondition = newCondition;
    }

    @Override
    public void run() {
        int count = 0;
        while (true) {
            try {
                res.lock.lock();
                if (res.flag) {
                    try {
                        newCondition.await();
                    } catch (Exception e) {
                        // TODO: handle exception
                    }
                }
                if (count == 0) {
                    res.userName = "小明";
                    res.sex = "男";
                } else {
                    res.userName = "小紅";
                    res.sex = "女";
                }
                count = (count + 1) % 2;
                res.flag = true;
                newCondition.signal();
            } catch (Exception e) {
                // TODO: handle exception
            } finally {
                res.lock.unlock();
            }
        }

    }
}

class OutThrad extends Thread {
    private Res res;
    private Condition newCondition;

    public OutThrad(Res res, Condition newCondition) {
        this.res = res;
        this.newCondition = newCondition;
    }

    @Override
    public void run() {
        while (true) {
            try {
                res.lock.lock();
                if (!res.flag) {
                    try {
                        newCondition.await();
                    } catch (Exception e) {
                        // TODO: handle exception
                    }
                }
                System.out.println(res.userName + "," + res.sex);
                res.flag = false;
                newCondition.signal();
            } catch (Exception e) {
                // TODO: handle exception
            } finally {
                res.lock.unlock();
            }

        }

    }
}

public class LockDemo {

    public static void main(String[] args) {
        Res res = new Res();
        Condition newCondition = res.lock.newCondition();
        InputThread inputThread = new InputThread(res, newCondition);
        OutThrad outThrad = new OutThrad(res, newCondition);
        inputThread.start();
        outThrad.start();
    }

}

小試牛刀題

說說lock鎖和synchronized(同步)的區別?

  1. Lock介面可以嘗試非阻塞地獲取鎖,當前執行緒嘗試獲取鎖,如果這一時刻如果沒有被其他執行緒獲取到,則成功獲取並持有鎖。
  2. Lock鎖能被中斷地獲取鎖,synchronized是程式碼塊執行完畢或者報異常才釋放鎖
  3. synchronized自動上鎖和釋放鎖,而lock鎖需要自己上鎖和釋放鎖,lock鎖靈活性更強。
  4. lock鎖運用比synchronized要多

如何停止執行緒

用stop方法為什麼不好?

程式執行到一半的時候,直接把程式stop,程式中斷,後面一半程式沒有執行完,先前執行的程式碼也不會回滾,即程式不會恢復,不安全。

說說設計停止執行緒的思路?

根據stop的缺點進行優化,程式碼執行完畢再停止。無非讓迴圈停止。

程式碼設計:

package ch3;


import lombok.SneakyThrows;

class MyThread extends Thread {
    private volatile Boolean flag = true;

    @Override
    public void run() {
        System.out.println("start loop");
        while (flag) {
        }
        System.out.println("stop loop");
    }

    public void stopThread() {
        this.flag = false;
    }
}

public class StopDemo {
    @SneakyThrows
    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        myThread.start();

        for (int i = 0; i < 2; i++) {
            System.out.println("main thread, waiting");
            Thread.sleep(1000);
        }
        myThread.stopThread();
        for (int i = 0; i < 5; i++) {
            System.out.println("main thread, waiting");
            Thread.sleep(1000);

        }

    }
}

但是這樣的設計還有什麼問題呢?如果while迴圈中存在wait等待,則不會到while判斷為false退出迴圈,直到執行緒被notify喚醒,然而這樣不是我們希望的,因此正確的用法是使用interrupt方法,interrupt方法會丟擲異常,在異常處理中處理停止執行緒。

package ch3;


import lombok.SneakyThrows;

class MyThread extends Thread {
    private volatile Boolean flag = true;

    @Override
    public synchronized void run() {
        try{
            System.out.println("start loop");
            while (flag) {
                wait();
            }
        }catch (Exception e){
            stopThread();
            System.out.println("stop loop");
        }

    }

    public void stopThread() {
        this.flag = false;
    }
}

public class StopDemo {
    @SneakyThrows
    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        myThread.start();

        for (int i = 0; i < 2; i++) {
            System.out.println("main thread, waiting");
            Thread.sleep(1000);
        }
        myThread.interrupt();
        for (int i = 0; i < 5; i++) {
            System.out.println("main thread, waiting");
            Thread.sleep(1000);

        }

    }
}

ThreadLocal類

談談什麼是ThreadLocal?

是本地執行緒, 為每一個執行緒提供一個區域性變數。

執行緒t1、t2共享主記憶體中的資料,使用ThreadLoacl後,在本地執行緒提供區域性變數,將共享的資料變成本地的變數。

ThreadLocal例子

package ch3;

import lombok.AllArgsConstructor;

class ResNumber {
    public int count = 0;
    public static ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() {
            // threadLocal起始值賦值為0
            return 0;
        }
    };

    public String getNumber() {
        count = threadLocal.get() + 1;
        threadLocal.set(count);
        return String.valueOf(count);
    }
}

@AllArgsConstructor
class LocalThread extends Thread {
    public ResNumber resNumber;

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            System.out.println(getName() + ":" + resNumber.getNumber());
        }
    }
}

public class ThreadLoca {

    public static void main(String[] args) {
        ResNumber resNumber = new ResNumber();
        LocalThread localThread1 = new LocalThread(resNumber);
        LocalThread localThread2 = new LocalThread(resNumber);
        LocalThread localThread3 = new LocalThread(resNumber);
        localThread1.start();
        localThread2.start();
        localThread3.start();

    }
}

ThreadLocal的原理?

Map<Thread,Object>集合,Thread為當前執行緒,Object為要local的資料

底層程式碼

set方法:

    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }

get方法:

    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }