1. 程式人生 > 實用技巧 >多執行緒與併發(二)——執行緒同步、執行緒協作

多執行緒與併發(二)——執行緒同步、執行緒協作

iwehdio的部落格園:https://www.cnblogs.com/iwehdio/

1、執行緒同步

  • 併發:同一個物件被多個執行緒同時操作。

  • 執行緒同步:處理併發問題。形成條件是佇列+鎖。

  • 執行緒同步其實就是一種等待機制,多個需要同時訪問此物件的執行緒進入這個物件的等待池形成佇列,等待前面執行緒使用完畢,下一個執行緒再使用。

  • 為了保證資料在方法中被訪問時的正確性,在訪問時加入鎖機制synchronized。當一個執行緒獲得物件的排它鎖,獨佔資源,其他執行緒必須等待,使用後釋放鎖即可。可能存在問題:

    • 一個執行緒持有鎖會導致其他所有需要此鎖的執行緒掛起。
    • 在多執行緒競爭下,加鎖,釋放鎖會導致比較多的上下文切換和排程延時,引起效能問題。
    • 如果一個優先順序高的執行緒等待一個優先順序低的執行緒釋放鎖,會導致優先順序倒置,引起效能問題。
  • 不安全案例:

    • 之前的搶票的例子。

    • 取款的例子:

      public class TestSync {
          public static void main(String[] args) {
              Account account = new Account("基金", 100);
              new Bank(account,50, "A").start();
              new Bank(account,100, "B").start();
          }
      }
      
      class Account {
          public String name;
          public int money;
      
          public Account(String name, int money) {
              this.name = name;
              this.money = money;
          }
      }
      
      class Bank extends Thread {
          public Account account;
          public int drawingMoney;
          public int nowMoney;
      
          public Bank(Account account, int drawingMoney, String name) {
              super(name);
              this.drawingMoney = drawingMoney;
              this.account = account;
          }
      
          @Override
          public void run() {
              if(account.money-drawingMoney<0){
                  System.out.println("餘額不足");
                  return;
              }
              try {
                  Thread.sleep(200);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              account.money -= drawingMoney;
              nowMoney = drawingMoney;
              System.out.println(account.name+"餘額為"+account.money);
              System.out.println(this.getName()+"取出"+nowMoney);
          }
      }
      
  • 同步方法:

    • 通過private關鍵字保證了資料物件只能被方法訪問,所以只需要對方法使用synchronized關鍵字。包括synchronized方法和synchronized塊。
    • 同步方法格式:public synchronized void method(int args){}
    • synchronized方法控制對物件的訪問,每個物件對應一把鎖,每個synchronized都必須活動呼叫該方法的物件的鎖才能執行,否則執行緒會阻塞。
    • 方法一旦執行,就獨佔該鎖,直到該方法返回才釋放鎖。後邊被阻塞的執行緒才能獲得這個鎖,繼續執行。
    • 若將一個大的方法宣告為synchronized將會影響效率。
    • 方法裡面需要修改的內容才需要鎖,只讀的內容不需要鎖。
    • 鎖中可以是物件也可以class位元組碼:
      • 對於普通同步方法,鎖是當前例項物件。 如果有多個例項 那麼鎖物件必然不同無法實現同步。
      • 對於靜態同步方法,鎖是當前類的class物件。有多個例項 但是鎖物件是相同的 可以完成同步。
      • 對於同步方法塊,鎖是Synchonized括號裡配置的物件。物件最好是隻有一個的 如當前類的 class 。
  • 同步塊:

    • 同步塊格式:synchronized(obj){}
    • obj稱之為同步監視器,可以是任何物件,但是推薦使用共享資源(所要進行增刪改的物件)作為同步監視器。
    • 同步方法的同步監視器就是物件本身即this。
    • 同步監視器的執行過程:
      • 第一個執行緒訪問,鎖定同步監視器,執行其中的程式碼。
      • 第二個執行緒訪問,發現同步監視器被鎖定,無法訪問。
      • 第一個執行緒訪問翻倍,解鎖同步監視器。
      • 第二個執行緒訪問,泛型同步監視器沒有鎖,然後鎖定並訪問。
  • 不安全案例的同步改造:

    • 搶票的例子:

      @Override
      public synchronized void run() {
          while (true) {
              if(ticketNum<=0) {
                  break;
              }
              try {
                  Thread.sleep(100);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              System.out.println(Thread.currentThread().getName()+"拿到了第"+ticketNum--+"張票");
          }
      }
      
    • 取款的例子:

      @Override
      public void run() {
          //鎖的是要進行修改的物件account
          synchronized (account){
              if(account.money-drawingMoney<0){
                  System.out.println("餘額不足");
                  return;
              }
              try {
                  Thread.sleep(200);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              account.money -= drawingMoney;
              nowMoney = drawingMoney;
              System.out.println(account.name+"餘額為"+account.money);
              System.out.println(this.getName()+"取出"+nowMoney);
          }
      }
      
  • Java中的JUC併發包java.util.concurrent:

    • JUC安全型別的集合:CopyOnWriteArrayList。一個執行緒安全的列表。
  • 死鎖:

    • 多個執行緒各自佔用一些共享資源,並且相互等待其他執行緒佔有的資源才能執行,而導致兩個或多個執行緒都在等待物件釋放資源,都停止執行的情形。

    • 某一個程式碼塊同時擁有兩個以上物件的鎖時,就可能會發生死鎖的問題。

    • 產生死鎖的四個必要條件:

      • 互斥條件:一個資源每次只能被一個程序使用。
      • 請求與保持條件:一個程序因請求資源而阻塞時,對已獲得的資源保持不放。
      • 不剝奪條件:程序已獲得的資源,在未使用完之前,不能強行剝奪。
      • 迴圈等待條件:若干程序之間形成一種頭尾相接的迴圈等待資源關係。
    • 產生死鎖的例子:

      public class DeadLock {
          public static void main(String[] args) {
              new dis(0).start();
              new dis(1).start();
          }
      }
      
      class Car{
      }
      
      class Key{
      }
      
      class dis extends Thread{
          //資源只有一份,靜態資源
          private static Car car = new Car();
          private static Key key = new Key();
          private int choice;
      
          public dis(int choice) {
              this.choice = choice;
          }
      
          @Override
          public void run() {
              makeup();
          }
      
          public void makeup(){
              if(choice==0) {
                  synchronized (car){
                      System.out.println(choice+" 鎖住了car");
                      synchronized (key) {
                          System.out.println(choice+" 鎖住了key");
                      }
                  }
              } else {
                  synchronized (key){
                      System.out.println(choice+" 鎖住了key");
                      synchronized (car) {
                          System.out.println(choice+" 鎖住了car");
                      }
                  }
              }
          }
      
      }
      
  • Lock鎖:

    • 從JDK5開始提供的更強大的執行緒同步機制,通過顯式定義同步鎖物件來實現同步。同步鎖使用Lock物件充當。

    • java.util.concurrent.locks.Lock介面是控制多個執行緒對共享資源進行訪問的工具。

    • 鎖提供了對共享資源的獨佔訪問,每次只能有一個執行緒對Lock物件加鎖,執行緒開始訪問共享資源之前應先獲得Lock物件。

    • 可重入鎖ReentrantLock類實現了Lock,擁有與synchronized相同的併發性和記憶體語義,可以顯式加鎖、釋放鎖。

    • Lock是顯式鎖,手動開啟和關閉鎖。synchronized是隱式鎖,出了作用域自動釋放。

    • 使用Lock鎖,JVM將花費較少的時間排程執行緒,效能更好。並且有更好的擴充套件性。

    • 一般與try/finally一起使用。

    • 使用Lock鎖的例子:

      public class TestLock {
          public static void main(String[] args) {
              Lock lock = new Lock();
              new Thread(lock,"A ").start();
              new Thread(lock,"B ").start();
          }
      }
      
      class Lock implements Runnable {
      
          private int ticketNum = 200;
          private final ReentrantLock lock = new ReentrantLock();
      
          @Override
          public void run() {
              while (true) {
                  lock.lock();
                  try{
                      if (ticketNum > 0) {
                          try {
                              Thread.sleep(20);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          System.out.println(Thread.currentThread().getName() + ticketNum--);
                      } else {
                          break;
                      }
                  } finally {
                      lock.unlock();
                  }
              }
          }
      }
      

2、執行緒協作

  • 生產者消費者問題:

    • 假設倉庫中只能存放一件產品,生產者將生產出來的產品放入倉庫,消費者將倉庫中產品取走消費。

    • 如果倉庫中沒有產品,則生產者將產品放入倉庫,否則停止生產並等待,直到產品被消費者取走。

    • 如果倉庫中放有產品,則消費者可以講產品取走消費,否則停止消費並等待,直到倉庫中再次放入產品。

    • 生產者和消費者共享同一個資源,並且生產者和消費者之間相互依賴、互為條件。

    • 需要實現不同執行緒之間的通訊問題。

    • Java提供了幾個方法解決執行緒之間的通訊問題:

  • 管程法:

    • 生產者將生產者將生產好的資料放入緩衝區,消費者從緩衝區拿出資料。
    public class TestPC {
        public static void main(String[] args) {
            //建立緩衝區
            Container container = new Container();
            new Producer(container).start();
            new Consumer(container).start();
        }
    }
    
    class Chicken {
        public int id;
    
        public Chicken(int id) {
            this.id = id;
        }
    }
    
    class Producer extends Thread {
        private Container container;
    
        public Producer(Container container) {
            this.container = container;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                container.push(new Chicken(i), i);
            }
        }
    }
    
    class Consumer extends Thread {
        private Container container;
    
        public Consumer(Container container) {
            this.container = container;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                container.pop();
            }
        }
    }
    
    class Container {
        //大小為10的緩衝區
        private Chicken[] chickens = new Chicken[10];
        public int count = 0;     //緩衝區中放入的產品個數
        //生產方法
        public synchronized void push(Chicken chicken, int i) {
            //使用while適用於多個生產者和消費者
            while (chickens.length == count) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            chickens[count] = chicken;
            count++;
            //在生產者或消費者的run方法中獲取count會有執行緒不安全問題
            System.out.println("生產 " + i + " " + count);
    
            this.notifyAll();
        }
    
        public synchronized void pop() {
            while (count == 0) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            count--;
            Chicken chicken = chickens[count];
            System.out.println("消費 " + chicken.id + " " + count);
    
            this.notifyAll();
    
        }
    }
    
  • 訊號燈法:

    • 使用一個標誌位,為true則等待,為false則喚醒另外一個執行緒。
    public class TestPC2 {
        public static void main(String[] args) {
            TV tv = new TV();
            new Producer2(tv).start();
            new Consumer2(tv).start();
        }
    }
    
    class Producer2 extends Thread {
        TV tv;
    
        public Producer2(TV tv) {
            this.tv = tv;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 20; i++) {
                if(i%2==0) {
                    tv.play("節目1");
                } else {
                    tv.play("節目2");
                }
            }
        }
    }
    
    class Consumer2 extends Thread {
        TV tv;
    
        public Consumer2(TV tv) {
            this.tv = tv;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 20; i++) {
                tv.watch();
            }
        }
    }
    
    class TV {
        String program;
        boolean flag = true;    //T生產,F消費
    
        public synchronized void play(String program){
            if(!flag){
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            System.out.println("生產了"+program);
            this.notifyAll();
            this.program = program;
            this.flag = !this.flag;
        }
    
        public synchronized void watch(){
            if(flag){
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            System.out.println("消費了"+this.program);
            this.notifyAll();
            this.flag = !this.flag;
        }
    }
    
  • 執行緒池:

    • 經常建立和銷燬、使用量特別大的資源,比如併發情況下的執行緒,對效能影響很大。

    • 可以提前建立好多個執行緒,放入執行緒池中,使用時直接獲取,使用完放回池中。

    • 好處:

      • 提高了響應速度,減少了建立新執行緒的時間。
      • 降低了資源消耗。
      • 便於執行緒管理,包括核心池大小、最大執行緒數、沒有任務時多長時間終止等。
    • 執行緒池相關API:

      • ExecutorService:真正的執行緒池介面,常見子類ThreadPoolExecutor
        • void execute(Runnable command):執行任務。
        • void shutdown():關閉執行緒池。
      • Executors:工具類、執行緒池的工廠類,用於建立並返回不同型別的執行緒池。
      public class TestPool {
          public static void main(String[] args) {
              //引數為執行緒池大小
              ExecutorService executorService = Executors.newFixedThreadPool(3);
      
              executorService.execute(new MyThread());
              executorService.execute(new MyThread());
              executorService.execute(new MyThread());
      
              executorService.shutdown();
          }
      }
      
      class MyThread implements Runnable {
      
          @Override
          public void run() {
              System.out.println(Thread.currentThread().getName());
          }
      }
      

iwehdio的部落格園:https://www.cnblogs.com/iwehdio/