1. 程式人生 > >storm消費kafka資料

storm消費kafka資料

http://blog.csdn.net/tonylee0329/article/details/43016385
使用storm-kafka模組讀取kafka中的資料,按照以下兩步進行構建(我使用的版本是0.9.3)
1. 使用BrokerHosts介面來配置kafka broker host與partition的mapping資訊;
2. 使用KafkaConfig來配置一些與kafka自身相關的選項,如fetchSizeBytes、socketTimeoutMs
下面分別介紹這兩塊的實現:

對於配置1,目前支援兩種實現方式:zk配置、靜態ip埠方式

第一種方式:Zk讀取(比較常見)
[html] view plain copy
ZkHosts支援兩種建立方式,
public ZkHosts(String brokerZkStr, String brokerZkPath)
//使用預設brokerZkPath:”/brokers”
public ZkHosts(String brokerZkStr)

通過這種方式訪問的時候,經過60s會重新整理一下host->partition的mapping

第二步:構建KafkaConfig物件
目前提供兩種建構函式,
[html] view plain copy
public KafkaConfig(BrokerHosts hosts, String topic)
//clientId如果不想每次隨機生成的話,就自己設定一個
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)

程式碼參考:
[html] view plain copy
//這個地方其實就是kafka配置檔案裡邊的zookeeper.connect這個引數,可以去那裡拿過來。
String brokerZkStr = “10.100.90.201:2181/kafka_online_sample”;
String brokerZkPath = “/brokers”;
ZkHosts zkHosts = new ZkHosts(brokerZkStr, brokerZkPath);

    String topic = "mars-wap";  
    //以下:將offset彙報到哪個zk叢集,相應配置  

// String offsetZkServers = “10.199.203.169”;
String offsetZkServers = “10.100.90.201”;
String offsetZkPort = “2181”;
List zkServersList = new ArrayList();
zkServersList.add(offsetZkServers);
//彙報offset資訊的root路徑
String offsetZkRoot = “/stormExample”;
//儲存該spout id的消費offset資訊,譬如以topoName來命名
String offsetZkId = “storm-example”;

    SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic, offsetZkRoot, offsetZkId);  
    kafkaConfig.zkRoot = offsetZkRoot;  
    kafkaConfig.zkPort = Integer.parseInt(offsetZkPort);  
    kafkaConfig.zkServers = zkServersList;  
    kafkaConfig.id = offsetZkId;  
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());  


    KafkaSpout spout = new KafkaSpout(kafkaConfig);  

    TopologyBuilder builder = new TopologyBuilder();  
    builder.setSpout("spout", spout, 1);  
    builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout");  

    Config config = new Config();  

    LocalCluster cluster = new LocalCluster();  
    cluster.submitTopology("test", config, builder.createTopology());  

    // cluster submit.  

// try {
// StormSubmitter.submitTopology(“storm-kafka-example”,config,builder.createTopology());
// } catch (AlreadyAliveException e) {
// e.printStackTrace();
// } catch (InvalidTopologyException e) {
// e.printStackTrace();
// }

第二種方式:靜態ip埠方式
[html] view plain copy
String kafkaHost = “10.100.90.201”;
Broker brokerForPartition0 = new Broker(kafkaHost);//localhost:9092
Broker brokerForPartition1 = new Broker(kafkaHost, 9092);//localhost:9092 but we specified the port explicitly
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0);//mapping form partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1);//mapping form partition 1 to brokerForPartition1
StaticHosts hosts = new StaticHosts(partitionInfo);

    String topic="mars-wap";  
    String offsetZkRoot ="/stormExample";  
    String offsetZkId="staticHost";  
    String offsetZkServers = "10.100.90.201";  
    String offsetZkPort = "2181";  
    List<String> zkServersList = new ArrayList<String>();  
    zkServersList.add(offsetZkServers);  

    SpoutConfig kafkaConfig = new SpoutConfig(hosts,topic,offsetZkRoot,offsetZkId);  

    kafkaConfig.zkPort = Integer.parseInt(offsetZkPort);  
    kafkaConfig.zkServers = zkServersList;  
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());  

    KafkaSpout spout = new KafkaSpout(kafkaConfig);  

    TopologyBuilder builder = new TopologyBuilder();  
    builder.setSpout("spout", spout, 1);  
    builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout");  

    Config config = new Config();  

    LocalCluster cluster = new LocalCluster();  
    cluster.submitTopology("test", config, builder.createTopology());  

Kafka之Consumer獲取消費資料全過程圖解
字數198 閱讀557 評論0 喜歡1
這篇文章是作為:跟我學Kafka原始碼之Consumer分析 的補充材料,看過我們之前原始碼分析的同學可能知道。
本文將從客戶端程式如何呼叫Consumer獲取到最終Kafka訊息的全過程以圖解的方式作一個原始碼級別的梳理。

廢話不多說,請圖看

時序圖

Business Process Model.jpg
流程圖

20140809174809543.png
文章短小的目的是便於大家快速找到內容的核心加以理解,避免文章又臭又長抓不住重點。
對於Kafka技術,如果大家對此有任何疑問,請給我留言,我們可以深入探討。

相關推薦

簡單Storm消費Kafka資料並存儲到redis例項(訂單資訊處理)

maven依賴 <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId&g

storm消費kafka資料

http://blog.csdn.net/tonylee0329/article/details/43016385 使用storm-kafka模組讀取kafka中的資料,按照以下兩步進行構建(我使用的版本是0.9.3) 1. 使用BrokerHosts介面來

Storm-Kafka模組常用介面分析及消費kafka資料例子

使用storm-kafka模組讀取kafka中的資料,按照以下兩步進行構建(我使用的版本是0.9.3) 1. 使用BrokerHosts介面來配置kafka broker host與partition的mapping資訊; 2. 使用KafkaConfig來配置一些與kaf

storm實時消費kafka資料

程式環境,在kafka建立名稱為data的topic,開啟消費者模式,準備輸入資料。 程式的pom.xml檔案 <dependencies> <dependency> <groupId>org.

kafka中topic的partition數量和customerGroup的customer數量關係以及storm消費kafka時並行度設定問題總結:

前段時間通過網上查詢和自己測試仔細研究了partition和customer關係以及工作中遇到的storm並行度調整的問題,認真梳理了一下現總結如下: 一、先說kafka部分: produce方面: 如果有多個分割槽,傳送的時候按照key值hashCode%partit

Spark 消費Kafka資料

spark RDD消費的哦,不是spark streaming。 導maven包: 注意版本哦,要跟自己機器的一致 <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->

spark Streaming 直接消費Kafka資料,儲存到 HDFS 實戰程式設計實踐

最近在學習spark streaming 相關知識,現在總結一下 主要程式碼如下 def createStreamingContext():StreamingContext ={ val sparkConf = new SparkConf().setAppName("

Storm消費Kafka值得注意的坑

問題描述:  kafka是之前早就搭建好的,新建的storm叢集要消費kafka的主題,由於kafka中已經記錄了很多訊息,storm消費時從最開始消費問題解決:  下面是摘自官網的一段話:How KafkaSpout stores offsets of a Kafka to

python消費kafka資料批量插入到es

1、es的批量插入 這是為了方便後期配置的更改,把配置資訊放在logging.conf中 用elasticsearch來實現批量操作,先安裝依賴包,sudo pip install Elasticsearch2 from elasticsear

38套大資料,雲端計算,架構,資料分析師,Hadoop,Spark,StormKafka,人工智慧,機器學習,深度學習,專案實戰視訊教程

38套大資料,雲端計算,架構,資料分析師,Hadoop,Spark,Storm,Kafka,人工智慧,機器學習,深度學習,專案實戰視訊教程 視訊課程包含: 38套大資料和人工智慧高階課包含:大資料,雲端計算,架構,資料探勘實戰,實時推薦系統實戰,電視收視率專案實戰,實時流統計專案實戰,離線電

SparkStreaming消費Kafka中的資料 使用zookeeper和MySQL儲存偏移量的兩種方式

Spark讀取Kafka資料的方式有兩種,一種是receiver方式,另一種是直連方式。今天分享的SparkStreaming消費Kafka中的資料儲存偏移量的兩種方式都是基於直連方式上的 話不多說 直接上程式碼 ! 第一種是使用zookeeper儲存偏移量 object Kafka

資料入門(21)stormkafka結合的例項

  1、原理: storm的lib下的jar, external\storm-kafka\storm-kafka-0.9.2-incubating.jar 此jar中的sqout已經寫好 2、/********** KafkaTopoMain :執行,在本地生成檔案******

Spark Streaming消費Kafka資料進行統計

流處理平臺: 這裡是第四步的實現: Spark Streaming整合Kafka採用的是Receiver-based,另一種方式Direct Approach,稍作修改就行。 package spark import org.apache.spark.SparkConf impo

Spark Streaming消費Kafka Direct方式資料零丟失實現

一、概述 上次寫這篇文章文章的時候,Spark還是1.x,kafka還是0.8x版本,轉眼間spark到了2.x,kafka也到了2.x,儲存offset的方式也發生了改變,筆者根據上篇文章和網上文章,將offset儲存到Redis,既保證了併發也保證了資料不丟失,經過測試,有效。 二、

Kafka程式碼實現--from-beginning,讀取歷史未消費資料

Kafka實際環境有可能會出現Consumer全部宕機,雖然基於Kafka的高可用特性,消費者群組中的消費者可以實現再均衡,所有Consumer不處理資料的情況很少,但是還是有可能會出現,此時就要求Consumer重啟的時候能夠讀取在宕機期間Producer傳送的資料。基於消費者訂閱模式預設

用flume-ng-sql-source 從mysql 抽取資料到kafkastorm消費

1.下載編譯flume-ng-sql-source 下載地址:https://github.com/keedio/flume-ng-sql-source.git   安裝說明文件編譯和拷貝jar包 2.編寫flume-ng 配置檔案 1.channels = ch-1 a

使用storm trident消費kafka訊息

一、前言   storm通過保證資料至少被處理一次來保證資料的完整性,由於元祖可以重發,對於一些需要資料精確的場景,可以考慮用storm trident實現。    傳統的事物型拓撲中存在幾種bolt: 1.1 BasicBolt  這是最基本的Bolt,BasicBolt每次只能處理一個tuple,而且必

sparkstreaming+kafka+redis+hbase消費kafka資料實現exactly-once的語義

最近在做實時流處理的一個專案,遇到N多問題,經過不斷的除錯,終於有點進展,記錄一下,防止後人遇到同樣的問題. 1,sparkstreaming消費kafka有兩種方法,這裡我就不介紹了,網上關於這方面的資料很多,我就簡單說一下兩者的區別吧, (1)基於receiver的方

structuredstreaming消費kafka資料實現wordcount

最近也是有很多同學問我,StructuredStreaming結合kafka的使用,我簡單的寫了一個wordcount的demo,後續會有更加具體和詳細的介紹,今天先來一個簡單的demo吧.程式碼測試過了,可以執行. package spark import org.ap

Spark Streaming通過直連的方式消費Kafka中的資料

為什麼採用直連(createDirectStream)的方式,主要有以下幾個原因: 1.createDirectStream的方式從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,實現零資料丟失,保證不重複消費,比createS