Storm學習筆記(7)- DRPC
阿新 • • 發佈:2019-01-10
文章目錄
官方網站:
http://storm.apache.org/releases/1.2.2/Distributed-RPC.html
RPC原理圖解
基於Hadoop的RPC實現
新增依賴
<!--新增cloudera的repository-->
<repositories>
<repository>
< id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
</repository>
</repositories>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version> 2.6.0-cdh5.7.0</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
/**
* 使用者的服務介面
*/
public interface UserService {
public static final long versionID = 88888888;
/**
* 新增使用者
* @param name 名字
* @param age 年齡
*/
public void addUser(String name, int age);
}
/**
* 使用者的服務介面實現類
*/
public class UserServiceImpl implements UserService{
public void addUser(String name, int age) {
System.out.println("From Server Invoked: add user success... , name is :" + name);
}
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
/**
* RPC Server服務
*/
public class RPCServer {
public static void main(String[] args) throws Exception{
Configuration configuration = new Configuration();
RPC.Builder builder = new RPC.Builder(configuration);
// Java Builder模式
RPC.Server server = builder.setProtocol(UserService.class)
.setInstance(new UserServiceImpl())
.setBindAddress("localhost")
.setPort(9999)
.build();
server.start();
}
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import java.net.InetSocketAddress;
/**
* RPC客戶端
*/
public class RPCClient {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
long clientVersion = 88888888;
UserService userService = RPC.getProxy(UserService.class,clientVersion,
new InetSocketAddress("localhost",9999),
configuration);
userService.addUser("lisi", 30);
System.out.println("From client... invoked");
RPC.stopProxy(userService);
}
}
測試:先啟動服務端;再啟動客戶端。
Storm DRPC概述
分散式RPC (DRPC)背後的思想是使用Storm並行地計算真正強大的函式。Storm topology 接受函式引數流作為輸入,併為每個函式呼叫發出結果流。
與其說DRPC是Storm的一個特性,不如說它是從Storm的streams, spouts, bolts, topologies的原語中表達出來的模式。DRPC可以從Storm中打包為一個單獨的庫,但是它非常有用,所以與Storm繫結在一起。
分散式RPC由“DRPC伺服器”協調(Storm附帶了一個實現)。DRPC伺服器協調接收RPC請求、將請求傳送到Storm topology、接收Storm topology的結果並將結果傳送回等待的客戶機。從客戶機的角度來看,分散式RPC呼叫看起來就像常規RPC呼叫。例如,下面是客戶機如何使用引數“http://twitter.com”計算“reach”函式的結果:
Config conf = new Config();
conf.put("storm.thrift.transport", "org.apache.storm.security.auth.plain.PlainSaslTransportPlugin");
conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 3);
conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 20);
DRPCClient client = new DRPCClient(conf, "drpc-host", 3772);
String result = client.execute("reach", "http://twitter.com");
客戶機向DRPC伺服器傳送要執行的函式的名稱和引數。實現該函式的拓撲使用DRPCSpout從DRPC伺服器接收函式呼叫流。每個函式呼叫都由DRPC伺服器用惟一的id標記。topology計算結果和最終拓topology為ReturnResults bolts連線到DRPC伺服器和函式呼叫的id的結果。然後DRPC伺服器使用id匹配客戶機正在等待的結果,解除阻塞等待的客戶機,並將結果傳送給它。
本地DRPC
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* 本地DRPC
*/
public class LocalDRPCTopology {
public static class MyBolt extends BaseRichBolt {
private OutputCollector outputCollector;
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.outputCollector = collector;
}
public void execute(Tuple input) {
Object requestId = input.getValue(0); //請求的id
String name = input.getString(1); //請求的引數
/**
* TODO... 業務邏輯處理
*/
String result = "add user: " + name ;
this.outputCollector.emit(new Values(requestId, result));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","result"));
}
}
public static void main(String[] args) {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("addUser");
builder.addBolt(new MyBolt());
LocalCluster localCluster = new LocalCluster();
LocalDRPC drpc = new LocalDRPC();
localCluster.submitTopology("local-drpc", new Config(),
builder.createLocalTopology(drpc));
String result = drpc.execute("addUser", "zhangsan");
System.err.println("From client: " + result);
localCluster.shutdown();
drpc.shutdown();
}
}
遠端DRPC
改動ymal檔案
drpc.servers:
- "hadoop000"
啟動服務
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* 遠端DRPC
*/
public class RemoteDRPCTopology {
public static class MyBolt extends BaseRichBolt {
private OutputCollector outputCollector;
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.outputCollector = collector;
}
public void execute(Tuple input) {
Object requestId = input.getValue(0); //請求的id
String name = input.getString(1); //請求的引數
/**
* TODO... 業務邏輯處理
*/
String result = "add user: " + name ;
this.outputCollector.emit(new Values(requestId, result));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","result"));
}
}
public static void main(String[] args) {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("addUser");
builder.addBolt(new MyBolt());
try {
StormSubmitter.submitTopology("drpc-topology",
new Config(),
builder.createRemoteTopology());
} catch (Exception e) {
e.printStackTrace();
}
}
}
server端的程式碼打包上傳伺服器
import org.apache.storm.Config;
import org.apache.storm.utils.DRPCClient;
/**
* Remote DRPC客戶端測試類.
*/
public class RemoteDRPCClient {
public static void main(String[] args) throws Exception {
Config config = new Config();
// 這一組引數
config.put("storm.thrift.transport", "org.apache.storm.security.auth.SimpleTransportPlugin");
config.put(Config.STORM_NIMBUS_RETRY_TIMES, 3);
config.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
config.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 20);
config.put(Config.DRPC_MAX_BUFFER_SIZE, 1048576);
DRPCClient client = new DRPCClient(config,"hadoop000", 3772);
String result = client.execute("addUser", "wangwu");
System.out.println("Client invoked: " + result);
}
}