Integrating Storm with Kafka
I. Basic Kafka concepts
1. Kafka is a distributed message brokering system: it persists (caches) streams of messages on behalf of its clients.
2. The servers in a Kafka cluster are called brokers.
3. Kafka has two kinds of clients: producers (message writers) and consumers (message readers). Clients connect to brokers over TCP.
4. Messages from different business systems are separated by topic, and every topic is partitioned to spread the read/write load.
5. Each partition can have multiple replicas, which protects against data loss.
6. All updates to a partition's data must go through the leader among that partition's replicas.
7. Consumers can be grouped. Consumers within the same group split a topic's messages among themselves without duplication, while each distinct group independently receives the full stream. For example, if the topic order_info holds 100 messages with ids 0-99 and two consumers A and B share one group, A might consume messages 0-49 while B consumes 50-99.
8. When consuming a topic, a consumer can specify the offset at which to start reading (see the sketch after this list).
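To make concepts 7 and 8 concrete, here is a minimal consumer sketch. It uses the modern Kafka Java client (org.apache.kafka.clients.consumer), which is an assumption on my part since this guide was written against a much older Kafka; the host weekend05 and topic order_info come from this document, everything else is illustrative.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "weekend05:9092"); // a broker from this guide's cluster
        props.put("group.id", "A"); // consumers sharing a group.id divide the topic's partitions among themselves
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Concept 8: pin this consumer to one partition and choose the start offset explicitly.
            TopicPartition p0 = new TopicPartition("order_info", 0);
            consumer.assign(Collections.singletonList(p0));
            consumer.seek(p0, 50L); // start reading at offset 50 instead of the committed position
            for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(1))) {
                System.out.printf("offset=%d value=%s%n", r.offset(), r.value());
            }
        }
    }
}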
II. Installing a Kafka cluster
1. Unpack the Kafka distribution.
2. Edit config/server.properties (broker.id must be unique on each node):
broker.id=1
zookeeper.connect=weekend05:2181,weekend06:2181,weekend07:2181
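For context, a fuller single-node server.properties combining the two settings above with the other standard broker settings might look like this sketch (the port and log.dirs values are illustrative, not taken from this guide):

broker.id=1                    # unique per broker
port=9092                      # port the broker listens on
log.dirs=/tmp/kafka-logs       # where partition data is stored on disk
zookeeper.connect=weekend05:2181,weekend06:2181,weekend07:2181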
3. Start the ZooKeeper cluster.
4. Start a broker on every node:
bin/kafka-server-start.sh config/server.properties
5. Create a topic in the cluster:
bin/kafka-topics.sh --create --zookeeper weekend05:2181 --replication-factor 3 --partitions 1 --topic order
6. Write messages to a topic with a console producer:
bin/kafka-console-producer.sh --broker-list weekend05:9092 --topic order
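The console producer is fine for testing; an application would publish programmatically. Below is a minimal Java producer sketch, again assuming the modern Kafka client API (org.apache.kafka.clients.producer), which postdates this guide; only the broker address and topic name come from this document.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OrderProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "weekend05:9092"); // same broker list as the console producer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each send() appends one message to a partition of the "order" topic.
            producer.send(new ProducerRecord<>("order", "hello storm kafka"));
            producer.flush(); // block until the message has actually been sent
        }
    }
}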
7. Read messages from a topic with a console consumer:
bin/kafka-console-consumer.sh --zookeeper weekend05:2181 --from-beginning --topic order
8. Inspect a topic's partition and replica status:
bin/kafka-topics.sh --describe --zookeeper weekend05:2181 --topic order
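For the 1-partition, replication-factor-3 topic created above, the describe output looks roughly like the following (the leader/replica broker ids shown are illustrative and depend on your cluster):

Topic:order	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: order	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3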
III. Integrating Kafka with Storm
First, consider the overall data flow (the original flow diagram is omitted here): Flume acts as the producer client, and Storm acts as the consumer client.
Example: below is a simple WordCount program that reads subscribed message lines from Kafka, splits each line into words on whitespace, and then computes word frequencies.
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import storm.kafka.BrokerHosts; // the storm.kafka.* classes come from storm-kafka-0.9.2-incubating.jar
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyKafkaTopology {

    public static class KafkaWordSplitter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0);
            LOG.info("RECV[kafka -> splitter] " + line);
            String[] words = line.split("\\s+");
            for (String word : words) {
                LOG.info("EMIT[splitter -> counter] " + word);
                collector.emit(input, new Values(word, 1));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static class WordCounter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(WordCounter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;
        private Map<String, AtomicInteger> counterMap;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.counterMap = new HashMap<String, AtomicInteger>();
        }

        @Override
        public void execute(Tuple input) {
            String word = input.getString(0);
            int count = input.getInteger(1);
            LOG.info("RECV[splitter -> counter] " + word + " : " + count);
            AtomicInteger ai = this.counterMap.get(word);
            if (ai == null) {
                ai = new AtomicInteger();
                this.counterMap.put(word, ai);
            }
            ai.addAndGet(count);
            collector.ack(input);
            LOG.info("CHECK statistics map: " + this.counterMap);
        }

        @Override
        public void cleanup() {
            LOG.info("The final result:");
            Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
            while (iter.hasNext()) {
                Entry<String, AtomicInteger> entry = iter.next();
                LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        String zks = "h1:2181,h2:2181,h3:2181";
        String topic = "my-replicated-topic5";
        String zkRoot = "/storm"; // default ZooKeeper root configuration for Storm
        String id = "word";

        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        // This setting controls whether, after the topology stops because of a failure
        // and is restarted, the spout rereads the subscribed topic in Kafka from the
        // beginning. With forceFromStart=true, tuples that were already processed are
        // processed again; with false, processing resumes from the last recorded offset,
        // so topic data in Kafka is not reprocessed. The consumption state is tracked
        // as a position in the data source.
        spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
        spoutConf.zkPort = 2181;

        TopologyBuilder builder = new TopologyBuilder();
        // The Kafka topic was created with 5 partitions, so the spout parallelism is set to 5.
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
        builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
        builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

        Config conf = new Config();
        String name = MyKafkaTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            // Nimbus host name passed from command line
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}
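To build this topology, storm-core and the storm-kafka connector must be on the classpath. A plausible Maven dependency sketch for the 0.9.2-incubating release (the version is inferred from the jar name in the imports above; adjust it to match your cluster):

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.2-incubating</version>
    <scope>provided</scope> <!-- supplied by the Storm cluster at runtime -->
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.2-incubating</version>
</dependency>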
You can monitor the topology's execution through its log files (under the logs/ directory) or through the Storm UI. If the program runs without errors, generate messages with the Kafka producer used earlier, and you will see the Storm topology receive and process them in real time.
IV. Kafka learning resources
Official documentation: http://kafka.apache.org/documentation.html#introduction
Storm + Kafka + HDFS: http://shiyanjun.cn/archives/934.html