1. 程式人生 > >Hadoop RPC框架

Hadoop RPC框架

階段 none order 網絡異常 mat pro resp 邏輯 實現原理

1、RPC框架概述 1.1 RPC(Remote Procedure Call Protocol)——遠程過程調用協議,它是一種通過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議。

RPC協議假定某些傳輸協議的存在,如TCP或UDP。為通信程序之間攜帶信息數據。

在OSI網絡通信模型中。RPC跨越了傳輸層和應用層。RPC使得開發包括網絡分布式多程序在內的應用程序更加easy。 1.2 RPC通常採用clientserver模型,其框架主要有下面幾部分

  • 通信模塊:實現請求應該協議。主要分為同步方式和異步方式。

  • stub程序:client和server均包括stub程序,能夠看做代理程序。使得遠程函數表現的跟本地調用一樣,對用戶程序全然透明。
  • 調度程序:接受來自通信模塊的請求消息。依據標識選擇stub程序處理。並發量大一般採用線程池處理。
  • 客戶程序/服務過程:請求發出者和請求的處理者。
1.3 RPC流程圖 技術分享圖片

2、Hadoop RPC基本框架 2.1Hadoop RPC的用法見代碼
服務
public interface MyBiz extends VersionedProtocol {
??? long PROTOCOL_VERSION = 12321443L;
??? String hello(String name);
}
public class MyBizImpl implements MyBiz {
??? @Override
??? public long getProtocolVersion(String arg0, long arg1) throws IOException {
??????? return PROTOCOL_VERSION;
??? }

??? @Override
??? public String hello(String name) {
??????? System. out.println( "invoked");
??????? return "hello " + name;
??? }
}

server
public class MyServer {
??? public static final String SERVER_ADDRESS = "localhost";
??? public static final int SERVER_PORT = 12345;

??? public static void main(String[] args) throws IOException {
??????? Server server = RPC. getServer(new MyBizImpl(), SERVER_ADDRESS, SERVER_PORT , new Configuration());
??????? server.start();
??? }
}

client
public class MyClient {
??? public static void main(String[] args) throws IOException {
??????? MyBiz proxy = (MyBiz) RPC. getProxy(MyBiz.class, MyBiz.PROTOCOL_VERSION,
??????????????? new InetSocketAddress(MyServer. SERVER_ADDRESS,MyServer.SERVER_PORT),
??????????????? new Configuration());
??????? String result = proxy.hello( "5");
??????? System. out.println(result);
??????? RPC.stopProxy(proxy);
??? }
}

2.2 org.apache.hadoop.ipc.RPC類解析 RPC類主要包括三部分:
  • ClientCache(成員變量):依據用戶提供的SocketFactory來緩存Client對象。以便重用Client對象。
  • Server(內部類):繼承Server抽象類,利用反射實現了call方法。即client請求的方法和相應參數完畢方法調用。

  • Invocation(內部類):將要調用的方法名和參數打包成可序列化的對象,方便client和server之間傳遞。


2.3 client和server端的關系
  • Client-NameNode之間,當中NameNode是server
  • Client-DataNode之間。當中DataNode是server
  • DataNode-NameNode之間,當中NameNode是server
  • DataNode-DateNode之間,當中某一個DateNode是server,還有一個是client
2.4?org.apache.hadoop.ipc.Client類解析 2.4.1 Client類中主要包括:
  • Call(內部類):封裝了一個RPC請求,包括5個成員變量,唯一表示id、函數調用信息param、函數返回值value、函數異常信息error、函數完畢標識done。

    Hadoop rpc?server採用異步方式處理client請求,使得遠程過程調用的發生順序和返回順序無直接關系,而client正是通過id識別不同的函數調用。

    當client向server發送請求。僅僅需填充id和param兩個變量。其余3個變量由server端依據函數運行情況填充。

  • Connection(內部類。一個線程):是client和server之間的一個通信連接。封裝了連接先關的基本信息和操作。基本信息包括:通信連接唯一標識remoteId(ConnectionId)、與Server端通信的scoket、網絡輸入輸出流in/out、保存RPC請求的哈希表calls(Hashtable<Integer, Call>)。操作包括:addCall將一個Call對象加入到哈希表中;sendParam想server端發送RPC請求;receiveResponse從server端接收已經處理完畢的RPC請求。run調用receiveResponse方法,等待返回結果。

  • ConnectionId(內部類):連接的標記(包括server地址,協議,其它一些連接的配置項信息)
  • ParallelCall(內部類):實現並行調用的請求
  • ParallelResults(內部類):並行調用的運行結果
2.4.2 Client類中主要對外通過兩個接口,分別用於單個遠程調用和批量遠程調用。
public Writable call(Writable param, ConnectionId remoteId)? throws InterruptedException, IOException public Writable call(Writable param, InetSocketAddress addr,? Class<?

> protocol, UserGroupInformation ticket, ?????????????????????? int rpcTimeout, Configuration conf)? throws InterruptedException, IOException


2.4.3 調用流程分析,當調用call函數運行某個遠程方法時,有下面幾個步驟: 1)創建一個Connection對象,並將遠程方法調用信息封裝成Call對象,放到Connection對象中的哈希表中;
2)調用Connection類中的sendRpcRequest()方法將當前Call對象發送給Server端;
3)Server端處理完RPC請求後。將結果通過網絡返回給Client端,Client端通過receiveRpcResponse()函數獲取結果。
4)Client檢查結果處理狀態(成功還是失敗),並將相應Call對象從哈希表中刪除。
2.4.4 一個Client包括多個連接。private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>();
2.5?org.apache.hadoop.ipc.Server類解析
2.5.1 背景 Hadoop採用了Master/Slave結構,當中Master是整個系統的單點。如NameNode或JobTracker,這是制約系統性能和可擴展性的最關鍵因素之中的一個;而Master通過ipc.Server接收並處理所有Slave發送的請求,這就要求ipc.Server 將高並發和可擴展性作為設計目標。為此,ipc.Server採用了非常多提高並發處理能力的技術,主要包括線程池、事件驅動和Reactor設計模式等。這些技術均採用了JDK自帶的庫實現,這裏重點分析它是怎樣利用Reactor設計模式提高總體性能的。


2.5.2 reactor設計模式 Reactor是並發編程中的一種基於事件驅動的設計模式。它具有下面兩個特點:通過派發/分離I/O操作事件提高系統的並發性能;提供了粗粒度的並發控制,使用單線程實現,避免了復雜的同步處理。典型的Reactor實現原理如圖所看到的。

技術分享圖片
典型的Reactor模式中主要包括下面幾個角色。


  • Reactor:I/O事件的派發者。
  • Acceptor:接受來自Client的連接。建立與Client相應的Handler。並向Reactor註冊此Handler。

  • Handler:與一個Client通信的實體,並按一定的過程實現業務的處理。

    Handler內部往往會有更進一步的層次劃分,用來抽象諸如read、decode、compute、encode和send等過程。在Reactor模式中,業務邏輯被分散的I/O事件所打破。所以Handler須要有適當的機制在所需的信息還不全(讀到一半)的時候保存上下文,並在下一次I/O事件到來的時候(還有一半可讀)能繼續上次中斷的處理。

  • Reader/Sender:為了加速處理速度,Reactor模式往往構建一個存放數據處理線程的線程池,這樣數據讀出後,馬上扔到線程池中等待興許處理就可以。為此,Reactor模式一般分離Handler中的讀和寫兩個過程,分別註冊成單獨的讀事件和寫事件。並由相應的Reader和Sender線程處理。
2.5.3 java nio代碼實例
package com.sohu.tv.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
* NIO服務端
* @author 小路
*/
public class NIOServer {
??? //通道管理器
??? private Selector selector;

??? /**
???? * 獲得一個ServerSocket通道,並對該通道做一些初始化的工作
???? * @param port? 綁定的端口號
???? * @throws IOException
???? */
??? public void initServer(int port) throws IOException {
??????? // 獲得一個ServerSocket通道
??????? ServerSocketChannel serverChannel = ServerSocketChannel.open();
??????? // 設置通道為非堵塞
??????? serverChannel.configureBlocking(false);
??????? // 將該通道相應的ServerSocket綁定到port端口
??????? serverChannel.socket().bind(new InetSocketAddress(port));
??????? // 獲得一個通道管理器
??????? this.selector = Selector.open();
??????? //將通道管理器和該通道綁定,並為該通道註冊SelectionKey.OP_ACCEPT事件,註冊該事件後,
??????? //當該事件到達時,selector.select()會返回。假設該事件沒到達selector.select()會一直堵塞。


??????? serverChannel.register(selector, SelectionKey.OP_ACCEPT);
??? }

??? /**
???? * 採用輪詢的方式監聽selector上是否有須要處理的事件。假設有,則進行處理
???? * @throws IOException
???? */
??? @SuppressWarnings("unchecked")
??? public void listen() throws IOException {
??????? System.out.println("服務端啟動成功!

");
??????? // 輪詢訪問selector
??????? while (true) {
??????????? //當註冊的事件到達時,方法返回。否則,該方法會一直堵塞
????????????selector.select();
??????????? // 獲得selector中選中的項的叠代器,選中的項為註冊的事件
??????????? Iterator ite = this.selector.selectedKeys().iterator();
??????????? while (ite.hasNext()) {
??????????????? SelectionKey key = (SelectionKey) ite.next();
??????????????? // 刪除已選的key,以防反復處理
??????????????? ite.remove();
??????????????? // client請求連接事件
??????????????? if (key.isAcceptable()) {
??????????????????? ServerSocketChannel server = (ServerSocketChannel) key
??????????????????????????? .channel();
??????????????????? // 獲得和client連接的通道
??????????????????? SocketChannel channel = server.accept();
??????????????????? // 設置成非堵塞
??????????????????? channel.configureBlocking(false);

??????????????????? //在這裏能夠給client發送信息哦
??????????????????? channel.write(ByteBuffer.wrap(new String("向client發送了一條信息").getBytes()));
??????????????????? //在和client連接成功之後。為了能夠接收到client的信息,須要給通道設置讀的權限。
????????????????????channel.register(this.selector, SelectionKey.OP_READ);

??????????????????? // 獲得了可讀的事件
??????????????? } else if (key.isReadable()) {
??????????????????? read(key);
??????????????? }

??????????? }

??????? }
??? }
??? /**
???? * 處理讀取client發來的信息 的事件
???? * @param key
???? * @throws IOException
???? */
??? public void read(SelectionKey key) throws IOException{
??????? // server可讀取消息:得到事件發生的Socket通道
??????? SocketChannel channel = (SocketChannel) key.channel();
??????? // 創建讀取的緩沖區
??????? ByteBuffer buffer = ByteBuffer.allocate(10);
??????? channel.read(buffer);
??????? byte[] data = buffer.array();
??????? String msg = new String(data).trim();
??????? System.out.println("服務端收到信息:"+msg);
??????? ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
??????? channel.write(outBuffer);// 將消息回送給client
??? }

??? /**
???? * 啟動服務端測試
???? * @throws IOException
???? */
??? public static void main(String[] args) throws IOException {
??????? NIOServer server = new NIOServer();
??????? server.initServer(8000);
??????? server.listen();
??? }
}



package com.sohu.tv.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
* NIOclient
* @author 小路
*/
public class NIOClient {
??? //通道管理器
??? private Selector selector;

??? /**
???? * 獲得一個Socket通道,並對該通道做一些初始化的工作
???? * @param ip 連接的server的ip
???? * @param port? 連接的server的端口號
???? * @throws IOException
???? */
??? public void initClient(String ip,int port) throws IOException {
??????? // 獲得一個Socket通道
??????? SocketChannel channel = SocketChannel.open();
??????? // 設置通道為非堵塞
??????? channel.configureBlocking(false);
??????? // 獲得一個通道管理器
??????? this.selector = Selector.open();

??????? // client連接server,事實上方法運行並沒有實現連接,須要在listen()方法中調
??????? //用channel.finishConnect();才幹完畢連接
??????? channel.connect(new InetSocketAddress(ip,port));
??????? //將通道管理器和該通道綁定,並為該通道註冊SelectionKey.OP_CONNECT事件。


??????? channel.register(selector, SelectionKey.OP_CONNECT);
??? }

??? /**
???? * 採用輪詢的方式監聽selector上是否有須要處理的事件。假設有。則進行處理
???? * @throws IOException
???? */
??? @SuppressWarnings("unchecked")
??? public void listen() throws IOException {
??????? // 輪詢訪問selector
??????? while (true) {
??????????? selector.select();
??????????? // 獲得selector中選中的項的叠代器
??????????? Iterator ite = this.selector.selectedKeys().iterator();
??????????? while (ite.hasNext()) {
??????????????? SelectionKey key = (SelectionKey) ite.next();
??????????????? // 刪除已選的key,以防反復處理
??????????????? ite.remove();
??????????????? // 連接事件發生
??????????????? if (key.isConnectable()) {
??????????????????? SocketChannel channel = (SocketChannel) key
??????????????????????????? .channel();
??????????????????? // 假設正在連接,則完畢連接
??????????????????? if(channel.isConnectionPending()){
??????????????????????? channel.finishConnect();

??????????????????? }
??????????????????? // 設置成非堵塞
??????????????????? channel.configureBlocking(false);

??????????????????? //在這裏能夠給服務端發送信息哦
??????????????????? channel.write(ByteBuffer.wrap(new String("向服務端發送了一條信息").getBytes()));
??????????????????? //在和服務端連接成功之後,為了能夠接收到服務端的信息,須要給通道設置讀的權限。


??????????????????? channel.register(this.selector, SelectionKey.OP_READ);

??????????????????? // 獲得了可讀的事件
??????????????? } else if (key.isReadable()) {
??????????????????? read(key);
??????????????? }
??????????? }
??????? }
??? }
??? /**
???? * 處理讀取服務端發來的信息 的事件
???? * @param key
???? * @throws IOException
???? */
??? public void read(SelectionKey key) throws IOException{
??????? //和服務端的read方法一樣
??? }


??? /**
???? * 啟動client測試
???? * @throws IOException
???? */
??? public static void main(String[] args) throws IOException {
??????? NIOClient client = new NIOClient();
??????? client.initClient("localhost",8000);
??????? client.listen();
??? }

}



2.5.4 server處理流程 ? ? ?ipc.Server的主要功能是接收來自client的RPC請求,經過調用相應的函數獲取結果後,返回給相應的client。為此,ipc.Server被劃分成3個階段:接收請求、處理請求和返回結果。

(1)接收請求
? ? ?該階段主要任務是接收來自各個client的RPC請求,並將它們封裝成固定的格式(Call類)放到一個共享隊列(callQueue)中,以便進行興許處理。該階段內部又分為建立連接和接收請求兩個子階段,分別由Listener和Reader兩種線程完畢。
? ? ?整個Server僅僅有一個Listener線程,統一負責監聽來自client的連接請求。一旦有新的請求到達。它會採用輪詢的方式從線程池中選擇一個Reader線程進行處理,而Reader線程可同一時候存在多個,它們分別負責接收一部分client連接的RPC請求,至於每一個Reader線程負責哪些client連接。全然由Listener決定,當前Listener僅僅是採用了簡單的輪詢分配機制。


? ? ?Listener和Reader線程內部各自包括一個Selector對象。分別用於監聽SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。

對於Listener線程。主循環的實現體是監聽是否有新的連接請求到達。並採用輪詢策略選擇一個Reader線程處理新連接。對於Reader線程。主循環的實現體是監聽(它負責的那部分)client連接中是否有新的RPC請求到達,並將新的RPC請求封裝成Call對象,放到共享隊列callQueue中。



(2)處理請求
? ? ?該階段主要任務是從共享隊列callQueue中獲取Call對象,運行相應的函數調用,並將結果返回給client,這所有由Handler線程完畢。
? ? ?Server端可同一時候存在多個Handler線程,它們並行從共享隊列中讀取Call對象,經運行相應的函數調用後。將嘗試著直接將結果返回給相應的client。但考慮到某些函數調用返回結果非常大或者網絡速度過慢。可能難以將結果一次性發送到client,此時Handler將嘗試著將興許發送任務交給Responder線程。

(3)返回結果
? ? ?前面提到。每一個Handler線程運行完函數調用後,會嘗試著將運行結果返回給client。但對於特殊情況。比方函數調用返回結果過大或者網絡異常情況(網速過慢),會將發送任務交給Responder線程。
? ? ?Server端僅存在一個Responder線程,它的內部包括一個Selector對象,用於監聽SelectionKey.OP_WRITE事件。當Handler沒能將結果一次性發送到client時。會向該Selector對象註冊SelectionKey.OP_WRITE事件,進而由Responder線程採用異步方式繼續發送未發送完畢的結果。
技術分享圖片

Hadoop RPC框架