Storm-Kafka模組常用介面分析及消費kafka資料例子
阿新 • • 發佈:2018-12-31
使用storm-kafka模組讀取kafka中的資料,按照以下兩步進行構建(我使用的版本是0.9.3)
1. 使用BrokerHosts介面來配置kafka broker host與partition的mapping資訊;
2. 使用KafkaConfig來配置一些與kafka自身相關的選項,如fetchSizeBytes、socketTimeoutMs
下面分別介紹這兩塊的實現:
對於配置1,目前支援兩種實現方式:zk配置、靜態ip埠方式
第一種方式:Zk讀取(比較常見)
通過這種方式訪問的時候,經過60s會重新整理一下host->partition的mapping 第二步:構建KafkaConfig物件 目前提供兩種建構函式,
完整的使用例子,見github原始碼 參考: https://github.com/apache/storm/blob/v0.9.3/external/storm-kafka/README.md
本文為原創,轉載請標明出處!From Tony_老七
ZkHosts支援兩種建立方式, public ZkHosts(String brokerZkStr, String brokerZkPath) //使用預設brokerZkPath:"/brokers" public ZkHosts(String brokerZkStr)
通過這種方式訪問的時候,經過60s會重新整理一下host->partition的mapping 第二步:構建KafkaConfig物件 目前提供兩種建構函式,
public KafkaConfig(BrokerHosts hosts, String topic)
//clientId如果不想每次隨機生成的話,就自己設定一個
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
程式碼參考:
第二種方式:靜態ip埠方式//這個地方其實就是kafka配置檔案裡邊的zookeeper.connect這個引數,可以去那裡拿過來。 String brokerZkStr = "10.100.90.201:2181/kafka_online_sample"; String brokerZkPath = "/brokers"; ZkHosts zkHosts = new ZkHosts(brokerZkStr, brokerZkPath); String topic = "mars-wap"; //以下:將offset彙報到哪個zk叢集,相應配置 // String offsetZkServers = "10.199.203.169"; String offsetZkServers = "10.100.90.201"; String offsetZkPort = "2181"; List<String> zkServersList = new ArrayList<String>(); zkServersList.add(offsetZkServers); //彙報offset資訊的root路徑 String offsetZkRoot = "/stormExample"; //儲存該spout id的消費offset資訊,譬如以topoName來命名 String offsetZkId = "storm-example"; SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic, offsetZkRoot, offsetZkId); kafkaConfig.zkRoot = offsetZkRoot; kafkaConfig.zkPort = Integer.parseInt(offsetZkPort); kafkaConfig.zkServers = zkServersList; kafkaConfig.id = offsetZkId; kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout spout = new KafkaSpout(kafkaConfig); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", spout, 1); builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout"); Config config = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", config, builder.createTopology()); // cluster submit. // try { // StormSubmitter.submitTopology("storm-kafka-example",config,builder.createTopology()); // } catch (AlreadyAliveException e) { // e.printStackTrace(); // } catch (InvalidTopologyException e) { // e.printStackTrace(); // }
String kafkaHost = "10.100.90.201"; Broker brokerForPartition0 = new Broker(kafkaHost);//localhost:9092 Broker brokerForPartition1 = new Broker(kafkaHost, 9092);//localhost:9092 but we specified the port explicitly GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation(); partitionInfo.addPartition(0, brokerForPartition0);//mapping form partition 0 to brokerForPartition0 partitionInfo.addPartition(1, brokerForPartition1);//mapping form partition 1 to brokerForPartition1 StaticHosts hosts = new StaticHosts(partitionInfo); String topic="mars-wap"; String offsetZkRoot ="/stormExample"; String offsetZkId="staticHost"; String offsetZkServers = "10.100.90.201"; String offsetZkPort = "2181"; List<String> zkServersList = new ArrayList<String>(); zkServersList.add(offsetZkServers); SpoutConfig kafkaConfig = new SpoutConfig(hosts,topic,offsetZkRoot,offsetZkId); kafkaConfig.zkPort = Integer.parseInt(offsetZkPort); kafkaConfig.zkServers = zkServersList; kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout spout = new KafkaSpout(kafkaConfig); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", spout, 1); builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout"); Config config = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", config, builder.createTopology());
完整的使用例子,見github原始碼 參考: https://github.com/apache/storm/blob/v0.9.3/external/storm-kafka/README.md
本文為原創,轉載請標明出處!From Tony_老七