1. 程式人生 > >Hadoop基於Protocol Buffer的RPC實現程式碼分析-Server端

Hadoop基於Protocol Buffer的RPC實現程式碼分析-Server端

http://yanbohappy.sinaapp.com/?p=110

最新版本的Hadoop程式碼中已經默認了Protocol buffer(以下簡稱PB,http://code.google.com/p/protobuf/)作為RPC的預設實現,原來的WritableRpcEngine已經被淘汰了。來自cloudera的Aaron T. Myers在郵件中這樣說的“since PB can provide support for evolving protocols in a compatible fashion.”

首先要明白PB是什麼,PB是Google開源的一種輕便高效的結構化資料儲存格式,可以用於結構化資料序列化/反序列化,很適合做資料儲存或 RPC 資料交換格式。它可用於通訊協議、資料儲存等領域的語言無關、平臺無關、可擴充套件的序列化結構資料格式。目前提供了 C++、Java、Python 三種語言的 API。簡單理解就是某個程序把一些結構化資料通過網路通訊的形式傳遞給另外一個程序(典型應用就是RPC);或者某個程序要把某些結構化資料持久化儲存到磁碟上(這個有點類似於在Mongodb中的BSON格式)。對於儲存的這個例子來說,使用PB和XML,JSON相比的缺點就是儲存在磁碟上的資料使用者是無法理解的,除非用PB反序列化之後才行,這個有點類似於IDL。優點就是序列化/反序列化速度快,網路或者磁碟IO傳輸的資料少,這個在Data-Intensive Scalable Computing中是非常重要的。

Hadoop使用PB作為RPC實現的另外一個原因是PB的語言、平臺無關性。在mailing list裡聽說過社群的人有這樣的考慮:就是現在每個MapReduce task都是在一個JVM虛擬機器上執行的(即使是Streaming的模式,MR任務的資料流也是通過JVM與NN或者DN進行RPC交換的),JVM最嚴重的問題就是記憶體,例如OOM。我看社群裡有人討論說如果用PB這樣的RPC實現,那麼每個MR task都可以直接與NN或者DN進行RPC交換了,這樣就可以用C/C++來實現每一個MR task了。百度做的HCE(https://issues.apache.org/jira/browse/MAPREDUCE-1270

)和這種思路有點類似,但是由於當時的Hadoop RPC通訊還是通過WritableRpcEngine來實現的,所以MR task還是沒有擺脫通過本地的JVM代理與NN或者DN通訊的束縛,因為Child JVM Process還是存在的,還是由它來設定執行時環境和RPC互動。

下面來看看Hadoop程式碼中的RPC是如何實現的。RPC就是一臺機器上的某個程序要呼叫另外一臺機器上的某個程序的方法,中間通訊傳輸的就是類似於“方法名、引數1、引數2……”這樣的資訊,是結構化的。同時通訊除了這些RPC實體以外,還要有header等。

我們要定義一種PB實現的RPC傳輸格式,首先要定義相應的.proto檔案,在Hadoop common工程裡,這些檔案放在D:\Hadoop-trunk\hadoop-common-project\hadoop-common\src\main\proto目錄下;在Hadoop HDFS工程裡這些檔案放在D:\Hadoop-trunk\hadoop-hdfs-project\hadoop-hdfs\src\main\proto目錄下,以此類推。Hadoop編譯指令碼會呼叫相應的protoc二進位制程式來編譯這些以.proto結尾的檔案,生成相應的.java檔案。

以D:\Hadoop-trunk\hadoop-hdfs-project\hadoop-hdfs\src\main\proto目錄下的ClientNamenodeProtocol.proto為例說明。檔案最開始定義了一些引數:

option java_package = "org.apache.hadoop.hdfs.protocol.proto";

option java_outer_classname = "ClientNamenodeProtocolProtos";

option java_generic_services = true;

option java_generate_equals_and_hash = true;

這個表示這個.proto檔案經過protoc編譯之後會生成org.apache.hadoop.hdfs.protocol.proto這個包下面的ClientNamenodeProtocolProtos.java類檔案,那麼在Hadoop原始碼裡就可以呼叫這個類裡的方法了。

這個檔案的主體主要是兩種資料型別message和rpc,仔細看下這個檔案就知道了,message就是這個ClientNamenodeProtocol協議中傳輸的結構體,rpc就是呼叫的方法。那麼這兩種型別在經過編譯之後會生成什麼呢?

編譯之後,在Hadoop-trunk/hadoop-hdfs-project/hadoop-hdfs/target/generated-sources/java/org/apache/hadoop/hdfs/protocol/proto目錄裡生成了ClientNamenodeProtocolProtos.java檔案,裡面把message都包裝成了類,而把rpc都包裝成了方法。這個檔案是由PB編譯器自動生成的,所以不能修改。

有了這些java類之後,我們就可以看看在Server端是怎麼實現RPC的了。首先還是NameNode初始化的流程,會呼叫到rpcServer = createRpcServer(conf)來建立RPC server。下面看看NameNodeRpcServer的建構函式裡都做了哪些工作:

public NameNodeRpcServer(Configuration conf, NameNode nn)
      throws IOException {
    this.nn = nn;
    this.namesystem = nn.getNamesystem();
    this.metrics = NameNode.getNameNodeMetrics();

    int handlerCount =
      conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
                  DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
    InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
    //設定ProtolEngine,目前只支援PB協議。表示接收到的RPC協議如果是ClientNamenodeProtocolPB,
    //那麼處理這個RPC協議的引擎是ProtobufRpcEngine
    RPC.setProtocolEngine(conf,ClientNamenodeProtocolPB.class,ProtobufRpcEngine.class);
    //宣告一個ClientNamenodeProtocolServerSideTranslatorPB,
    //這個類負責把Server接收到的PB格式物件的資料,拼裝成NameNode內村中的資料型別,
    //呼叫NameNodeRpcServer類中相應的邏輯,然後再把執行結果拼裝成PB格式。
    ClientNamenodeProtocolServerSideTranslatorPB
    clientProtocolServerTranslator =
         new ClientNamenodeProtocolServerSideTranslatorPB(this);
    BlockingService clientNNPbService = ClientNamenodeProtocol.
         newReflectiveBlockingService(clientProtocolServerTranslator);

    DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
        new DatanodeProtocolServerSideTranslatorPB(this);
    BlockingService dnProtoPbService = DatanodeProtocolService
        .newReflectiveBlockingService(dnProtoPbTranslator);

    NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
        new NamenodeProtocolServerSideTranslatorPB(this);
	  BlockingService NNPbService = NamenodeProtocolService
          .newReflectiveBlockingService(namenodeProtocolXlator);

    RefreshAuthorizationPolicyProtocolServerSideTranslatorPB refreshAuthPolicyXlator =
        new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this);
    BlockingService refreshAuthService = RefreshAuthorizationPolicyProtocolService
        .newReflectiveBlockingService(refreshAuthPolicyXlator);

    RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator =
        new RefreshUserMappingsProtocolServerSideTranslatorPB(this);
    BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService
        .newReflectiveBlockingService(refreshUserMappingXlator);

    GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =
        new GetUserMappingsProtocolServerSideTranslatorPB(this);
    BlockingService getUserMappingService = GetUserMappingsProtocolService
        .newReflectiveBlockingService(getUserMappingXlator);

    HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
        new HAServiceProtocolServerSideTranslatorPB(this);
    BlockingService haPbService = HAServiceProtocolService
        .newReflectiveBlockingService(haServiceProtocolXlator);

    WritableRpcEngine.ensureInitialized();

    InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
    if (dnSocketAddr != null) {
      int serviceHandlerCount =
        conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                    DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
      // Add all the RPC protocols that the namenode implements
      this.serviceRpcServer =
          RPC.getServer(org.apache.hadoop.hdfs.protocolPB.
              ClientNamenodeProtocolPB.class, clientNNPbService,
          dnSocketAddr.getHostName(), dnSocketAddr.getPort(),
          serviceHandlerCount,
          false, conf, namesystem.getDelegationTokenSecretManager());
      DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
          serviceRpcServer);
      DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
          serviceRpcServer);
      DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
          serviceRpcServer);
      DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
          refreshAuthService, serviceRpcServer);
      DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
          refreshUserMappingService, serviceRpcServer);
      DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
          getUserMappingService, serviceRpcServer);

      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
      nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
    } else {
      serviceRpcServer = null;
      serviceRPCAddress = null;
    }
    // Add all the RPC protocols that the namenode implements
    this.clientRpcServer = RPC.getServer(
        org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,
        clientNNPbService, socAddr.getHostName(),
            socAddr.getPort(), handlerCount, false, conf,
            namesystem.getDelegationTokenSecretManager());
    DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
        clientRpcServer);
    DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
        clientRpcServer);
    DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
        clientRpcServer);
    DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
        refreshAuthService, clientRpcServer);
    DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
        refreshUserMappingService, clientRpcServer);
    DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
        getUserMappingService, clientRpcServer);

    // set service-level authorization security policy
    if (serviceAuthEnabled =
          conf.getBoolean(
            CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
      this.clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
      if (this.serviceRpcServer != null) {
        this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
      }
    }

    // The rpc-server port can be ephemeral... ensure we have the correct info
    this.clientRpcAddress = this.clientRpcServer.getListenerAddress();
    nn.setRpcServerAddress(conf, clientRpcAddress);

    this.minimumDataNodeVersion = conf.get(
        DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
        DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);
  }

ClientNamenodeProtocol是protoc編譯生成的ClientNamenodeProtocolProtos類中的inner class。

public static com.google.protobuf.BlockingService
       newReflectiveBlockingService(final BlockingInterface impl) {
	……
       }

這個方法也是由protoc編譯器自動生成的。這個方法會返回一個com.google.protobuf.BlockingService型別的物件,這種型別的物件定義了RPC的各種服務,後面會講。

this.clientRpcServer = RPC.getServer(
        org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,
        clientNNPbService, socAddr.getHostName(),
        socAddr.getPort(), handlerCount, false, conf,
        namesystem.getDelegationTokenSecretManager());

這個RPC.getServer()函式生成一個Server物件,負責接收網路連線,讀取資料,呼叫處理資料函式,返回結果。這個Server物件裡有Listener, Handler, Responder內部類,分別開啟多個執行緒負責監聽、讀取、處理和返回結果。前兩個引數表示如果RPC傳送過來的是ClientNamenodeProtocolPB協議,那麼負責處理這個協議的服務(com.google.protobuf.BlockingService型別的物件)就是clientNNPbService。

這個RPC.getServer()會經過層層呼叫,因為現在預設的RPCEngine是ProtobufRpcEngine(ProtobufRpcEngine.java),就會呼叫到下面這個函式,在這生成了一個Server物件,就是用於接收client端RPC請求,處理,回覆的Server。這個Server物件是一個純粹的網路服務的Server,在RPC中起到基礎網路IO服務的作用。

public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
      String bindAddress, int port, int numHandlers, int numReaders,
      int queueSizePerHandler, boolean verbose, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      String portRangeConfig)
      throws IOException {
    return new Server(protocol, protocolImpl, conf, bindAddress, port,
        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
        portRangeConfig);
  }

現在該用到的東西都生成好了,就要看看client端來了一個RPC請求之後,Server端是怎麼處理的呢?

Server裡的Reader執行緒也是基於Selector的非同步IO模式,每次Select選出一個SelectionKey之後,會呼叫SelectionKey.attachment()把這個SelectionKey所attach的Connection物件獲取,然後執行對應的readAndProcess()方法,把這個SelectionKey所對應的管道上的網路IO資料讀入緩衝區。readAndProcess()方法會層層呼叫到Server.processData()方法,在這個方法內部,會把剛才從網路IO中讀取的資料反序列化成物件rpcRequest物件。rpcRequest物件的型別是繼承自Writable型別的子類的物件,也就是說可以序列化/反序列化的類。這裡rpcRequest物件裡包含的RPC請求的內容物件是由.proto檔案中Message生成的類,也就是說PB框架自動編譯出來的類,後面可以通過呼叫這個類的get方法獲取RPC中真正傳輸的資料。之後把生成的rpcRequest物件放到一個Call物件裡面,再把Call物件放到佇列Server.callQueue裡面。至此網路伺服器的Reader執行緒做的工作就OK了。

下面看看Handler執行緒是怎麼處理的。Handler執行緒預設有10個,所以處理邏輯是多執行緒的。每個Handler執行緒會從剛才提到的callQueue中取一個Call物件,然後呼叫Server.call()方法執行這個Call物件中蘊含的RPC請求。Server.call()->RPC.Server.call()->Server.getRpcInvoker()->ProtobufRpcInvoker.call()在最後這個call()函式裡面真正執行嘍。。。。重點看這個函式,首先校驗這個請求發過來的資料是不是合理的。然後就是獲取實現這個協議的服務。實現協議的服務在初始化的時候已經註冊過了,就是前面說的那個com.google.protobuf.BlockingService型別的物件,例如:

BlockingService clientNNPbService = ClientNamenodeProtocol.
         newReflectiveBlockingService(clientProtocolServerTranslator);

這個就是實現Client和NameNode之間的ClientNamenodeProtocol協議的服務。當然還有dnProtoPbService, NNPbService, refreshAuthService, refreshUserMappingService, haPbService等等這些不同的服務。

這個Service獲取了之後,通過呼叫這句程式碼

result = service.callBlockingMethod(methodDescriptor, null, param);

就會執行這個RPC請求的邏輯。

再往深入執行就要涉及到google protocol buffer內部的東西了,這個service物件會把相應的方法呼叫轉移到一個繼承自BlockingInterface介面的實現類上。Service的真正實現類就是clientProtocolServerTranslator,是newReflectiveBlockingService()這個函式的引數。

BlockingService clientNNPbService = ClientNamenodeProtocol.
         newReflectiveBlockingService(clientProtocolServerTranslator);

這個初始化過程中的引數,也就是service.callBlockingMethod()真正呼叫的是clientProtocolServerTranslator中對應的方法。這一點可以通過由protoc自動編譯生成的程式碼中看出:

public static com.google.protobuf.BlockingService
        newReflectiveBlockingService(final BlockingInterface impl) {
      return new com.google.protobuf.BlockingService() {
        public final com.google.protobuf.Descriptors.ServiceDescriptor
            getDescriptorForType() {
          return getDescriptor();
        }

        public final com.google.protobuf.Message callBlockingMethod(
            com.google.protobuf.Descriptors.MethodDescriptor method,
            com.google.protobuf.RpcController controller,
            com.google.protobuf.Message request)
            throws com.google.protobuf.ServiceException {
          if (method.getService() != getDescriptor()) {
            throw new java.lang.IllegalArgumentException(
              "Service.callBlockingMethod() given method descriptor for " +
              "wrong service type.");
          }
          switch(method.getIndex()) {
            case 0:
              return impl.getBlockLocations(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto)request);
            case 1:
              return impl.getServerDefaults(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto)request);
            case 2:
              return impl.create(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto)request);
            case 3:
              return impl.append(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto)request);
            ……
}
……
}

上面就是proto編譯生成的ClientNamenodeProtocolProtos.java檔案,從中可以看出對callBlockingMethod()方法的呼叫都是轉移到BlockingInterface impl上面了。

然後我們看看clientProtocolServerTranslator是怎麼進一步執行的。下面以getBlockLocations()函式為例說明:

public GetBlockLocationsResponseProto getBlockLocations(
      RpcController controller, GetBlockLocationsRequestProto req)
      throws ServiceException {
    try {
      //下面這個server是由NameNodeRpcServer類生成的物件,定義了HDFS元資料操作邏輯。
      LocatedBlocks b = server.getBlockLocations(req.getSrc(), req.getOffset(),
          req.getLength());
      //由於server返回的是NameNode記憶體中的資料結構,要把這個結果通過RPC傳回client端,
      //那麼我們需要利用PB框架提供的對應Message的Builder類,把記憶體中的資料結構通過這個介面序列化。
      Builder builder = GetBlockLocationsResponseProto
          .newBuilder();
      if (b != null) {
        builder.setLocations(PBHelper.convert(b)).build();
      }
      return builder.build();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

至此,Hadoop的RPC流程Server端已經分析結束,不過這個是正確執行的流程。如果中間丟擲了異常呢?還是以上面這個getBlockLocations()函式為例,如果元資料操作邏輯NameNodeRpcServer裡面丟擲IOException,那麼它都會把它封裝成ServiceException,然後一路傳遞給client端。在client端,會通過ProtobufHelper.getRemoteException()把封裝在ServiceException中的IOException獲取出來。