1. 程式人生 > >java多線程系列(三)

java多線程系列(三)

htm cep 開始 管道 線程間通信 base 並發編程 nds 打印

等待通知機制

前言:本系列將從零開始講解java多線程相關的技術,內容參考於《java多線程核心技術》與《java並發編程實戰》等相關資料,希望站在巨人的肩膀上,再通過我的理解能讓知識更加簡單易懂。

目錄

  • 認識cpu、核心與線程
  • java多線程系列(一)之java多線程技能
  • java多線程系列(二)之對象變量的並發訪問
  • java多線程系列(三)之等待通知機制
  • java多線程系列(四)之ReentrantLock的使用

非等待通知

public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                list.add();
                System.out.println("添加了" + (i + 1) + "個元素");
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
public void run() {
        try {
            while (true) {
                if (list.size() == 5) {
                    System.out.println("==5了,線程b要退出了!");
                    throw new InterruptedException();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  • 兩個線程實現了通信,但list大小為5的時候,線程B退出了,但是線程B不停地輪詢是否為5,這個時候是很占資源的
  • 如果輪詢的時間間隔小,這個時候更加浪費資源
  • 如果輪詢的時間間隔大,那麽還可能錯過了想要的數據,比如可能錯過了5
  • 這裏共享了list,所以實現了通信,但是因為不知道什麽時候通信,所以不停地輪詢,這種通信有缺點,一是浪費cpu資源,二是可能讀取到錯誤的數據

什麽是等待通知機制

  • 線程A要等待線程B發出通知才執行,這個時候線程A可以執行wait方法,等待線程B執行notify方法喚醒線程A

等待通知機制實現

public void run() {
        try {
            synchronized (lock) {
                if (MyList.size() != 5) {
                    System.out.println("wait begin "
                            + System.currentTimeMillis());
                    lock.wait();
                    System.out.println("wait end  "
                            + System.currentTimeMillis());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    
public void run() {
        try {
            synchronized (lock) {
                for (int i = 0; i < 10; i++) {
                    MyList.add();
                    if (MyList.size() == 5) {
                        lock.notify();
                        System.out.println("已發出通知!");
                    }
                    System.out.println("添加了" + (i + 1) + "個元素!");
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  • 將上面的代碼進行更改,當大小不等於5的時候,線程A處於wait狀態,直到線程B發出通知,喚醒線程A,通過等待通知機制,避免了線程A不停輪詢造成的資源浪費

消息通知機制註意點

  • wait和notify必須是在同步方法和同步代碼塊裏面調用,要不然會拋出異常
  • notify方法是繼承自Object類,可以喚醒在此對象監視器等待的線程,也就是說喚醒的是同一個鎖的線程
  • notify方法調用之後,不會馬上釋放鎖,而是運行完該同步方法或者是運行完該同步代碼塊的代碼
  • 調用notify後隨機喚醒的是一個線程
  • 調用wait方法後會將鎖釋放
  • wait狀態下中斷線程會拋出異常
  • wait(long),超過設置的時間後會自動喚醒,還沒超過該時間也可以通過其他線程喚醒
  • notifyAll可以喚醒同一鎖的所有線程
  • 如果線程還沒有處於等待狀態,其他線程進行喚醒,那麽不會起作用,此時會打亂程序的正常邏輯

案例:生產者消費者模式

一個生產者,一個消費者

public void setValue() {
        try {
            synchronized (lock) {
                if (!ValueObject.value.equals("")) {
                    lock.wait();
                }
                String value = System.currentTimeMillis() + "_"
                        + System.nanoTime();
                System.out.println("set"+ value);
                ValueObject.value = value;
                lock.notify();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
public void getValue() {
        try {
            synchronized (lock) {
                if (ValueObject.value.equals("")) {
                    lock.wait();
                }
                System.out.println("get"+ ValueObject.value);
                ValueObject.value = "";
                lock.notify();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
public void run() {
        while (true) {
            r.getValue();
        }
    }
public void run() {
        while (true) {
            p.setValue();
        }
    }
  • 如果我們創建一個生產線程,一個消費線程,那麽這個時候會交替運行

多個生產者,多個消費者

public void getValue() {
        try {
            synchronized (lock) {
                while (ValueObject.value.equals("")) {
                    System.out.println("消費者 "
                            + Thread.currentThread().getName() + " WAITING了☆");
                    lock.wait();
                }
                System.out.println("消費者 " + Thread.currentThread().getName()
                        + " RUNNABLE了");
                ValueObject.value = "";
                lock.notify();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
public void run() {
        while (true) {
            r.getValue();
        }
    }
public void setValue() {
        try {
            synchronized (lock) {
                while (!ValueObject.value.equals("")) {
                    System.out.println("生產者 "
                            + Thread.currentThread().getName() + " WAITING了★");
                    lock.wait();
                }
                System.out.println("生產者 " + Thread.currentThread().getName()
                        + " RUNNABLE了");
                String value = System.currentTimeMillis() + "_"
                        + System.nanoTime();
                ValueObject.value = value;
                lock.notify();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
public void run() {
        while (true) {
            p.setValue();
        }
    }
  • 如果這個時候創建多個生產者,多個消費者,如果連續喚醒的是同類線程,那麽會出現假死狀態,就是線程都處於waiting狀態,因為notify隨機喚醒一個線程,如果喚醒的同類的,那麽就浪費了一次喚醒,如果這個時候無法再喚醒異類線程,那麽就會假死。這種情況把notify改成notifyAll()就行了。

消息通知機制需要註意的地方

  • 是否線程喚醒的是同類線程會造成影響
  • 生產者消費模式,判斷條件if和while應該使用哪一個

通過管道進行線程間通信

public class ThreadWrite extends Thread {

    private WriteData write;
    private PipedOutputStream out;

    public ThreadWrite(WriteData write, PipedOutputStream out) {
        super();
        this.write = write;
        this.out = out;
    }

    @Override
    public void run() {
        write.writeMethod(out);
    }

}
public class ThreadRead extends Thread {

    private ReadData read;
    private PipedInputStream input;

    public ThreadRead(ReadData read, PipedInputStream input) {
        super();
        this.read = read;
        this.input = input;
    }

    @Override
    public void run() {
        read.readMethod(input);
    }
}
public class Run {

    public static void main(String[] args) {

        try {
            WriteData writeData = new WriteData();
            ReadData readData = new ReadData();

            PipedInputStream inputStream = new PipedInputStream();
            PipedOutputStream outputStream = new PipedOutputStream();

            // inputStream.connect(outputStream);
            outputStream.connect(inputStream);//關鍵

            ThreadRead threadRead = new ThreadRead(readData, inputStream);
            threadRead.start();

            Thread.sleep(2000);

            ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
            threadWrite.start();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}
  • PipedInputStream和PiepedOutputStream(對應字符流PipedReader和PipedOutputWriter)這幾個類可以實現線程間流的通信,將管道輸出流和輸出流連接,實現一個線程往管道發送數據,一個線程從管道讀取數據

join方法

public static void main(String[] args) {
        try {
            MyThread threadTest = new MyThread();
            threadTest.start();
            threadTest.join();

            System.out.println("threadTest對象執行完,我再執行");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  • 當前線程阻塞(main線程),調用線程(threadTest)正常執行,執行完後當前線程(main)繼續執行
public class ThreadB extends Thread {

    @Override
    public void run() {
        try {
            ThreadA a = new ThreadA();
            a.start();
            a.join();

            System.out.println("線程B在run end處打印了");
        } catch (InterruptedException e) {
            System.out.println("線程B在catch處打印了");
            e.printStackTrace();
        }
    }

}
  • 如果線程B執行完了join方法,此時線程B被中斷,那麽這個時候拋出異常,但是線程A正常運行

join(long)和sleep(long)的區別

public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }
  • 從join方法的源代碼可以發現,他的核心方法是wait,在前面已經提到wait方法會釋放鎖,說明join方法也會釋放鎖,但是sleep是不會釋放鎖的。
  • join方法是非靜態的,而sleep是靜態的

ThreadLocal

  • 解決變量在各個線程的隔離性,每個線程綁定自己的值
public void run() {
        try {
            for (int i = 0; i < 100; i++) {
                if (Tools.tl.get() == null) {
                    Tools.tl.set("ThreadA" + (i + 1));
                } else {
                    System.out.println("ThreadA get Value=" + Tools.tl.get());
                }
                Thread.sleep(200);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
public void run() {
        try {
            for (int i = 0; i < 100; i++) {
                if (Tools.tl.get() == null) {
                    Tools.tl.set("ThreadB" + (i + 1));
                } else {
                    System.out.println("ThreadB get Value=" + Tools.tl.get());
                }
                Thread.sleep(200);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
public class Tools {

    public static ThreadLocal tl = new ThreadLocal();

}
  • 每個線程都設置了值,但是得到的值卻是自己的,互相隔離
  • 如果不開始不設置值,那麽得到的值都是null,可以通過繼承ThreadLocal,重載initalValue方法,設置初始值

    public class ThreadLocalExt extends ThreadLocal {
    @Override
    protected Object initialValue() {
        return new Date().getTime();
    }
    }
  • InheritableThreadLocal,子線程可以繼承父線程的值

    public class InheritableThreadLocalExt extends InheritableThreadLocal {
    @Override
    protected Object initialValue() {
        return new Date().getTime();
    }
    }
    public static void main(String[] args) {
        try {
            for (int i = 0; i < 10; i++) {
                System.out.println("       在Main線程中取值=" + Tools.tl.get());
                Thread.sleep(100);
            }
            Thread.sleep(5000);
            ThreadA a = new ThreadA();
            a.start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    //main線程和A線程輸出的一樣
  • 在上面代碼的基礎上,重寫childValue方法可以設置子線程的值

我覺得分享是一種精神,分享是我的樂趣所在,不是說我覺得我講得一定是對的,我講得可能很多是不對的,但是我希望我講的東西是我人生的體驗和思考,是給很多人反思,也許給你一秒鐘、半秒鐘,哪怕說一句話有點道理,引發自己內心的感觸,這就是我最大的價值。(這是我喜歡的一句話,也是我寫博客的初衷)

java多線程系列(三)