1. 程式人生 > >Storm學習筆記(7)- DRPC

Storm學習筆記(7)- DRPC

文章目錄


官方網站:
http://storm.apache.org/releases/1.2.2/Distributed-RPC.html

RPC原理圖解

在這裡插入圖片描述

基於Hadoop的RPC實現

新增依賴

<!--新增cloudera的repository-->
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
</repositories>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0-cdh5.7.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
    </exclusions>
</dependency>
/**
 * RPC contract for the user service, exposed through Hadoop RPC.
 *
 * <p>Hadoop RPC requires the protocol interface to declare a
 * {@code versionID} constant; client and server must agree on this value
 * or the connection handshake is rejected.
 */
public interface UserService {

    /** Protocol version shared by server and client. */
    long versionID = 88888888;

    /**
     * Adds a user on the server side.
     *
     * @param name the user's name
     * @param age  the user's age
     */
    void addUser(String name, int age);
}
/**
 * 使用者的服務介面實現類
 */
public class UserServiceImpl implements UserService{

    public void addUser(String name, int age) {
        System.out.println("From Server Invoked: add user success... , name is :" + name);
    }
}

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

/**
 * RPC Server服務
 */
/**
 * Hadoop RPC server that exposes {@link UserService} on localhost:9999.
 *
 * <p>Run this first, then run {@code RPCClient} to invoke the service.
 */
public class RPCServer {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Builder pattern: bind the UserService protocol to a concrete
        // implementation and listen on localhost:9999.
        RPC.Server server = new RPC.Builder(conf)
                .setProtocol(UserService.class)
                .setInstance(new UserServiceImpl())
                .setBindAddress("localhost")
                .setPort(9999)
                .build();

        // Non-daemon listener threads keep the JVM alive after main returns.
        server.start();
    }
}

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

import java.net.InetSocketAddress;

/**
 * RPC客戶端
 */
/**
 * Hadoop RPC client: obtains a {@link UserService} proxy bound to
 * localhost:9999 and invokes {@code addUser} remotely.
 *
 * <p>Start {@code RPCServer} before running this class.
 */
public class RPCClient {

    public static void main(String[] args) throws Exception {

        Configuration configuration = new Configuration();

        // Must match the server-side protocol version or the RPC handshake
        // fails; reference the shared constant instead of duplicating the
        // magic number 88888888 here.
        long clientVersion = UserService.versionID;

        UserService userService = RPC.getProxy(UserService.class, clientVersion,
                new InetSocketAddress("localhost", 9999),
                configuration);

        userService.addUser("lisi", 30);
        System.out.println("From client... invoked");

        // Release the proxy's underlying connection.
        RPC.stopProxy(userService);

    }
}

測試:先啟動服務端;再啟動客戶端。

Storm DRPC概述

分散式RPC (DRPC)背後的思想是使用Storm並行地計算真正強大的函式。Storm topology 接受函式引數流作為輸入,併為每個函式呼叫發出結果流。

與其說DRPC是Storm的一個特性,不如說它是從Storm的streams, spouts, bolts, topologies的原語中表達出來的模式。DRPC可以從Storm中打包為一個單獨的庫,但是它非常有用,所以與Storm繫結在一起。

分散式RPC由“DRPC伺服器”協調(Storm附帶了一個實現)。DRPC伺服器協調接收RPC請求、將請求傳送到Storm topology、接收Storm topology的結果並將結果傳送回等待的客戶機。從客戶機的角度來看,分散式RPC呼叫看起來就像常規RPC呼叫。例如,下面是客戶機如何使用引數“http://twitter.com”計算“reach”函式的結果:

// Configure the Thrift transport and Nimbus retry policy for the DRPC client.
Config conf = new Config();
        conf.put("storm.thrift.transport", "org.apache.storm.security.auth.plain.PlainSaslTransportPlugin");
        conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 3);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 20);
// Connect to the DRPC server (default DRPC port is 3772) and invoke the
// "reach" function; execute() blocks until the topology returns a result.
DRPCClient client = new DRPCClient(conf, "drpc-host", 3772);
String result = client.execute("reach", "http://twitter.com");

在這裡插入圖片描述
客戶機向DRPC伺服器傳送要執行的函式的名稱和引數。實現該函式的拓撲使用DRPCSpout從DRPC伺服器接收函式呼叫流。每個函式呼叫都由DRPC伺服器用唯一的id標記。topology計算出結果後,最終的topology透過ReturnResults bolt連接到DRPC伺服器,將帶有函式呼叫id的結果傳回。然後DRPC伺服器使用該id匹配客戶機正在等待的結果,解除阻塞等待的客戶機,並將結果傳送給它。

本地DRPC

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * 本地DRPC
 */
/**
 * DRPC demo that runs entirely in-process: a {@code LocalDRPC} instance
 * plays the role of the DRPC server and a {@code LocalCluster} runs the
 * topology, so no external Storm cluster is needed.
 */
public class LocalDRPCTopology {

    /**
     * Bolt that receives (request-id, argument) tuples from the DRPC spout
     * and emits (request-id, result) so the DRPC server can route the result
     * back to the blocked client call.
     */
    public static class MyBolt extends BaseRichBolt {

        private OutputCollector collector;

        public void prepare(Map stormConf, TopologyContext context,
                            OutputCollector collector) {
            this.collector = collector;
        }

        public void execute(Tuple input) {
            // Field 0 is the DRPC request id, field 1 the function argument.
            Object requestId = input.getValue(0);
            String name = input.getString(1);

            // TODO... business logic goes here
            String result = "add user: " + name;

            // The request id must be echoed back unchanged so the DRPC
            // server can match this result to the waiting client.
            this.collector.emit(new Values(requestId, result));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }
    }

    public static void main(String[] args) {
        // Register the DRPC function name "addUser" and its processing bolt.
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("addUser");
        builder.addBolt(new MyBolt());

        LocalCluster localCluster = new LocalCluster();
        LocalDRPC drpc = new LocalDRPC();
        localCluster.submitTopology("local-drpc", new Config(),
                builder.createLocalTopology(drpc));

        // execute() blocks until the topology returns the result.
        String result = drpc.execute("addUser", "zhangsan");
        System.err.println("From client: " + result);

        localCluster.shutdown();
        drpc.shutdown();
    }

}

遠端DRPC

改動yaml檔案

drpc.servers:
  - "hadoop000"

啟動服務
在這裡插入圖片描述
在這裡插入圖片描述

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * 遠端DRPC
 */
/**
 * DRPC topology intended to run on a real Storm cluster: submitted with
 * {@code StormSubmitter} and invoked remotely through a DRPC server
 * (see {@code RemoteDRPCClient}).
 */
public class RemoteDRPCTopology {

    /**
     * Bolt that receives (request-id, argument) tuples from the DRPC spout
     * and emits (request-id, result) so the DRPC server can route the result
     * back to the blocked client call.
     */
    public static class MyBolt extends BaseRichBolt {

        private OutputCollector outputCollector;

        public void prepare(Map stormConf, TopologyContext context,
                            OutputCollector collector) {
            this.outputCollector = collector;
        }

        public void execute(Tuple input) {

            Object requestId = input.getValue(0); // DRPC request id
            String name = input.getString(1);     // function argument

            // TODO... business logic goes here

            String result = "add user: " + name;

            // Echo the request id back so the DRPC server can match this
            // result to the waiting client.
            this.outputCollector.emit(new Values(requestId, result));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }
    }

    public static void main(String[] args) throws Exception {
        // Register the DRPC function name "addUser" and its processing bolt.
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("addUser");
        builder.addBolt(new MyBolt());

        // Let submission failures propagate: the original try/catch printed
        // the stack trace and then exited with status 0, which made a failed
        // deployment look successful to the shell / scheduler.
        StormSubmitter.submitTopology("drpc-topology",
                new Config(),
                builder.createRemoteTopology());
    }

}

server端的程式碼打包上傳伺服器

import org.apache.storm.Config;
import org.apache.storm.utils.DRPCClient;

/**
 * Remote DRPC客戶端測試類.
 */
/**
 * Client for the remote DRPC topology: connects to the DRPC server on
 * host "hadoop000" (port 3772) and invokes the "addUser" function.
 */
public class RemoteDRPCClient {

    public static void main(String[] args) throws Exception {
        // Thrift transport plus Nimbus retry policy required by DRPCClient.
        Config conf = new Config();
        conf.put("storm.thrift.transport", "org.apache.storm.security.auth.SimpleTransportPlugin");
        conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 3);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 20);
        conf.put(Config.DRPC_MAX_BUFFER_SIZE, 1048576);

        DRPCClient client = new DRPCClient(conf, "hadoop000", 3772);

        // Blocks until the topology returns the result for this request.
        String result = client.execute("addUser", "wangwu");
        System.out.println("Client invoked: " + result);
    }
}