14.7 DRPC
什麼是DRPC:DRPC(Distributed RPC)即分散式遠端過程呼叫
– RPC(Remote Procedure Call Protocol)——遠端過程呼叫協議
– Distributed RPC:以流式、並行的方式處理RPC請求
– RPC請求引數當做輸入流,結果當做輸出流
– 利用storm的分散式處理機制和能力
– 藉助DRPC server接收請求、返回響應
Storm只能獲取資料,不能接請求和發響應,所以這裡藉助一個DRPC Server來幫助完成
DRPC把大量請求分散到叢集中並行處理:一次請求如果序列執行可能會比較慢,並行處理一方面可以降低平均一次請求的時間,另一方面也提高了響應的吞吐量
配置DRPC:apache-storm-0.9.5/conf/storm.yaml
修改drpc.servers配置,同步到其他節點
drpc.servers:
- "node2"
#啟動DRPC server:
1,首先啟動storm叢集
2,然後啟動DRPC server:./bin/storm drpc >> ./logs/drpc.out 2>&1 &(在主節點上啟動就行;該節點須與drpc.servers中配置的主機一致,此處為node2)
在叢集建立DRPC:給這個方法打jar包,放到storm叢集中執行:
./bin/storm jar /opt/local/drpc.jar com.bjsxt.basic.drpc.ManualDRPC drpc
package com.bjsxt.basic.drpc;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Hand-wired DRPC topology that appends {@code "!!!"} to every request argument.
 *
 * <p>Run with at least one argument to submit the topology to a Storm cluster
 * under the name {@code args[0]}; run with no arguments to start it in an
 * in-process {@link LocalCluster}, issue one sample request and shut down.
 *
 * @author root
 */
public class ManualDRPC {

    /** Name of the DRPC function served by this topology; clients pass it to {@code execute}. */
    private static final String FUNCTION_NAME = "exclamation";

    /**
     * Bolt that appends {@code "!!!"} to the request argument and forwards the
     * second tuple value unchanged.
     */
    public static class ExclamationBolt extends BaseBasicBolt {

        /** Declares the two output fields consumed downstream: {@code result} and {@code return-info}. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("result", "return-info"));
        }

        /**
         * Custom request-handling logic goes here.
         *
         * @param tuple     input tuple; value 0 is read as the request string and
         *                  value 1 is passed through untouched (presumably the
         *                  DRPC return info emitted by the spout -- confirm)
         * @param collector collector used to emit the {@code (result, return-info)} pair
         */
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String arg = tuple.getString(0);
            Object retInfo = tuple.getValue(1);
            collector.emit(new Values(arg + "!!!", retInfo));
        }
    }

    /**
     * Entry point.
     *
     * @param args if non-empty, {@code args[0]} is the topology name used for
     *             cluster submission; if empty, the topology runs locally
     */
    public static void main(String[] args) {
        if (args.length > 0) {
            // Cluster mode: the spout is created without a LocalDRPC handle,
            // so no LocalDRPC is instantiated on this path.
            TopologyBuilder builder = buildTopology(new DRPCSpout(FUNCTION_NAME));
            Config conf = new Config();
            try {
                StormSubmitter.submitTopology(args[0], conf,
                        builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // Local mode: the LocalDRPC instance is only needed (and only
            // created) here, and is released together with the cluster.
            LocalDRPC drpc = new LocalDRPC();
            TopologyBuilder builder = buildTopology(new DRPCSpout(FUNCTION_NAME, drpc));
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("exclaim", new Config(), builder.createTopology());
                // Issue one sample request so the local run actually exercises the topology.
                System.out.println(drpc.execute(FUNCTION_NAME, "hello"));
            } finally {
                cluster.shutdown();
                drpc.shutdown();
            }
        }
    }

    /**
     * Wires the given DRPC spout to the exclamation bolt and the result-returning bolt.
     *
     * @param spout DRPC spout to use as the topology source
     * @return a builder holding the spout {@code drpc} and the bolts
     *         {@code exclaim} and {@code return}, each bolt with parallelism 3
     */
    private static TopologyBuilder buildTopology(DRPCSpout spout) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("drpc", spout);
        builder.setBolt("exclaim", new ExclamationBolt(), 3)
                .shuffleGrouping("drpc");
        builder.setBolt("return", new ReturnResults(), 3)
                .shuffleGrouping("exclaim");
        return builder;
    }
}
Java建立客戶端連線DRPC:DRPC在叢集上建立之後,eclipse可以執行
package com.bjsxt.basic.drpc;
import org.apache.thrift7.TException;
import backtype.storm.generated.DRPCExecutionException;
import backtype.storm.utils.DRPCClient;
/**
 * Command-line client that sends one request to the DRPC server and prints the reply.
 *
 * @author root
 */
public class MyDRPCclient {

    /** Host of the DRPC server this client connects to. */
    private static final String DRPC_HOST = "node2";

    /** Port of the DRPC server this client connects to. */
    private static final int DRPC_PORT = 3772;

    /** Name of the DRPC function to invoke. */
    private static final String FUNCTION_NAME = "exclamation";

    /**
     * Executes the {@code exclamation} function with the argument {@code "hello "}
     * and prints the result to standard output.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        DRPCClient client = new DRPCClient(DRPC_HOST, DRPC_PORT);
        try {
            String result = client.execute(FUNCTION_NAME, "hello ");
            System.out.println(result);
        } catch (TException e) {
            e.printStackTrace();
        } catch (DRPCExecutionException e) {
            e.printStackTrace();
        } finally {
            // Always release the client's connection, whether or not the call succeeded.
            client.close();
        }
    }
}