
Consuming Kafka data in real time with Storm

  1. Environment setup: create a topic named data in Kafka, start a console consumer, and prepare some data to feed in.
  2. The project's pom.xml file. Note that storm-core is scoped provided because the Storm cluster supplies it at runtime; when running in local mode from an IDE you may need to remove that scope.


<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.0.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>1.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.0</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.14</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>log4j-over-slf4j</artifactId>
        <version>1.7.21</version>
    </dependency>
</dependencies>

3. Spout code


import java.util.ArrayList;
import java.util.List;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class MykafkaSpout {

    public static void main(String[] args) throws AuthorizationException {
        String topic = "data";
        // ZooKeeper ensemble that Kafka registers its brokers with
        ZkHosts zkHosts = new ZkHosts("192.168.59.132:2181");
        // zkRoot is "" because zookeeper.connect uses no chroot (see step 5);
        // "MyTrack" is the consumer id under which offsets are stored
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic,
                "",
                "MyTrack");
        List<String> zkServers = new ArrayList<String>();
        zkServers.add("192.168.59.132");
        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = 2181;
        spoutConfig.socketTimeoutMs = 60 * 1000;
        // Deserialize each Kafka message as a plain string
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("bolt1", new MyKafkaBolt(), 1).shuffleGrouping("spout");

        Config conf = new Config();
        conf.setDebug(false);

        if (args.length > 0) {
            // A topology name on the command line means: submit to the cluster
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // Otherwise run in an in-process local cluster for testing
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.createTopology());
        }
    }
}
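In local mode the topology keeps running until the JVM is killed; for a quick test you can optionally tear the local cluster down after a while. A minimal sketch (the one-minute sleep is an arbitrary choice, not part of the original code), placed right after submitTopology in the local branch:

org.apache.storm.utils.Utils.sleep(60 * 1000);  // let the topology consume for a minute
localCluster.killTopology("mytopology");
localCluster.shutdown();

To submit to a real cluster instead, package the project and pass a topology name as the first argument, e.g. storm jar my-topology.jar MykafkaSpout mytopology (the jar and topology names here are placeholders).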

4. Bolt code. To keep things simple, the bolt just prints the incoming data.



import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class MyKafkaBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // nothing to set up
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // StringScheme exposes each Kafka message as the tuple's first field
        String kafkaMsg = input.getString(0);
        System.err.println("bolt:" + kafkaMsg);
    }

    @Override
    public void cleanup() {
        // nothing to clean up
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt emits nothing downstream
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
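Since most of the overrides above are empty, a slightly leaner variant of the same bolt (a sketch, not from the original post) can extend BaseBasicBolt, which provides no-op defaults for prepare, cleanup, and getComponentConfiguration:

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class MyKafkaBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // StringScheme exposes each Kafka message as the tuple's first field
        System.err.println("bolt:" + input.getString(0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing is emitted downstream
    }
}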

5. How to determine the zkRoot argument of SpoutConfig: check Kafka's server.properties file. If no chroot such as /bc follows the hosts in zookeeper.connect, zkRoot is simply ""; otherwise zkRoot is that chroot, as in zookeeper.connect=localhostlei1:2181,localhostlei2:2181,localhostlei3:2181/bc.

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhostlei1:2181,localhostlei2:2181,localhostlei3:2181
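If zookeeper.connect did end in a chroot such as /bc, the spout configuration from step 3 would change roughly as below. This is a hypothetical sketch: with a chroot, Kafka registers broker metadata under /bc/brokers instead of /brokers, and ZkHosts accepts that path as an optional second constructor argument.

// Hypothetical variant assuming zookeeper.connect=...:2181/bc
// (not the chroot-less setting shown above).
ZkHosts zkHosts = new ZkHosts(
        "localhostlei1:2181,localhostlei2:2181,localhostlei3:2181",
        "/bc/brokers");  // broker metadata lives under the chroot
SpoutConfig spoutConfig = new SpoutConfig(zkHosts, "data", "/bc", "MyTrack");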

6. Once the topology is running, try writing data into Kafka; it is consumed by Storm right away, and each message shows up in the console as bolt:<message>.