
The Storm-Kafka Module: Common Interfaces and an Example of Consuming Kafka Data

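To read data from Kafka with the storm-kafka module, build the spout in two steps (I am using version 0.9.3):
1. Use the BrokerHosts interface to configure the mapping between Kafka broker hosts and partitions;
2. Use KafkaConfig to configure options related to Kafka itself, such as fetchSizeBytes and socketTimeoutMs.

The two parts are described in turn below.

Step 1 currently has two implementations: reading from ZooKeeper, and a static IP/port list.

Method 1: read from ZooKeeper (the more common one)
ZkHosts can be created in two ways:
public ZkHosts(String brokerZkStr, String brokerZkPath)
// uses the default brokerZkPath: "/brokers"
public ZkHosts(String brokerZkStr)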
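
To make the two constructor arguments concrete, here is a minimal sketch of which znodes end up being read. It assumes the standard Kafka 0.8 ZooKeeper layout (broker registrations under <brokerZkPath>/ids, partition state under <brokerZkPath>/topics/<topic>/partitions) and that any chroot suffix in brokerZkStr is prepended to those paths. KafkaZkPaths is a hypothetical helper written for this article, not part of storm-kafka, and the host, chroot and topic are the sample values used in the reference code.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of storm-kafka): shows where broker metadata lives in ZooKeeper. */
final class KafkaZkPaths {
    private KafkaZkPaths() {}

    /** Returns the chroot part of a connect string, e.g. "/kafka_online_sample", or "" if none. */
    static String chrootOf(String brokerZkStr) {
        int slash = brokerZkStr.indexOf('/');
        return slash < 0 ? "" : brokerZkStr.substring(slash);
    }

    /** Absolute znodes for the broker ids and for the partitions of one topic. */
    static List<String> metadataPaths(String brokerZkStr, String brokerZkPath, String topic) {
        // Assumption: Kafka 0.8 layout, with the chroot prepended to brokerZkPath.
        String base = chrootOf(brokerZkStr) + brokerZkPath;
        return Arrays.asList(base + "/ids", base + "/topics/" + topic + "/partitions");
    }

    public static void main(String[] args) {
        List<String> paths =
                metadataPaths("10.100.90.201:2181/kafka_online_sample", "/brokers", "mars-wap");
        for (String path : paths) {
            System.out.println(path);
        }
    }
}
```

If the spout cannot find any partitions, checking these two paths with a ZooKeeper client is a quick way to tell whether brokerZkStr or brokerZkPath is wrong.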

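When brokers are discovered this way, the host-to-partition mapping is refreshed every 60 seconds by default (the refreshFreqSecs field of ZkHosts).

Step 2: build the KafkaConfig object. Two constructors are currently provided: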
public KafkaConfig(BrokerHosts hosts, String topic)
// set clientId yourself if you do not want one generated randomly each time
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
Reference code:
        // This is the same value as the zookeeper.connect parameter in the Kafka broker configuration file; you can copy it from there.
        String brokerZkStr = "10.100.90.201:2181/kafka_online_sample";
        String brokerZkPath = "/brokers";
        ZkHosts zkHosts = new ZkHosts(brokerZkStr, brokerZkPath);

        String topic = "mars-wap";
        // Below: the ZooKeeper cluster that consumed offsets are reported to, and the related settings
//        String offsetZkServers = "10.199.203.169";
        String offsetZkServers = "10.100.90.201";
        String offsetZkPort = "2181";
        List<String> zkServersList = new ArrayList<String>();
        zkServersList.add(offsetZkServers);
        // Root path under which offset information is reported
        String offsetZkRoot = "/stormExample";
        // Id under which this spout's consumed offsets are stored, for example the topology name
        String offsetZkId = "storm-example";


        // The constructor already stores offsetZkRoot and offsetZkId in zkRoot and id,
        // so they do not need to be assigned again.
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic, offsetZkRoot, offsetZkId);
        kafkaConfig.zkPort = Integer.parseInt(offsetZkPort);
        kafkaConfig.zkServers = zkServersList;
        // Decode each Kafka message as a UTF-8 string
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());


        KafkaSpout spout = new KafkaSpout(kafkaConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", spout, 1);
        // Bolt stands for your own bolt implementation (not shown in this article)
        builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout");

        Config config = new Config();

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", config, builder.createTopology());

        // To submit to a real cluster instead of the LocalCluster above:
//        try {
//            StormSubmitter.submitTopology("storm-kafka-example",config,builder.createTopology());
//        } catch (AlreadyAliveException e) {
//            e.printStackTrace();
//        } catch (InvalidTopologyException e) {
//            e.printStackTrace();
//        }
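
To see what offsetZkRoot and offsetZkId are used for, the sketch below computes the znode where the spout records the offset of each partition. It assumes the layout zkRoot/id/partition_N, which is how I read the storm-kafka 0.9.x sources; please verify it against your own ZooKeeper before relying on it. OffsetPaths is a hypothetical helper written for this article, not a storm-kafka class.

```java
/** Hypothetical helper (not part of storm-kafka): where consumed offsets are expected to be stored. */
final class OffsetPaths {
    private OffsetPaths() {}

    /**
     * Znode holding the committed offset of one partition.
     * Assumption: storm-kafka 0.9.x writes to zkRoot + "/" + id + "/partition_" + partition.
     */
    static String committedPath(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // Same zkRoot and id as in the reference code above; two partitions for illustration.
        for (int partition = 0; partition < 2; partition++) {
            System.out.println(committedPath("/stormExample", "storm-example", partition));
        }
    }
}
```

These paths live on the cluster given by zkServers and zkPort, not on the Kafka ZooKeeper chroot. Because the id is part of the path, two topologies that share the same zkRoot and id would share (and overwrite) each other's offsets, which is why naming the id after the topology is a reasonable choice.
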
Method 2: static IP and port
        String kafkaHost = "10.100.90.201";
        Broker brokerForPartition0 = new Broker(kafkaHost); // port defaults to 9092
        Broker brokerForPartition1 = new Broker(kafkaHost, 9092); // same broker, with the port given explicitly
        GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
        partitionInfo.addPartition(0, brokerForPartition0); // mapping from partition 0 to brokerForPartition0
        partitionInfo.addPartition(1, brokerForPartition1); // mapping from partition 1 to brokerForPartition1
        StaticHosts hosts = new StaticHosts(partitionInfo);

        String topic="mars-wap";
        String offsetZkRoot ="/stormExample";
        String offsetZkId="staticHost";
        String offsetZkServers = "10.100.90.201";
        String offsetZkPort = "2181";
        List<String> zkServersList = new ArrayList<String>();
        zkServersList.add(offsetZkServers);

        SpoutConfig kafkaConfig = new SpoutConfig(hosts,topic,offsetZkRoot,offsetZkId);

        kafkaConfig.zkPort = Integer.parseInt(offsetZkPort);
        kafkaConfig.zkServers = zkServersList;
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        KafkaSpout spout = new KafkaSpout(kafkaConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", spout, 1);
        builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout");

        Config config = new Config();

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", config, builder.createTopology());
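
Both examples fill in zkServers and zkPort by hand. If the offsets should go to a cluster that you already have a ZooKeeper connect string for, the values can be derived from it instead. The following is only a sketch: ZkConnectString is a hypothetical helper written for this article, the second host in the usage example is made up, and the parser assumes that every server uses the same port (SpoutConfig takes a single zkPort), falling back to 2181 when no port is given.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of storm-kafka): splits "h1:2181,h2:2181/chroot" into its parts. */
final class ZkConnectString {
    final List<String> hosts = new ArrayList<String>();
    final int port;
    final String chroot;

    ZkConnectString(String connect) {
        // Everything from the first '/' onwards is the chroot; it may be absent.
        int slash = connect.indexOf('/');
        chroot = slash < 0 ? "" : connect.substring(slash);
        String servers = slash < 0 ? connect : connect.substring(0, slash);

        Integer commonPort = null;
        for (String server : servers.split(",")) {
            String[] hostAndPort = server.trim().split(":");
            // Assumption: 2181 (the usual ZooKeeper client port) when none is written.
            int serverPort = hostAndPort.length > 1 ? Integer.parseInt(hostAndPort[1]) : 2181;
            if (commonPort != null && commonPort != serverPort) {
                throw new IllegalArgumentException("all servers must use the same port: " + connect);
            }
            commonPort = serverPort;
            hosts.add(hostAndPort[0]);
        }
        port = commonPort;
    }

    public static void main(String[] args) {
        ZkConnectString parsed =
                new ZkConnectString("10.100.90.201:2181,10.100.90.202:2181/kafka_online_sample");
        System.out.println(parsed.hosts + " " + parsed.port + " " + parsed.chroot);
    }
}
```

With such a helper, the configuration lines would become kafkaConfig.zkServers = parsed.hosts and kafkaConfig.zkPort = parsed.port. The chroot is returned separately because zkServers holds bare host names only.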


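For a complete usage example, see the source code on GitHub. Reference: https://github.com/apache/storm/blob/v0.9.3/external/storm-kafka/README.md
This article is original work; please credit the source when reposting. From Tony_老七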