1. 程式人生 > 其它 >mysql-DuplicateUpdate和java的threadpool的"死鎖"

mysql-DuplicateUpdate和java的threadpool的"死鎖"

大家千萬不要被文章的標題給迷惑了,他兩在本篇文章是沒有關係的, 今天給大家講講最近2個有意思的issue,分享一下我學到的

  • mysql DuplicateUpdate的用法要注意的點
  • java的threadpool使用不當會造成“死鎖”問題

mysql DuplicateUpdate的用法要注意的點

有個issue說遇到了一個這樣的問題,

這個朋友使用我開源的job排程框架 https://github.com/yuzd/Hangfire.HttpJob

儲存用的是mysql,採用的實現是 https://github.com/arnoldasgudas/Hangfire.MySqlStorage

set表的id是自增主鍵,正常理解 都是慢慢自增上去的,但是發現是大幅度跳躍式的自增, 真相是什麼?

首先針對這個問題,首先我們搞清楚在hangfire中和storage相關的部分如下

image
  • hangfire server排程依賴storage
  • storage抽象出來一層api(解耦)
  • 第三方擴充套件(不關心具體的storage實現)
  • 不同的storage具體實現(比如mysql,sqlserver等)

Hangfire.Httpjob其實只是依賴了storage api那一層,也沒有能力去直接寫sql去執行, 只能用api去操作hangfire的那幾張表(比如set表)

那麼問題肯定不是在擴充套件層,而是得去看看mysqlstorage的實現原始碼,針對set表的處理邏輯

https://github.com/arnoldasgudas/Hangfire.MySqlStorage/blob/0bd1016f715c8c6617ce22fb7b2ce5b6c328d2fb/Hangfire.MySql/MySqlWriteOnlyTransaction.cs#L155


  public override void AddToSet(string key, string value, double score)
    {
        Logger.TraceFormat("AddToSet key={0} value={1}", key, value);

        AcquireSetLock();
        QueueCommand(x => x.Execute(
            $"INSERT INTO `{_storageOptions.TablesPrefix}Set` (`Key`, `Value`, `Score`) " +
            "VALUES (@Key, @Value, @Score) " +
            "ON DUPLICATE KEY UPDATE `Score` = @Score",
            new { key, value, score }));
    }

這裡是用了ON DUPLICATE KEY UPDATE 的語句

這個語法是在mysql 4.1(2005)引入的,意思是 insert的時候遇到主鍵已存在 就執行後面 的update

但是就是這個功能 會造成自增主鍵成跳躍式增長,增長跨度和SQL的執行次數成正比

根據朋友提供的截圖

image

雖說是會跳躍,但是這個增長也太誇張了

打上斷點除錯發現

是hangfire server 不斷的在呼叫,目的是把下一次執行時間(秒級別的時間戳)寫到set表中

image
image
image

打上日誌可以看到有非常多相同值的呼叫,這僅僅是一個job,這個自增速度得再乘以job的個數,難怪了

既然找到原因了,就提個PR 修改下


 public override void AddToSet(string key, string value, double score)
        {
            Logger.TraceFormat("AddToSet key={0} value={1}", key, value);
        
            AcquireSetLock();
            QueueCommand(x =>
            {
                var sql = "";
                if (key == "recurring-jobs") // 只發現這個key存在這個問題
                {
                     // key+value是uniq 改成先update 如果沒有成功 再insert
                    sql = $"UPDATE `{_storageOptions.TablesPrefix}Set` set `Score` = @score where `Key` = @key and `Value` = @value";
                    var updateRt = x.Execute(sql, new { score = score, key = key, value = value });
                    if (updateRt < 1)
                    {
                        sql = $"INSERT INTO `{_storageOptions.TablesPrefix}Set` (`Key`, `Value`, `Score`) " +
                              "VALUES (@Key, @Value, @Score) ";
                        x.Execute(
                            sql,
                            new { key, value, score });
                    }
                }
                else
                {
                    sql = $"INSERT INTO `{_storageOptions.TablesPrefix}Set` (`Key`, `Value`, `Score`) " +
                          "VALUES (@Key, @Value, @Score) " +
                          "ON DUPLICATE KEY UPDATE `Score` = @Score";
                   x.Execute(
                       sql,
                       new { key, value, score });
                }
        
                //Console.WriteLine(sql + " ==> " + key + "@" + value + "@" + score);
            });
        }

改完之後測試,id自增一切正常:

image

java的threadpool使用不當會造成“死鎖”問題

image

這個原因先說出來: threadpool的執行緒被佔用完後,再來的task會往queue裡面丟,如果這個時候在這個pool的執行緒裡面 future.get()的話會導致task runner(執行器)被堵住,沒人從佇列裡面取任務了~

(簡單來說就是 執行緒在wait future返回,而這個future在queue裡面苦苦等待新釋放的執行緒去執行,就像死鎖一樣,我在等你的結果,而結果在等待著被執行)

好傢伙,這個場景有點熟悉,因為我在專案中也用過Future.get()// 雖說有設定timeout

但是這個問題的重要一點是,這種花式“死鎖” jvm是檢測不出來的,下面有測試

模擬一下這個場景:

我搞了2個執行緒池,分別是nio執行緒池和業務執行緒池,模擬併發20個請求, 注意看process2方法裡的註釋,如果去掉那裡的程式碼的話 就不會有這個死鎖問題


/**
 * @author yuzd
 */
public class PoolTest {

    // 模擬nio執行緒池
    static ThreadPoolExecutor nioExecutor = new ThreadPoolExecutor(20, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
            new CustomerNamedThreadFactory("nio", false),
            new ThreadPoolExecutor.AbortPolicy());

    // 業務執行緒池
    static ThreadPoolExecutor buExecutor = new ThreadPoolExecutor(20, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
            new CustomerNamedThreadFactory("bu", true),
            new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {

        // 模擬是http請求併發20個
        IntStream.rangeClosed(1, 20).parallel().forEach((index) -> {
            // 交給nio執行緒池處理            
            nioExecutor.execute(() -> {
                try {
                    httpHandler(index);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        });
    }


   
    static void httpHandler(int index) throws ExecutionException, InterruptedException {
        System.out.println(Thread.currentThread().getName() + " request index :" + index + " staring");
        // 交給業務執行緒池處理     
       
        Future<String> parentFuture = buExecutor.submit(() -> process1(index));
        String p1Rt = parentFuture.get();  // nio執行緒在wait
        System.out.println(Thread.currentThread().getName() + " request index :" + p1Rt + " ending");
    }

    // future1
    static String process1(int index) throws ExecutionException, InterruptedException {
        System.out.println( Thread.currentThread().getName() + " process1 index :" + index + " staring");
        Future<String> childFuture = buExecutor.submit(() -> process2(index));
        String p2Rt = childFuture.get();  // 這裡是bu執行緒在wait   這裡會發生死鎖
        
        System.out.println(Thread.currentThread().getName() + " process1 index :" + index + " ending");
        return p2Rt;
    }

    // future2
    static String process2(int index) throws InterruptedException, ExecutionException {
        System.out.println(Thread.currentThread().getName() + " process2 index :" + index + " staring");
        // 加上就會死鎖 
        // 只要不一下子產生足夠數量的task(把core全部佔掉)就不會死鎖 加了這裡就會把core全部佔據 導致task進入到queue,core執行緒在wait future.get 無法被釋放, 而queue的任務在等待它釋放產生新的執行緒
        Future<String> submit = buExecutor.submit(() -> {
            try {
                Thread.sleep(1000);
                return String.valueOf(index);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        submit.get(); 
        System.out.println(Thread.currentThread().getName() + " process2 index :" + index + " ending");
        return String.valueOf(index);
    }
}

用visualvm分析執行緒dump,很難直接發現有異常,非同步的很難檢測,排查起來比較複雜,只看到是在wait

image

用jstack沒有發現deadlock

image

在實際專案中我也看到過一個專案中共用一個執行緒池,執行緒池被封裝成一個util方法,要執行非同步的都用它,這個場景尤其要注意這個場景,也建議大家用帶有超時的方式 Future.get(xxxx)