1. 程式人生 > 程式設計 >執行緒活躍性問題及其解決方案

執行緒活躍性問題及其解決方案

1.什麼是死鎖

死鎖發生於併發中,當兩個或更多執行緒互相持有對方的資源,但又不主動釋放,令兩個執行緒都無法前進,從而陷入無盡的等待之中的情況就是死鎖。

例如上圖兩個執行緒互相持有對方需要的鎖,但是都不肯釋放持有的鎖,從而陷入死鎖。

在多個執行緒的情況下,存在環形的依賴關係,這樣就有可能發生死鎖,例如上圖,Thread1持有鎖A想獲取鎖B,Thread2持有鎖B想獲取鎖C,Thread3持有鎖C想獲取鎖A,但是每個執行緒都不肯讓出自己持有的鎖,這樣就發生了死鎖。

2.死鎖的影響

死鎖在不同系統中的影響是不同的,這取決於系統對死鎖的處理能力

資料庫中可以對事物進行檢測和放棄,如果發生搶佔的情況可以指定某個事務放棄,這樣可以解決死鎖。但是在JVM中不具備自動處理的能力

死鎖發生的概率比較低,但是產生的危害比較大,在多執行緒併發情況下,影響的使用者比較多。

死鎖會導致系統整體崩潰,子系統崩潰,效能降低。並且壓力測試無法發現所有潛在的死鎖

3.發生死鎖的例子

3.1 看程式停止的訊號

/**
 * 必定發生死鎖的情況
 */
public class MustDeadLock implements Runnable {

    int flag = 1;

    static Object o1 = new Object();
    static Object o2 = new Object();

    @Override
    public void run
() { System.out.println("flag = " + flag); if (flag == 1){ synchronized (o1){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread1持有o1"
); synchronized (o2){ System.out.println("thread1持有o2"); } } } if (flag == 0){ synchronized (o2){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread2持有o2"); synchronized (o1){ System.out.println("thread2持有o1"); } } } } public static void main(String[] args) { MustDeadLock r1 = new MustDeadLock(); MustDeadLock r2 = new MustDeadLock(); r1.flag = 1; r2.flag = 0; Thread thread1 = new Thread(r1); Thread thread2 = new Thread(r2); thread1.start(); thread2.start(); } } 複製程式碼

3.2 銀行轉賬發生死鎖

前提條件:需要把鎖(將轉賬和被轉賬的執行緒鎖住,保證中間不被幹擾),在成功獲取兩把鎖的情況下,且餘額大於0,則扣除轉賬人,增加收款人的餘額,是原子操作。 順序相反導致死鎖

/**
 * 轉賬的時候遇到了死鎖,一旦開啟註釋,便會發生死鎖
 */
public class TransferMoney implements Runnable {

    int flag = 1;
    //a和b沒人都有500
    static Account a = new Account(500);
    static Account b = new Account(500);

    public static void main(String[] args) throws InterruptedException {
        TransferMoney r1 = new TransferMoney();
        TransferMoney r2 = new TransferMoney();

        r1.flag = 1;
        r2.flag = 0;

        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("a的餘額為:" + a.balance);
        System.out.println("b的餘額為:" + b.balance);
    }

    @Override
    public void run() {
        if (flag == 1){
            //a向b轉200
            transferMoney(a,b,200);
        }
        if (flag == 0){
            //b向a轉200

            transferMoney(b,a,200);
        }
    }

    public static void transferMoney(Account from,Account to,int amount){
        synchronized (from){
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (to){
                if (from.balance - amount < 0){
                    System.out.println("轉賬失敗,餘額不足");
                }

                from.balance -= amount;
                to.balance += amount;
                System.out.println("成功轉賬:" + amount + "元");
            }
        }
    }

    //賬戶
    static class Account{
        int balance;

        public Account(int balance) {
            this.balance = balance;
        }
    }
}
複製程式碼

如果註釋掉sleep()就不會發生死鎖,但是添加了sleep()在這500ms中就會發生死鎖。

3.3 模擬多人轉賬

/**
 * 多人轉賬情況下發生死鎖
 */
public class MultiTransferMoney {

    //50個賬戶
    private static final int NUM_ACCOUNTS = 50;
    //每個賬戶有1000元
    private static final int NUM_MONEYS = 1000;
    //轉賬次數
    private static final int NUM_ITERATIONS = 1000000;
    //操作賬戶的人數
    private static final int NUM_THREADS = 20;

    public static void main(String[] args) {
        Random random = new Random();
        TransferMoney.Account[] accounts = new TransferMoney.Account[NUM_ACCOUNTS];
        for (int i = 0; i < accounts.length; i++) {
            accounts[i] = new TransferMoney.Account(NUM_MONEYS);
        }

        class TransferThread extends Thread {
            @Override
            public void run() {
                for (int i = 0; i < NUM_ITERATIONS; i++) {
                    int fromAccount = random.nextInt(NUM_ACCOUNTS);
                    int toAccount = random.nextInt(NUM_ACCOUNTS);
                    int amount = random.nextInt(NUM_MONEYS);

                    TransferMoney.transferMoney(accounts[fromAccount],accounts[toAccount],amount);
                }
            }
        }

        for (int i = 0; i < NUM_THREADS; i++) {
            new TransferThread().start();
        }
    }
}
複製程式碼

死鎖發生的概率隨著賬戶數量的減少而增加

4.發生死鎖的4個必要條件(缺一不可)

  • 互斥:當thread1拿到lockA後,其他執行緒就無法獲取到lockA
  • 請求與保持:當thread1拿到lockA後,還一定要獲取到lockB
  • 不可剝奪:在資料庫中可以避免發生死鎖是因為資料庫自身可以剝奪某個事務,這樣就會避免死鎖,但是在Java中不可剝奪。
  • 迴圈等待:在兩個執行緒中兩個執行緒相互等待,在多個執行緒中每個執行緒首尾相接形成環路,也就是發生迴圈等待。

5.如何定位死鎖

5.1 ThreadMXBean程式碼演示

/**
 * 用ThreadMXBean檢測死鎖
 */
public class ThreadMXBeanDetection implements Runnable {

    int flag = 1;
    static Object o1 = new Object();
    static Object o2 = new Object();

    @Override
    public void run() {
        if (flag == 1){
            synchronized (o1){
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (o2){
                    System.out.println("t1持有o2");
                }
            }
        }

        if (flag == 0){
            synchronized (o2){
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (o1){
                    System.out.println("t2持有o1");
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadMXBeanDetection d1 = new ThreadMXBeanDetection();
        ThreadMXBeanDetection d2 = new ThreadMXBeanDetection();

        d1.flag = 1;
        d2.flag = 0;

        Thread t1 = new Thread(d1);
        Thread t2 = new Thread(d2);
        t1.start();
        t2.start();

        Thread.sleep(1000);
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
        if (deadlockedThreads != null && deadlockedThreads.length > 0){ //發現了死鎖
            for (int i = 0; i < deadlockedThreads.length; i++) {
                //通過死鎖的執行緒ID獲取執行緒資訊
                ThreadInfo threadInfo = threadMXBean.getThreadInfo(deadlockedThreads[i]);
                System.out.println("發現死鎖" + threadInfo.getThreadName());
            }
        }
    }
}
複製程式碼

發現死鎖後可以通過報警或者日誌的方式對死鎖進行修復。

6.如何修復死鎖

6.1 線上發生死鎖怎麼辦

對於線上問題一定要防患於未然,因為線上上想要沒有任何損失的修復死鎖幾乎是不可能的了。所以需要

  • 先將“案發現場”儲存下來然後立刻重啟伺服器。
  • 暫時保證線上服務的安全,然後利用剛才留下的資訊立刻定位死鎖,進行修復,然後重新發版。

6.2 修復死鎖的策略

  • 避免策略:哲學家就餐的換手方案、轉賬換序方案 思路:避免相反的獲取鎖的順序
  • 檢測與恢復策略:一段時間內檢查是否發生死鎖,如果發生死鎖,對資源進行剝奪,從而修復死鎖。
  • 鴕鳥策略:鴕鳥這種動物在遇到危險時會把頭埋在地上,這樣就看不到危險了。這也就是說在發生可能性低的時候可以暫時忽略掉死鎖,等到發生後進行人工修復。

6.2.1 避免策略的使用

/**
 * 轉賬的時候遇到了死鎖,一旦開啟註釋,便會發生死鎖
 */
public class TransferMoney implements Runnable {

    int flag = 1;
    //a和b沒人都有500
    static Account a = new Account(500);
    static Account b = new Account(500);
    static Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        TransferMoney r1 = new TransferMoney();
        TransferMoney r2 = new TransferMoney();
        
        r1.flag = 1;
        r2.flag = 0;
        
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("a的餘額為:" + a.balance);
        System.out.println("b的餘額為:" + b.balance);
    }

    @Override
    public void run() {
        if (flag == 1){
            //a向b轉200
            transferMoney(a,200);
        }
        if (flag == 0){
            //b向a轉200
            transferMoney(b,int amount){

        class Helper{
            public void transfer(){
                if (from.balance - amount < 0){
                    System.out.println("轉賬失敗,餘額不足");
                }

                from.balance -= amount;
                to.balance += amount;
                System.out.println("成功轉賬:" + amount + "元");
            }
        }
        //獲取轉入和轉出的hash值
        int fromHash = System.identityHashCode(from);
        int toHash = System.identityHashCode(to);
        if (fromHash < toHash){ //通過hash值保證了獲取鎖的順序
            synchronized (from) {
                synchronized (to) {
                    new Helper().transfer();
                }
            }
        } else if (fromHash > toHash){
            synchronized (to) {
                synchronized (from) {
                    new Helper().transfer();
                }
            }
        } else {    //fromHash == toHash
            synchronized (lock){    //誰先拿到lock誰就先執行
                synchronized (to) {
                    synchronized (from) {
                        new Helper().transfer();
                    }
                }
            }
        }
    }

    //賬戶
    static class Account{
        int balance;

        public Account(int balance) {
            this.balance = balance;
        }
    }
}
複製程式碼

通過修改transfer方法計算轉出和轉入的hash值,通過hash值作比較來設定鎖的獲取順序,這樣可以避免死鎖的發生。

7.哲學家就餐問題

7.1 什麼是哲學家就餐問題

每個哲學家吃飯需要先拿起左手(或右手的叉子)再拿起右手(或左手)的叉子才可以吃飯,等待自己用完了放回原處,叉子再供另外的人使用(暫時不考慮衛生問題:))。

死鎖:如果每個人都同時拿起了左邊的叉子,這樣就無法拿到右手邊的叉子,這樣就造成了等待的問題

/**
 * 描述:     演示哲學家就餐問題導致的死鎖
 */
public class DiningPhilosophers {

    public static class Philosophers implements Runnable {

        private Object leftChopstick;
        private Object rightChopstick;

        public Philosophers(Object leftChopstick,Object rightChopstick) {
            this.leftChopstick = leftChopstick;
            this.rightChopstick = rightChopstick;
        }

        @Override
        public void run() {
            try {
                while (true) {

                    doAction("Thinking");

                    synchronized (leftChopstick) {
                        doAction("Picked up left chopstick");
                        synchronized (rightChopstick) {
                            doAction("Picked up right chopstick -eating");

                            doAction("Put down right chopstick");
                        }
                        doAction("Put down left chopstick");
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void doAction(String action) throws InterruptedException {
            System.out.println(Thread.currentThread().getName() + " " + action);
            Thread.sleep((long) (Math.random() * 10));
        }
    }

    public static void main(String[] args) {
        //設定哲學家的人數
        Philosophers[] philosophers = new Philosophers[5];
        //設定筷子的數量,數量與哲學家人數相同
        Object[] chopsticks = new Object[philosophers.length];
        for (int i = 0; i < chopsticks.length; i++) {
            chopsticks[i] = new Object();
        }
        for (int i = 0; i < philosophers.length; i++) {
            Object leftChopstick = chopsticks[i];
            Object rightChopstick = chopsticks[(i+1)%chopsticks.length];
            philosophers[i] = new Philosophers(leftChopstick,rightChopstick);
            new Thread(philosophers[i],"哲學家"+(i+1)+"號").start();
        }
    }
}
複製程式碼

導致死鎖的一大特徵就是:每個哲學家的左手都拿著筷子,右手無法獲取筷子

7.2 解決哲學家死鎖問題的4種辦法

  • 服務員檢查(避免策略)
  • 改變哲學家拿叉子的順序(避免策略)
  • 餐票(將餐票數量設定為人數-1)(避免策略)
  • 領導調節(檢測與恢復策略)

7.2.1 實現換手策略

public static void main(String[] args) {
        //設定哲學家的人數
        Philosophers[] philosophers = new Philosophers[5];
        //設定筷子的數量,數量與哲學家人數相同
        Object[] chopsticks = new Object[philosophers.length];
        for (int i = 0; i < chopsticks.length; i++) {
            chopsticks[i] = new Object();
        }
        for (int i = 0; i < philosophers.length; i++) {
            Object leftChopstick = chopsticks[i];
            Object rightChopstick = chopsticks[(i + 1) % chopsticks.length];

            if (i == philosophers.length - 1) { //讓這個哲學家換手,避免形成環路
                philosophers[i] = new Philosophers(rightChopstick,leftChopstick);
            }else{
                philosophers[i] = new Philosophers(leftChopstick,rightChopstick);
            }

            new Thread(philosophers[i],"哲學家" + (i + 1) + "號").start();
        }
    }
複製程式碼

7.2.2 死鎖檢測與恢復策略

檢測演演算法:鎖的呼叫鏈路圖

  • 允許發生死鎖
  • 每次呼叫鎖都記錄
  • 定期檢查鎖的呼叫鏈路圖是否形成環路
  • 一旦發生死鎖,就用死鎖恢復機制進行恢復

恢復方法1:程式終止

逐個終止執行緒,直到死鎖消除

終止順序:

  • 1.優先順序(是前臺互動還是後臺處理)
  • 2.已佔用資源,還需要的資源
  • 3.已經執行的時間

恢復方法2:資源搶佔

把每個分發出去的鎖收回來

讓執行緒回退幾步,這樣就不用結束整個執行緒,成本比較低,但是這樣可能會造成資源一直被搶佔,造成飢餓

8.避免死鎖的有效手段

8.1 設定超時時間(退一步海闊天空)

Lock的tryLock(long timeout,TimeUnit unit)

synchronized不具備嘗試鎖的能力

/**
 * 描述:     用tryLock來避免死鎖
 */
public class TryLockDeadLock implements Runnable {

    int flag;
    static Lock lock1 = new ReentrantLock();
    static Lock lock2 = new ReentrantLock();

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            if (flag == 1) {
                try {
                    if (lock1.tryLock(800,TimeUnit.MILLISECONDS)) {
                        System.out.println("執行緒1獲取到了鎖1");
                        Thread.sleep(new Random().nextInt(1000));
                        if (lock2.tryLock(800,TimeUnit.MILLISECONDS)) {
                            System.out.println("執行緒1獲取到了鎖2");
                            System.out.println("執行緒1成功獲取到兩把鎖");

                            lock2.unlock();
                            lock1.unlock();
                            break;
                        } else {
                            System.out.println("執行緒1嘗試獲取鎖2失敗,已重試");
                            lock1.unlock();

                            Thread.sleep(new Random().nextInt(1000));
                        }
                    } else {
                        System.out.println("執行緒1獲取鎖1已失敗,已重試");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (flag == 0) {
                try {
                    if (lock2.tryLock(3000,TimeUnit.MILLISECONDS)) {
                        System.out.println("執行緒2獲取到了鎖2");
                        Thread.sleep(new Random().nextInt(1000));
                        if (lock1.tryLock(3000,TimeUnit.MILLISECONDS)) {
                            System.out.println("執行緒2獲取到了鎖1");
                            System.out.println("執行緒2成功獲取到兩把鎖");

                            lock1.unlock();
                            lock2.unlock();
                            break;
                        } else {
                            System.out.println("執行緒2嘗試獲取鎖1失敗,已重試");
                            lock2.unlock();

                            Thread.sleep(new Random().nextInt(1000));
                        }
                    } else {
                        System.out.println("執行緒2獲取鎖2已失敗,已重試");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        TryLockDeadLock r1 = new TryLockDeadLock();
        TryLockDeadLock r2 = new TryLockDeadLock();
        r1.flag = 1;
        r2.flag = 0;
        new Thread(r1).start();
        new Thread(r2).start();
    }
}
複製程式碼

8.2 多使用併發類而不是自己設計鎖

ConcurrentHashMap、ConcurrentLinkedQueue、AtomicBoolean等

實際使用時java.util.concurrent.atomic十分有用,簡單方便且效率比使用Lock高

多用併發集合少用同步集合,併發集合比同步集合擴充套件性更好

併發場景需要用到map,首先想到用ConcurrentHashMap

8.3 降低使用鎖的粒度,避免使用同一個鎖

保護的範圍大,效率低,容易發生死鎖

8.4 儘量使用同步程式碼塊而非同步方法

使用同步程式碼塊相對於同步方法,縮小了保護的範圍,增加了對物件的控制權,降低發生死鎖的風險。

8.5 給執行緒起個有意義的名字

8.6 避免鎖的巢狀

例如上面的MustDeadLock類

8.7 分配資源前先看能不能收回來

例如:銀行家演演算法

8.8 專鎖專用

儘量不要多個功能使用同一把鎖

9.其他活性問題

死鎖是最常見的活躍性問題,不過除了剛才的死鎖之外,還有一些類似的問題,會導致程式無法順利執行,統稱為活躍性問題。

9.1 活鎖

9.1.1 什麼是活鎖

再回到前面的哲學家就餐問題,發生死鎖是因為每個哲學家都是先拿到左手的餐具,永遠在等待右手邊的餐具(或者相反),這樣就會發生死鎖。

活鎖相對與死鎖更加智慧一點,這些哲學家同時進入餐廳,同時拿起左邊的餐具然後會等待5分鐘,然後放下餐具,再等5分鐘,又同時拿起餐具,這樣也會導致每個哲學家無法吃飯。

換成程式中的話就是說:程式一直在執行,但是屬於無用功,白白浪費資源

9.1.2 活鎖的出現

/**
 * 描述:     演示活鎖問題
 */
public class LiveLock {

    static class Spoon{
        private Diner onwer;

        public Spoon(Diner onwer) {
            this.onwer = onwer;
        }

        public Diner getOnwer() {
            return onwer;
        }

        public void setOnwer(Diner onwer) {
            this.onwer = onwer;
        }

        public synchronized void use(){
            System.out.printf("%s has eaten!",onwer.name);
        }
    }

    static class Diner{
        private String name;
        private boolean isHungry;

        public Diner(String name) {
            this.name = name;
            this.isHungry = true;
        }

        public void earWith(Spoon spoon,Diner spouse){
            while (isHungry){
                //自己沒有拿到勺子
                if (spoon.onwer != this){
                    //等待伴侶吃飯
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    continue;
                }

                //如果伴侶是飢餓的
                if (spouse.isHungry){
                    System.out.println(name + ": 親愛的" + spouse.name +"你先吃吧");
                    //將勺子給伴侶
                    spoon.setOnwer(spouse);
                    continue;
                }

                //我可以吃飯了
                spoon.use();
                //吃完了改變hungry的狀態
                isHungry = false;
                System.out.println(name + ": 我吃完了");
                //將勺子給伴侶
                spoon.setOnwer(spouse);
            }
        }
    }

    public static void main(String[] args) {

        Diner husband = new Diner("牛郎");
        Diner wife = new Diner("織女");

        Spoon spoon = new Spoon(husband);
        new Thread(new Runnable() {
            @Override
            public void run() {
                husband.earWith(spoon,wife);
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                wife.earWith(spoon,husband);
            }
        }).start();

    }
}
複製程式碼

9.1.3 如何解決活鎖問題

出現活鎖的原因:重試機制不變,訊息佇列始終重試,吃飯始終謙讓

解決: 加入隨機因素

9.1.4 程式碼演示

/**
 * 描述:     演示活鎖問題
 */
public class LiveLock {

    static class Spoon{
        private Diner onwer;

        public Spoon(Diner onwer) {
            this.onwer = onwer;
        }

        public Diner getOnwer() {
            return onwer;
        }

        public void setOnwer(Diner onwer) {
            this.onwer = onwer;
        }

        public synchronized void use(){
            System.out.printf("%s has eaten!",Diner spouse){
            while (isHungry){
                //自己沒有拿到勺子
                if (spoon.onwer != this){
                    //等待伴侶吃飯
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    continue;
                }

                Random random = new Random();
                //如果伴侶是飢餓的
                if (spouse.isHungry && random.nextInt(10) < 9){ //降低給勺子的機率
                    System.out.println(name + ": 親愛的" + spouse.name +"你先吃吧");
                    //將勺子給伴侶
                    spoon.setOnwer(spouse);
                    continue;
                }

                //我可以吃飯了
                spoon.use();
                //吃完了改變hungry的狀態
                isHungry = false;
                System.out.println(name + ": 我吃完了");
                //將勺子給伴侶
                spoon.setOnwer(spouse);
            }
        }
    }

    public static void main(String[] args) {

        Diner husband = new Diner("牛郎");
        Diner wife = new Diner("織女");

        Spoon spoon = new Spoon(husband);
        new Thread(new Runnable() {
            @Override
            public void run() {
                husband.earWith(spoon,husband);
            }
        }).start();

    }
}
複製程式碼

9.2 飢餓

當執行緒需要某些資源(例如CPU),卻始終得不到

飢餓的原因

  • 當某個執行緒的執行優先順序過低,始終得不到CPU資源
  • 某個執行緒一直持有鎖,卻從不釋放鎖
  • 某程式始終佔用某檔案的寫鎖。

飢餓的危害

飢餓可能會導致響應性變差:例如一個執行緒負責前臺的響應,另一條執行緒負責後臺的資料處理,但是由於前臺執行緒優先順序比較低始終得不到執行,這樣會導致使用者體驗變差。