1. 程式人生 > >elasticsearch原始碼分析之Transport(五)

elasticsearch原始碼分析之Transport(五)

一、基本介紹

1.1概念介紹

transport模組是es通訊的基礎模組,在elasticsearch中用的很廣泛,比如叢集node之間的通訊、資料的傳輸、transport client方式的資料傳送等等,只要數和通訊、資料傳輸相關的都離不開transport模組的作用。

transport模組分為LocalTransport和NettyTransport兩種,在TransportModule中註冊中可以通過node是local還是network的來判別使用哪一種transport,可以通過配置node.mode來決定,bind邏輯如下:

 

預設而且通常我們使用的實現類是NettyTransport,描述資訊如下:


NettyTransport分為四種類型的連線,分別是:

  • recovery:做資料恢復recovery,默認個數2個;
  • bulk:用於bulk請求,默認個數3個;
  • med/reg:典型的搜尋和單doc索引,默認個數6個;
  • high:如叢集state的傳送等,默認個數1個;
  • ping:就是node之間的ping咯。默認個數1個;

其中recovery和bulk之前版本是同一個的,叫做low,表示大資料量的傳輸,它們可能會導致通常的請求(如search或是單資料索引)耗時加長;

1.2配置資訊

1.2.1workerCount

workerCount表示transport的總共的worker數目,由transport.netty.worker_count

來配置,預設值是32和Runtime.getRuntime().availableProcessors()中的最小值,也就是不能超過32,為什麼會有這個限制呢?是因為在elasticsearch的github上有人提了個issues/3478,當使用core很多的機器的時候(比如48core),會建立太多的記憶體從而導致OOM,所以設定了32的上限來避免太多執行緒給系統產生壓力。

建立邏輯


建立serverBootstap


建立clientBootstap


1.2.2connetion number

1.1中介紹的各種連線數初始化


1.3.2defaultReceiverPredictor

Netty是nio的,在Netty中通過ReceiveBufferSizePredictor根據上次訊息的大小來決定預測本次訊息所需的快取大小。

從channel讀取資料到快取到,並向上行流通知訊息接收事件。預設值計算邏輯:


JvmInfo.jvmInfo().getMem().getDirectMemoryMax(),最終呼叫的就是jdk中sun.misc.VM類裡面的directMemory ,可以通過 -XX:MaxDirectMemorySize來配置的,預設是64M.

二、建立連線

2.1啟動服務

因為NettyTransport繼承了AbstractLifecycleComponent,實現了doStart()方法。在node、transportclient啟動的時候呼叫了


最終呼叫了doStart()方法,在doStart中會根據配置啟動一個client和一個server,分別是ClientBootstrapServerBootstrap(都是Netty中的),因為節點之間要相互通訊的,所以client和sever都要啟動。並分別註冊各自的PipelineFactory,在PipelineFactory中建立各自的channelPipeline,其中註冊了訊息處理的方式。

客戶端


服務端


2.2連線節點

在啟動Discovery服務後,會發現新節點,發現之後開始進行連線工作。在UnicastZenPing中會呼叫transportService.connectToNodeLight(finalNodeToSend)進行連線。

實質就是建立Netty中的Channel,一個連線就是第一個Channel,根據之前所說的會有多個型別的連線會建立:

nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], 

new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], 
new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);


上面連線建立完畢,供後續資料傳輸使用。

三、傳送資料

3.1獲取channel

sendRequest方法會傳入一個引數options,是一個TransportRequestOptions的例項,包含三個屬性:timeout(超時時間)、compress(是否壓縮)、type(傳送的型別,即上面說的五個之一)。

根據需要傳送資料的節點和傳送的型別(上面五大型別)獲得到對應的channel



3.1資料寫入

之後資料該壓縮的壓縮(壓縮方法在CompressorFactory中實現),並寫入version和action;

寫入request資訊,通過ChannelBuffers創建出buffer;


3.3資料傳送

最後通過Netty中的targetChannel.write(buffer),將資料傳送。


四、接收資料

4.1處理類註冊

參照2.1啟動服務中的channelPipeline註冊,統一的handler為MessageChannelHandler,負責訊息接受及處理邏輯,在其他模組中會對不同的訊息(Action)註冊對應的處理程式(handler)。

4.2訊息處理

在對收到的內容進行解析的過程中獲取到action,找到對應的handler進行處理;訊息處理則需要messageReceived,裡面會有對於request和response分別有相應的handler來處理:handleRequest、handleResponse。