1. 程式人生 > >Storm的分散式的遠端方法呼叫(DRPC)

Storm的分散式的遠端方法呼叫(DRPC)

一、DRPC就是分散式的遠端方法呼叫。 在Storm裡面引入DRPC主要是利用storm的實時計算能力來並行化CPU密集型的計算任務,DRPC的storm topology以函式的引數流作為資料,而把這些函式呼叫的返回值作為topology的輸出流 DRPC其實不能算是storm本身的特性,他是通過組合storm的stream、spout、bolt、topology而成的一種模式,本來應該把DRPC單獨打成一個包,但是DRPC實在是太有用了,所以我們把他和storm捆綁在一起 DRPC是通過一個‘DRPC Server’來實現 DRPC Server的整體工作過程如下: 1、接收一個RPC請求 2、傳送請求到storm topology 3、從storm topology 接收結果 4、把結果返回給等待的客戶端 下面例子我只用到了2個類: 由於我們使用DRPC的輸入流作為引數,也就是不需要編寫spout類,這裡直接把bolts作為靜態內部類寫到BasicDRPCTopology裡面
package hello.drpc1;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
 
public class BasicDRPCTopology {
	  
	public static class ExclaimBolt extends BaseBasicBolt {
		@Override
	    public void execute(Tuple tuple, BasicOutputCollector collector) {
	      String input = tuple.getString(1);
	      //System.out.println("==========" + tuple.getValue(0));
	      collector.emit(new Values(tuple.getValue(0), input + "!"));
	    }

	    @Override
	    public void declareOutputFields(OutputFieldsDeclarer declarer) {
	      declarer.declare(new Fields("id", "result"));
	    }
    }

	  public static void main(String[] args) throws Exception {
		  	//建立drpc例項
		    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
		    //新增bolt
		    builder.addBolt(new ExclaimBolt(), 3);
		    Config conf = new Config();
		    if (args == null || args.length == 0) {
			      LocalDRPC drpc = new LocalDRPC();
			      LocalCluster cluster = new LocalCluster();
			      cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
			      for (String word : new String[]{ "hello", "goodbye" }) {
			    	  System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
			      }
			
			      cluster.shutdown();
			      drpc.shutdown();
		    }
		    else {
			      conf.setNumWorkers(3);
			      StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
		    }
	  }
}

再寫一個遠端呼叫方法:
package hello.drpc1;

import backtype.storm.utils.DRPCClient;

public class DrpcExclam {

	public static void main(String[] args) throws Exception {
		DRPCClient client = new DRPCClient("192.168.100.240", 3772);
	      for (String word : new String[]{ "test1", "test2" }) {
	    	  System.out.println(client.execute("exclamation", word));
	      }
	}
}

這樣我的程式碼就寫完了。現在我們需要對環境做一些修改。 實現DRPC步驟( 1、需要修改配置檔案內容為(分別修改每臺機器配置):
   vim /user/local/apache-storm-0.9.2/conf/storm.yaml
    drpc.servers:
               -'192.168.100.240'

2、需要啟動storm的drpc服務(192.168.100.240啟動):
 storm drpc &
3、把相應的topology上傳到storm伺服器上去 如下圖所示

用命令啟動叢集模式
storm jar storm04.jar hello.drpc1.BasicDRPCTopology exc
4、在本地呼叫遠端topology(這裡我們可以想象test1、test2是一個非常龐大的資料流,我們可以使用這種DPRC的方式直接呼叫,這樣就非常方便了)