storm消費kafka資料
http://blog.csdn.net/tonylee0329/article/details/43016385
使用storm-kafka模組讀取kafka中的資料,按照以下兩步進行構建(我使用的版本是0.9.3)
1. 使用BrokerHosts介面來配置kafka broker host與partition的mapping資訊;
2. 使用KafkaConfig來配置一些與kafka自身相關的選項,如fetchSizeBytes、socketTimeoutMs
下面分別介紹這兩塊的實現:
對於配置1,目前支援兩種實現方式:zk配置、靜態ip埠方式
第一種方式:Zk讀取(比較常見)
[html] view plain copy
ZkHosts支援兩種建立方式,
public ZkHosts(String brokerZkStr, String brokerZkPath)
//使用預設brokerZkPath:”/brokers”
public ZkHosts(String brokerZkStr)
通過這種方式訪問的時候,經過60s會重新整理一下host->partition的mapping
第二步:構建KafkaConfig物件
目前提供兩種建構函式,
[html] view plain copy
public KafkaConfig(BrokerHosts hosts, String topic)
//clientId如果不想每次隨機生成的話,就自己設定一個
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
程式碼參考:
[html] view plain copy
//這個地方其實就是kafka配置檔案裡邊的zookeeper.connect這個引數,可以去那裡拿過來。
String brokerZkStr = “10.100.90.201:2181/kafka_online_sample”;
String brokerZkPath = “/brokers”;
ZkHosts zkHosts = new ZkHosts(brokerZkStr, brokerZkPath);
String topic = "mars-wap";
//以下:將offset彙報到哪個zk叢集,相應配置
// String offsetZkServers = “10.199.203.169”;
String offsetZkServers = “10.100.90.201”;
String offsetZkPort = “2181”;
List zkServersList = new ArrayList();
zkServersList.add(offsetZkServers);
//彙報offset資訊的root路徑
String offsetZkRoot = “/stormExample”;
//儲存該spout id的消費offset資訊,譬如以topoName來命名
String offsetZkId = “storm-example”;
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic, offsetZkRoot, offsetZkId);
kafkaConfig.zkRoot = offsetZkRoot;
kafkaConfig.zkPort = Integer.parseInt(offsetZkPort);
kafkaConfig.zkServers = zkServersList;
kafkaConfig.id = offsetZkId;
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout spout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout, 1);
builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout");
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, builder.createTopology());
// cluster submit.
// try {
// StormSubmitter.submitTopology(“storm-kafka-example”,config,builder.createTopology());
// } catch (AlreadyAliveException e) {
// e.printStackTrace();
// } catch (InvalidTopologyException e) {
// e.printStackTrace();
// }
第二種方式:靜態ip埠方式
[html] view plain copy
String kafkaHost = “10.100.90.201”;
Broker brokerForPartition0 = new Broker(kafkaHost);//localhost:9092
Broker brokerForPartition1 = new Broker(kafkaHost, 9092);//localhost:9092 but we specified the port explicitly
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0);//mapping form partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1);//mapping form partition 1 to brokerForPartition1
StaticHosts hosts = new StaticHosts(partitionInfo);
String topic="mars-wap";
String offsetZkRoot ="/stormExample";
String offsetZkId="staticHost";
String offsetZkServers = "10.100.90.201";
String offsetZkPort = "2181";
List<String> zkServersList = new ArrayList<String>();
zkServersList.add(offsetZkServers);
SpoutConfig kafkaConfig = new SpoutConfig(hosts,topic,offsetZkRoot,offsetZkId);
kafkaConfig.zkPort = Integer.parseInt(offsetZkPort);
kafkaConfig.zkServers = zkServersList;
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout spout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout, 1);
builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout");
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, builder.createTopology());
Kafka之Consumer獲取消費資料全過程圖解
字數198 閱讀557 評論0 喜歡1
這篇文章是作為:跟我學Kafka原始碼之Consumer分析 的補充材料,看過我們之前原始碼分析的同學可能知道。
本文將從客戶端程式如何呼叫Consumer獲取到最終Kafka訊息的全過程以圖解的方式作一個原始碼級別的梳理。
廢話不多說,請圖看
時序圖
Business Process Model.jpg
流程圖
20140809174809543.png
文章短小的目的是便於大家快速找到內容的核心加以理解,避免文章又臭又長抓不住重點。
對於Kafka技術,如果大家對此有任何疑問,請給我留言,我們可以深入探討。
相關推薦
簡單Storm消費Kafka資料並存儲到redis例項(訂單資訊處理)
maven依賴 <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId&g
storm消費kafka資料
http://blog.csdn.net/tonylee0329/article/details/43016385 使用storm-kafka模組讀取kafka中的資料,按照以下兩步進行構建(我使用的版本是0.9.3) 1. 使用BrokerHosts介面來
Storm-Kafka模組常用介面分析及消費kafka資料例子
使用storm-kafka模組讀取kafka中的資料,按照以下兩步進行構建(我使用的版本是0.9.3) 1. 使用BrokerHosts介面來配置kafka broker host與partition的mapping資訊; 2. 使用KafkaConfig來配置一些與kaf
storm實時消費kafka資料
程式環境,在kafka建立名稱為data的topic,開啟消費者模式,準備輸入資料。 程式的pom.xml檔案 <dependencies> <dependency> <groupId>org.
kafka中topic的partition數量和customerGroup的customer數量關係以及storm消費kafka時並行度設定問題總結:
前段時間通過網上查詢和自己測試仔細研究了partition和customer關係以及工作中遇到的storm並行度調整的問題,認真梳理了一下現總結如下: 一、先說kafka部分: produce方面: 如果有多個分割槽,傳送的時候按照key值hashCode%partit
Spark 消費Kafka資料
spark RDD消費的哦,不是spark streaming。 導maven包: 注意版本哦,要跟自己機器的一致 <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
spark Streaming 直接消費Kafka資料,儲存到 HDFS 實戰程式設計實踐
最近在學習spark streaming 相關知識,現在總結一下 主要程式碼如下 def createStreamingContext():StreamingContext ={ val sparkConf = new SparkConf().setAppName("
Storm消費Kafka值得注意的坑
問題描述: kafka是之前早就搭建好的,新建的storm叢集要消費kafka的主題,由於kafka中已經記錄了很多訊息,storm消費時從最開始消費問題解決: 下面是摘自官網的一段話:How KafkaSpout stores offsets of a Kafka to
python消費kafka資料批量插入到es
1、es的批量插入 這是為了方便後期配置的更改,把配置資訊放在logging.conf中 用elasticsearch來實現批量操作,先安裝依賴包,sudo pip install Elasticsearch2 from elasticsear
38套大資料,雲端計算,架構,資料分析師,Hadoop,Spark,Storm,Kafka,人工智慧,機器學習,深度學習,專案實戰視訊教程
38套大資料,雲端計算,架構,資料分析師,Hadoop,Spark,Storm,Kafka,人工智慧,機器學習,深度學習,專案實戰視訊教程 視訊課程包含: 38套大資料和人工智慧高階課包含:大資料,雲端計算,架構,資料探勘實戰,實時推薦系統實戰,電視收視率專案實戰,實時流統計專案實戰,離線電
SparkStreaming消費Kafka中的資料 使用zookeeper和MySQL儲存偏移量的兩種方式
Spark讀取Kafka資料的方式有兩種,一種是receiver方式,另一種是直連方式。今天分享的SparkStreaming消費Kafka中的資料儲存偏移量的兩種方式都是基於直連方式上的 話不多說 直接上程式碼 ! 第一種是使用zookeeper儲存偏移量 object Kafka
大資料入門(21)storm和kafka結合的例項
1、原理: storm的lib下的jar, external\storm-kafka\storm-kafka-0.9.2-incubating.jar 此jar中的sqout已經寫好 2、/********** KafkaTopoMain :執行,在本地生成檔案******
Spark Streaming消費Kafka的資料進行統計
流處理平臺: 這裡是第四步的實現: Spark Streaming整合Kafka採用的是Receiver-based,另一種方式Direct Approach,稍作修改就行。 package spark import org.apache.spark.SparkConf impo
Spark Streaming消費Kafka Direct方式資料零丟失實現
一、概述 上次寫這篇文章文章的時候,Spark還是1.x,kafka還是0.8x版本,轉眼間spark到了2.x,kafka也到了2.x,儲存offset的方式也發生了改變,筆者根據上篇文章和網上文章,將offset儲存到Redis,既保證了併發也保證了資料不丟失,經過測試,有效。 二、
Kafka程式碼實現--from-beginning,讀取歷史未消費的資料
Kafka實際環境有可能會出現Consumer全部宕機,雖然基於Kafka的高可用特性,消費者群組中的消費者可以實現再均衡,所有Consumer不處理資料的情況很少,但是還是有可能會出現,此時就要求Consumer重啟的時候能夠讀取在宕機期間Producer傳送的資料。基於消費者訂閱模式預設
用flume-ng-sql-source 從mysql 抽取資料到kafka被storm消費
1.下載編譯flume-ng-sql-source 下載地址:https://github.com/keedio/flume-ng-sql-source.git 安裝說明文件編譯和拷貝jar包 2.編寫flume-ng 配置檔案 1.channels = ch-1 a
使用storm trident消費kafka訊息
一、前言 storm通過保證資料至少被處理一次來保證資料的完整性,由於元祖可以重發,對於一些需要資料精確的場景,可以考慮用storm trident實現。 傳統的事物型拓撲中存在幾種bolt: 1.1 BasicBolt 這是最基本的Bolt,BasicBolt每次只能處理一個tuple,而且必
sparkstreaming+kafka+redis+hbase消費kafka的資料實現exactly-once的語義
最近在做實時流處理的一個專案,遇到N多問題,經過不斷的除錯,終於有點進展,記錄一下,防止後人遇到同樣的問題. 1,sparkstreaming消費kafka有兩種方法,這裡我就不介紹了,網上關於這方面的資料很多,我就簡單說一下兩者的區別吧, (1)基於receiver的方
structuredstreaming消費kafka的資料實現wordcount
最近也是有很多同學問我,StructuredStreaming結合kafka的使用,我簡單的寫了一個wordcount的demo,後續會有更加具體和詳細的介紹,今天先來一個簡單的demo吧.程式碼測試過了,可以執行. package spark import org.ap
Spark Streaming通過直連的方式消費Kafka中的資料
為什麼採用直連(createDirectStream)的方式,主要有以下幾個原因: 1.createDirectStream的方式從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,實現零資料丟失,保證不重複消費,比createS