Storm的分散式的遠端方法呼叫(DRPC)
阿新 • • 發佈:2019-01-05
一、DRPC就是分散式的遠端方法呼叫。
在Storm裡面引入DRPC主要是利用storm的實時計算能力來並行化CPU密集型的計算任務,DRPC的storm topology以函式的引數流作為資料,而把這些函式呼叫的返回值作為topology的輸出流
DRPC其實不能算是storm本身的特性,他是通過組合storm的stream、spout、bolt、topology而成的一種模式,本來應該把DRPC單獨打成一個包,但是DRPC實在是太有用了,所以我們把他和storm捆綁在一起
DRPC是通過一個‘DRPC Server’來實現
DRPC Server的整體工作過程如下:
1、接收一個RPC請求
2、傳送請求到storm topology
3、從storm topology 接收結果
4、把結果返回給等待的客戶端
下面例子我只用到了2個類:
由於我們使用DRPC的輸入流作為引數,也就是不需要編寫spout類,這裡直接把bolts作為靜態內部類寫到BasicDRPCTopology裡面
再寫一個遠端呼叫方法:
這樣我的程式碼就寫完了。現在我們需要對環境做一些修改。 實現DRPC步驟( 1、需要修改配置檔案內容為(分別修改每臺機器配置):
2、需要啟動storm的drpc服務(192.168.100.240啟動):
用命令啟動叢集模式
package hello.drpc1; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.drpc.LinearDRPCTopologyBuilder; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class BasicDRPCTopology { public static class ExclaimBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String input = tuple.getString(1); //System.out.println("==========" + tuple.getValue(0)); collector.emit(new Values(tuple.getValue(0), input + "!")); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "result")); } } public static void main(String[] args) throws Exception { //建立drpc例項 LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation"); //新增bolt builder.addBolt(new ExclaimBolt(), 3); Config conf = new Config(); if (args == null || args.length == 0) { LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc)); for (String word : new String[]{ "hello", "goodbye" }) { System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word)); } cluster.shutdown(); drpc.shutdown(); } else { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology()); } } }
再寫一個遠端呼叫方法:
package hello.drpc1; import backtype.storm.utils.DRPCClient; public class DrpcExclam { public static void main(String[] args) throws Exception { DRPCClient client = new DRPCClient("192.168.100.240", 3772); for (String word : new String[]{ "test1", "test2" }) { System.out.println(client.execute("exclamation", word)); } } }
這樣我的程式碼就寫完了。現在我們需要對環境做一些修改。 實現DRPC步驟( 1、需要修改配置檔案內容為(分別修改每臺機器配置):
vim /user/local/apache-storm-0.9.2/conf/storm.yaml
drpc.servers:
-'192.168.100.240'
2、需要啟動storm的drpc服務(192.168.100.240啟動):
storm drpc &
3、把相應的topology上傳到storm伺服器上去
如下圖所示
用命令啟動叢集模式
4、在本地呼叫遠端topology(這裡我們可以想象test1、test2是一個非常龐大的資料流,我們可以使用這種DPRC的方式直接呼叫,這樣就非常方便了)storm jar storm04.jar hello.drpc1.BasicDRPCTopology exc