1. 程式人生 > 實用技巧 >基於專案內部的MQ手寫連線池

基於專案內部的MQ手寫連線池

最近,專案收到中介軟體團隊的報告,我們的應用連線他們的中介軟體(專案內部的MQ)連線數太大了,要求我們做一些調整。然後看了下我們的程式碼,發現我們接收和傳送MQ訊息的方式是每次新建一個連線然後關閉連線(詢問了之前的同事,目前因為某些原因現在只能採取這種方式傳送訊息),但是每連線一次都會new一個物件出來,感覺挺佔用資源的,而且萬一沒有及時釋放掉資源,一直佔用連線資源就對服務端壓力也很大,討論了一番決定採用連線池方式,這樣我們可以自己控制連線數量。

我們基於【2W1h】方式來討論連線池:什麼是連線池(what)?我們內部專案的MQ為什麼需要連線池(why)?怎麼樣做一個基於我們內的的MQ做連線池 (how)?

what: 什麼是連線池?

深入思考連線池的本質,但不要思考的過於複雜~~

連線: 是網路中用於傳輸資料的通道; ”連線“才是我們要真正去使用的物件,“池”是用來管理多個連線的一種方式。

池: 是一種容器的概念,做儲存的。在程式設計中我們往往使用陣列,連結串列,佇列,Map來表示。
所以“連線池”中的“連線”肯定是已經建立的好的長連線,比如tcp連線,websocket連線等,即取即用,用完放回。

跟據下游型別,我們常見的有資料庫連線池,快取連線池,服務連線池。在程式設計中,我們還會經常碰到程序池,執行緒池,協程池,記憶體池,物件池等。

why:我們內部專案的MQ為什麼需要連線池?

其實開頭的第一句話已經回答了這個問題。

連線池除了能非常方便的對連線進行管理外,而且在高吞吐的連線池大大提高資料的傳輸的效率。提高效率主要在於下面兩個方面:

1.避免反覆的三次握手和四次握手

長連線的建立需要進行三次握手,而連線的釋放需要進行四次握手,這是發生在系統層面的兩個動作,對於單條連線來說耗時微乎其微,
但來高吞吐場景時,耗時則不能忽略。所以連線池的及取即用和用完放回的特性,避免了大量三次握手和四次握手的無效耗時,
從而節省系統資源。

2. 增加並行車道,實現全雙工並行

資料通訊包括單工,半雙工和全雙工。單工通訊如下圖,資料只能從A到B,不符合訪問下游服務的場景。

半雙工通訊如下圖,資料可以從A到B,也可以從B到A,但是同一時刻只能一個方向上的資料傳輸,通道利用率是50%。

全雙工通訊如下圖,可同時存在從A到B和從B到A的資料傳輸,通道利用率是100%。長連線就是全雙工通訊。

在IO密集型的網際網路應用中,一條全雙工通訊通道仍然無法滿足資料吞吐的需求時,該如何解決?在網際網路效能測試指標中有個這樣一個公式:QPS(吞吐量)= 併發數/平均響應時間,在平均相響應時間不變的情況下,適度增加併發數可以提升吞吐量;所以採用多條雙全工通訊的方式可以在一定程度上提高吞吐量,而連線池就是最好的實現方式。

總結一下:為什麼需要連線池?

1.方便管理連線
2.避免反覆的三次握手和四次握手
3.更好的實現雙全工並行

how:怎麼樣做一個基於我們內的的MQ做連線池?

實現一個連線池,最關鍵的是均衡和保活,如下圖:

我們專案組實現方式的思路如下:

1.MqClient類,初始化連線池,設定初始連線數量,最大連線數量,獲取資源,釋放資源,虛擬碼如下:

MqClient {

 ConcurrentHashMap<String queueKey,ArrayBlockingQueue<Mqconnection> queue> queueMap;
 long initConnect = 5;
 long maxConnect = 10;
 AtomicLong capacity;
 
 
 init(){
    for(){
    // 初始化5個連線
    mqConnecions.offer(mqConnecion);
    capacity.getAndIncrement();
    }        
 }
 
 get(String queueKey){    
    // 根據queKey獲取queue 
     
     // 獲取mqCoonnection    
     if(mqConnection.peek()){
        return mqConnection.poll();
     }
     // 佇列沒有到最大值繼續建立連線
     if(capacity.get() < maxConnect){
         // 繼續建立新的連線,並塞入佇列
     }
     // 遞迴重試get()    
 }
 
 free(String queueKey, Mqconnection connection){
     // 如果可用並且佇列大小沒有超過最大連線值塞入佇列,否則主動斷開連線並丟棄
 }    
}

2.MqClientHelper(輔助類,負責監聽和保活)

MqClientHelper {
    MqClient mqclient;
    
    listenMqAcLog(){
     // 監聽mqx相關的日誌    
    }
    
    // 傳送心跳
    sendHeartbeat(){
        
    }
}

當然上面我們專案的第一版的方案的還是比較的粗糙的,還有很多地方不完善,比如有些方法需要加鎖操作等,而且通常我們的連線池會連線下游多個節點。如下圖所示:

一般來說,相對比較完善的連線池,有如下幾個特性:

1.高可用:下游任意一個server宕機時,連線池關閉相關無效連線,防止被client訪問;
2.可擴充套件:下游增加一個server節點時,連線池會發現並建立到新server節點的連線,供client訪問;
3.負載均衡:連線池會根據下游server的服務能力的高低分配資料請求;
4.中介軟體:當下遊server是類似MYSQL資料庫並分片時,連線池會將請求打在相應的資料節點上,並對資料進行聚合;