1. 程式人生 > >Hadoop RPC分析(一) -- Client

Hadoop RPC分析(一) -- Client

[Hadoop RPC呼叫入口] 在使用Hadoop RPC基本框架中,主要是通過getProxy來獲得一個客戶端代理物件,通過這個物件來向服務端傳送RPC請求。 getProxy有多個過載方法,最終都是呼叫到了下面這個函式來進行實現 (來自org.apache.hadoop.ipc.RPCpublicstatic<T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol, longclientVersion,                                 InetSocketAddress addr,
                                UserGroupInformation ticket,                                 Configuration conf,                                 SocketFactory factory, intrpcTimeout,                                 RetryPolicy connectionRetryPolicy)throwsIOException {    if(UserGroupInformation.isSecurityEnabled()) {
      SaslRpcServer.init(conf);     } returngetProtocolEngine(protocol,conf).getProxy(protocol, clientVersion,         addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy);   } 而在服務端,通過build方法,來構建一個Server物件 (來自 org.apache.hadoop.ipc.RPC.Builder     /**      * Build the RPC Server.      *
@throwsIOException on error      *@throwsHadoopIllegalArgumentException when mandatory fields are not set      */ publicServerbuild()throwsIOException, HadoopIllegalArgumentException { if(this.conf == null) { thrownewHadoopIllegalArgumentException("conf is not set");       } if(this.protocol == null) { thrownewHadoopIllegalArgumentException("protocol is not set");       } if(this.instance == null) { thrownewHadoopIllegalArgumentException("instance is not set");       } returngetProtocolEngine(this.protocol,this.conf).getServer( this.protocol,this.instance,this.bindAddress,this.port, this.numHandlers,this.numReaders,this.queueSizePerHandler, this.verbose,this.conf,this.secretManager,this.portRangeConfig);     } 通過上面的兩個入口,分別在客戶端和服務端生成了進行遠端呼叫所需要的物件。 上面的getProtocolEngine,是獲取一個RPC引擎,預設使用的是WritableRpcEngine(新版本貌似改成了ProtobufRpcEngine?),這裡使用WritableRpcEngine來進行原始碼追蹤。 下面簡述追蹤路徑: 客戶端:WritableRpcEngine.getProxy() ---> Invoker ---> Client 使用了jdk的動態代理,Invoker實現了InvocationHandler介面,其invoke方法的實現,就是通過呼叫Client的call方法實現的,程式碼如下 @Override publicObjectinvoke(Object proxy, Method method, Object[] args) throwsThrowable { longstartTime = 0; if(LOG.isDebugEnabled()) {         startTime = Time.now();       }       ObjectWritable value = (ObjectWritable) client.call(RPC.RpcKind.RPC_WRITABLE,newInvocation(method, args),remoteId); if(LOG.isDebugEnabled()) { longcallTime = Time.now() - startTime; LOG.debug("Call: " + method.getName() +" "+ callTime);       } 因此,我們對於客戶端的理解,將主要集中在Client類上。 服務端:WritableRpcEngine.getServer() ---> Server new操作生成了一個Server物件,因此我們對於服務端的理解,將主要集中在Server類上。 [Hadoop RPC客戶端:Client] 客戶端的思路可以簡述為:將呼叫的方法資訊通過網路傳送到服務端,並等待服務端的返回。所以本質上,RPC就是對一般的網路訪問做了封裝,造成了類似本地呼叫的假象。 這裡我們將主要關注客戶端的一次RPC是什麼樣的流程,並希望能找到對應的實現程式碼。 與Client相關的類主要為下面幾個(都是Client的內部類) Client.Connection -------- 一個Connection物件表示一個和服務端之間的連線通道,它提供了和具體呼叫業務無關的底層通道資訊,作為一個基礎工具存在 Client.Call -------- 一個Call表示一次遠端過程呼叫,它裡面包含了本次遠端過程呼叫的請求資訊,呼叫結果返回等資訊,作為遠端過程呼叫業務存在。 由於實現了底層通道和具體的呼叫業務無關,多個呼叫業務可以複用同一個底層通道,在Connection內部會維護多個當前存在的呼叫業務。 通道本身是業務無關的,客戶端和服務端之間是可以存在多條並行的通道的,在Client內部會有一個Connection的執行緒池。 首先來看Client的屬性 (來自org.apache.hadoop.ipc.Client  /** A counter for generating call IDs. */ privatestaticfinalAtomicIntegercallIdCounter=newAtomicInteger(); privatestaticfinalThreadLocal<Integer>callId=newThreadLocal<Integer>(); privatestaticfinalThreadLocal<Integer>retryCount=newThreadLocal<Integer>();  private Hashtable<ConnectionId, Connection>connections=newHashtable<ConnectionId, Connection>();   privateClass<?extendsWritable>valueClass;  // class of call values privateAtomicBooleanrunning=newAtomicBoolean(true);// if client runs finalprivateConfigurationconf; privateSocketFactorysocketFactory;          // how to create sockets privateintrefCount= 1; privatefinalintconnectionTimeout; privatefinalbooleanfallbackAllowed; privatefinalbyte[]clientId; finalstaticintCONNECTION_CONTEXT_CALL_ID= -3; 可以看到,在Client中是存在多個與服務端的連線物件的。 再看下Connection的屬性 (來自 org.apache.hadoop.ipc.Client.Connection     privateInetSocketAddressserver;            // server ip:port privatefinalConnectionIdremoteId;               // connection id privateAuthMethodauthMethod;// authentication method privateAuthProtocolauthProtocol; privateintserviceClass; privateSaslRpcClientsaslRpcClient; privateSocketsocket=null;                // connected socket privateDataInputStreamin; privateDataOutputStreamout; privateintrpcTimeout; privateintmaxIdleTime;//connections will be culled if it was idle for //maxIdleTime msecs privatefinalRetryPolicyconnectionRetryPolicy; privateintmaxRetriesOnSocketTimeouts; privatebooleantcpNoDelay;// if T then disable Nagle's Algorithm privatebooleandoPing;//do we need to send ping message privateintpingInterval;// how often sends ping to the server in msecs privateByteArrayOutputStreampingRequest;// ping message // currently active calls privateHashtable<Integer, Call>calls=newHashtable<Integer, Call>(); privateAtomicLonglastActivity=newAtomicLong();// last I/O activity time privateAtomicBooleanshouldCloseConnection=newAtomicBoolean(); // indicate if the connection is closed privateIOExceptioncloseException;// close reason privatefinalObjectsendRpcRequestLock=newObject(); 基本上都是建立與服務端的連線所需要的基本配置資訊,有一個calls屬性,存放的是提交到當前這個連線的請求物件。 Call物件就表示一次遠端過程呼叫業務,因此它含有遠端呼叫業務所需要的引數資訊,來看Call的屬性 (來自 org.apache.hadoop.ipc.Client.Call   /**    * Class that represents an RPC call    */ staticclassCall { finalintid;              // call id finalintretry;          // retry count finalWritablerpcRequest// the serialized rpc request     WritablerpcResponse;      // null if rpc has error     IOExceptionerror;         // exception, null if success finalRPC.RpcKindrpcKind;     // Rpc EngineKind booleandone;              // true when call is done OK,在結構上了解了類的作用後,就可以來看下客戶端的一次遠端呼叫的流程了。只需要研究Client.call即可,程式碼如下 publicWritablecall(RPC.RpcKind rpcKind, Writable rpcRequest,       ConnectionId remoteId,intserviceClass)throwsIOException { finalCall call = createCall(rpcKind, rpcRequest);    Connection connection = getConnection(remoteId, call, serviceClass); try{      connection.sendRpcRequest(call);                // send the rpc request     }catch(RejectedExecutionException e) { thrownewIOException("connection has been closed", e);     }catch(InterruptedException e) {       Thread.currentThread().interrupt(); LOG.warn("interrupted waiting to send rpc request to server", e); thrownewIOException(e);     } booleaninterrupted =false; synchronized(call) { while(!call.done) { try{          call.wait();                           // wait for the result         }catch(InterruptedException ie) { // save the fact that we were interrupted           interrupted =true;         }       } if(interrupted) { // set the interrupt flag now that we are done waiting         Thread.currentThread().interrupt();       } if(call.error!=null) { if(call.errorinstanceofRemoteException) {           call.error.fillInStackTrace(); throwcall.error;         }else{// local exception           InetSocketAddress address = connection.getRemoteAddress(); throwNetUtils.wrapException(address.getHostName(),                   address.getPort(),                   NetUtils.getHostname(),                   0,                   call.

相關推薦

Hadoop RPC分析 -- Client

[Hadoop RPC呼叫入口] 在使用Hadoop RPC基本框架中,主要是通過getProxy來獲得一個客戶端代理物件,通過這個物件來向服務端傳送RPC請求。 getProxy有多個過載方法,最終都是呼叫到了下面這個函式來進行實現 (來自org.apache.hadoo

Docker Client原始碼分析

主要內容: Docker Client在Docker中的定位,以及Docker Client原始碼的初步分析。 本文選取Docker拆分為DockerCE(社群版)和DockerEE(企業版)之後的Docker-CE的第一個穩定版本v17.06.0-ce。 https://github.com/docker

Docker Client源碼分析

... stringvar 文件中 digest tput emp 技術分享 nim 那種 主要內容: Docker Client在Docker中的定位,以及Docker Client源碼的初步分析。 本文選取Docker拆分為DockerCE(社區版)和DockerEE(

Cat原始碼分析Client

前言 cat的Client端所做的工作就是收集埋點資訊,將埋點資訊處理成messageTree,放到傳送佇列中,在啟動另一個執行緒,非同步消費佇列,進行訊息的傳送。 本文涉及到三個內容: 客戶端初始化:做了哪些準備工作 message的建立過程 客戶端的傳

appium python-client程式碼分析

https://github.com/appium/python-client/tree/master/appium 先看看程式碼的結構: common目錄: ——主要是一些初始化 webdriver目錄: ——是appium工作最重要的基礎,裡面包含我們程式設計需

Hadoop作業提交分析

bin/hadoop jar xxx.jar mainclass args……   這樣的命令,各位玩Hadoop的估計已經呼叫過NN次了,每次寫好一個Project或對Project做修改後,都必須打個Jar包,然後再用上面的命令提交到Hadoop Cluster上

Hadoop基礎學習分析、編寫並執行WordCount詞頻統計程序

contains trace net tails super val 詞頻統計 上傳 str 版權聲明:本文為博主原創文章,未經博主同意不得轉載。 https://blog.csdn.ne

Hadoop自學筆記常見Hadoop相關項目一覽

-a https class Lucene 百萬 data fcm you 轉換 本自學筆記來自於Yutube上的視頻Hadoop系列。網址: https://www.youtube.com/watch?v=-TaAVaAwZTs(當中一個) 以後不再贅述 自學筆

【Java】【Flume】Flume-NG啟動過程源代碼分析

code extends fix tar top 依據 oid article gif 從bin/flume 這個shell腳本能夠看到Flume的起始於org.apache.flume.node.Application類,這是flume的main函數所在。   m

linux設備驅動之misc驅動框架源碼分析

linux驅動開發misc設備驅動 1、misc設備驅動框架源碼部分是由內核開發者實現提供的,主要是創建misc類和為驅動開發者提供misc_register函數,來進行創建misc設備。 這部分的源碼在/drvier/char/misc.c裏,代碼如下:/* * linux/drivers/c

linux驅動開發之蜂鳴器驅動源碼分析

linux 蜂鳴器 驅動 蜂鳴器的驅動源碼在/driver/char/buzzer/x210-buzzer.c文件中,源碼如下#include <linux/module.h> #include <linux/kernel.h> #include <linux

【雷電】源代碼分析-- 進入遊戲開始界面

模式 show 源代碼下載 popu 按鍵事件 file fcm md5 顯示 轉載請註明出處:http://blog.csdn.net/oyangyufu/article/details/24941949 源代碼下載:http://download.csdn.n

java代碼實現highchart與數據庫數據結合完整案例分析---餅狀圖

隱藏 des log cred 數據庫數據 idt string 時間 input 作者原創:轉載請註明出處 在做項目的過程中,經常會用到統計數據,同時會用到highchart或echart進行數據展示,highchart是外國開發的數據統計圖插件, echa

使用Apriori進行關聯分析

不一定 再計算 add 在一起 num create images loaddata scan   大型超市有海量交易數據,我們可以通過聚類算法尋找購買相似物品的人群,從而為特定人群提供更具個性化的服務。但是對於超市來講,更有價值的是如何找出商品的隱藏關聯,從而打包促銷,以

python的計數引用分析

結果 class 默認 htm ron 如果 目前 解釋器 bject python的垃圾回收采用的是引用計數機制為主和分代回收機制為輔的結合機制,當對象的引用計數變為0時,對象將被銷毀,除了解釋器默認創建的對象外。(默認對象的引用計數永遠不會變成0) 所有的計數引用+1的

python的random模塊函數分析

分析 之前 pri orm cnblogs 進行 ron sample 函數封裝 random是python產生偽隨機數的模塊,隨機種子默認為系統時鐘。下面分析模塊中的方法: 1.random.randint(start,stop): 這是一個產生整數隨機數的函數,參數st

Java淺談數組之內存分析

靜態 引用變量 ati static ges 組元 strong 淺談 nbsp 數組的內存 1.內存中的數組 數組是一種引用內存,數組引用變量只是一個引用,數組元素和數組變量在內存裏是分開存放的。 實際的數組對象被存放在堆內存(heap)中,數組的引用變量被存儲在棧內存中

SpringMVC的流程分析—— 整體流程概括

classes amp 不同 方法 restfu equals 類圖 strong .get SpringMVC的整體概括 之前也寫過springmvc的流程分析,只是當時理解的還不透徹所以那篇文章就放棄了,現在比之前好了些,想著寫下來分享下,也能增強記憶,也希望可以幫助到

[讀書筆記] Python數據分析 準備工作

基礎 htm 環境 防止 功能 多維 處理工具 ati 增強 1. python中數據結構:矩陣,數組,數據框,通過關鍵列相互聯系的多個表(SQL主鍵,外鍵),時間序列 2. python 解釋型語言,程序員時間和CPU時間衡量,高頻交易系統 3. 全局解釋器鎖GIL,

Ocata Neutron代碼分析——Neutron API啟動過程分析

process fig ddr arch 異常 run tap 文件中 bridge 首先,Neutron Server作為一種服務(neutron-server.service),可以到Neutron項目目錄中的setup.cfg配置文件中找到對應的代碼入口。 [ent