Storm 與Kafka 整合
這裡的目標是kafka 負責生產資料,storm 消費資料並將結果輸出
這裡用的是引進別人家寫的整合程式碼,因為使用的人也比較多,下面是專案地址
下載、解壓以及將這個目錄下的程式碼新增進專案
將kafka 和 storm 的JAR 新增進專案,作為依賴jar 包
然後新增com.netflix.curator 的相關包括client、framework和recipes
最新的所有com.google.common類,下載地址
這樣storm-kafka-0.8-plus專案應該就不會報錯了。
二、kafka 生產者的建立
在我的這篇文章裡3.6、Producer JAVA API,有生產者的例子,可以拿來直接用。
三、建立消費 kafka 資料的Topology
storm-kafka-0.8-plus 給我們寫了個測試程式碼
地址是:
程式碼如下:
package storm.kafka; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; public class KafkaSpoutTestTopology { public static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTestTopology.class); public static class PrinterBolt extends BaseBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public void execute(Tuple tuple, BasicOutputCollector collector) { LOG.info(tuple.toString()); } } private final BrokerHosts brokerHosts; public KafkaSpoutTestTopology(String kafkaZookeeper) { brokerHosts = new ZkHosts(kafkaZookeeper); } public StormTopology buildTopology() { SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "storm-sentence", "", "storm"); kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("words", new KafkaSpout(kafkaConfig), 10); builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words"); return builder.createTopology(); } public static void main(String[] args) throws Exception { String kafkaZk = args[0]; KafkaSpoutTestTopology kafkaSpoutTestTopology = new KafkaSpoutTestTopology(kafkaZk); Config config = new Config(); config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000); StormTopology stormTopology = kafkaSpoutTestTopology.buildTopology(); if (args != null && args.length > 1) { String name = args[1]; String dockerIp = args[2]; config.setNumWorkers(2); config.setMaxTaskParallelism(5); config.put(Config.NIMBUS_HOST, dockerIp); config.put(Config.NIMBUS_THRIFT_PORT, 6627); config.put(Config.STORM_ZOOKEEPER_PORT, 2181); config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(dockerIp)); StormSubmitter.submitTopology(name, config, stormTopology); } else { config.setNumWorkers(2); config.setMaxTaskParallelism(2); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("kafka", config, stormTopology); } } }
這裡清晰的寫出了建立一個與kafka整合的storm Topology,觀察main 函式,從上往下看:
下面是關於zookeeper的設定以及spout和bolt 的設定
String kafkaZk = args[0];
KafkaSpoutTestTopology kafkaSpoutTestTopology = new KafkaSpoutTestTopology(kafkaZk);
StormTopology stormTopology = kafkaSpoutTestTopology.buildTopology();
下面的語句中,storm-sentence是話題,下面的語句是要求在
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, " storm-sentence ", "", "storm");
builder.setSpout("words", new KafkaSpout(kafkaConfig), 10); 這裡是設定spout,負責從kafka消費資料,其中word 是spout 名稱,KafkaSpout 由storm-kafka-0.8-plus 提供,10為併發數。
builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words"); 這個是設定spout 接下去的bolt, PrinterBolt看名稱應該負責列印bolt的資料的類。shuffleGrouping("words")表示資料是採用隨機模式。後面接的資料來自與叫做words的spout
下面是設定Topology的相關設定
Config config = new Config(); 初始化一個storm設定
config.setNumWorkers(2); 這個代表分配2個Worker。
StormSubmitter.submitTopology(args[0], config, builder.createTopology()); 這個表示想Storm 伺服器提交Topology任務,其中第一個引數是Topology的name.
config.setMaxTaskParallelism(3); 一個work的最大併發數為3
LocalCluster cluster = new LocalCluster(); 開啟Storm本地模式
cluster.submitTopology("special-topology", config, builder.createTopology()); 在本地網模式下提交storm任務。
cluster.shutdown(); 關閉Storm本地模式。
下面是我修改後的指令碼
import com.google.common.collect.ImmutableList;
import com.ks.bolt.CounterBolt;
import com.ks.bolt.DateCutBolt;
import com.ks.bolt.InsertMysqlBolt;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
public class CountTopology {
/**
* @param args
*/
public static void main(String[] args) {
try{
String kafkaZookeeper = "carl:2181,slave1:2181,slave2:2181";
BrokerHosts brokerHosts = new ZkHosts(kafkaZookeeper);
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "test", "/storm", "stormid");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.zkServers = ImmutableList.of("carl","slave1","slave2");
kafkaConfig.zkPort = 2181;
//kafkaConfig.forceFromStart = true;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new KafkaSpout(kafkaConfig), 2);
//*************************下面是所有處理邏輯,只關注這個*****************************
builder.setBolt("datecut", new CounterBolt(),1).shuffleGrouping("spout");
//*************************下面是所有處理邏輯,只關注這個*****************************
Config config = new Config();
config.setDebug(true);
if(args!=null && args.length > 0) {
config.setNumWorkers(2);
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} else {
config.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("special-topology", config, builder.createTopology());
Thread.sleep(500000);
cluster.shutdown();
}
}catch (Exception e) {
e.printStackTrace();
}
}
}
這裡在本地模式下讓他執行20秒鐘自動結束,因為這個比較耗資源。注意以下這句,
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "test", "/storm", "stormid");
請記得在zookeeper 根目錄下面建立資料夾storm,然後在storm 資料夾下面繼續建立資料夾stormid 用於存放kafka資訊資料
上面的Topology 設定了bolt 為CounterBolt,因此還要建一個CounterBolt的bolt 類。
這裡設定了,執行jar包敲引數為提交到storm伺服器,不敲引數則是執行storm本地模式。
四、建立資料輸出的Bolt
這裡實現一個十分簡單的bolt 類
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class CounterBolt extends BaseBasicBolt {
/**
*
*/
private static final long serialVersionUID = -5508421065181891596L;
private static long counter = 0;
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println("msg = "+tuple.getString(0)+" -------------counter = "+(counter++));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
}
這裡很簡單就是將bolt 獲取的資料進行簡單的輸出,並統計接收到的資料條目數。這裡繼續BaseBasicBolt 類,因為這樣開發會比較簡單。因為這個是唯一的bolt,沒有輸出,因此在declareOutputFields 方法中不需要宣告output。
System.out.println("msg = "+ tuple.getString(0)+"-------------counter = "+(counter++));
這裡tuple就是這個bolt 從上一個spout獲取的資料集合。
這裡是控制檯輸出,因此請用本地模式進行除錯。
打包上傳到伺服器,執行
Storm jar jarname CountTopology 回車,會看到他在等待資料傳入。
這個時候執行kafka消費者程式,將資料輸出,則會看到storm 會迅速輸出資料和統計數目。
這裡測試不寫了。