1. 程式人生 > storm和kafka的wordCount

storm和kafka的wordCount

標籤:conf、scope、work、exe、本地、values、編寫代碼、case、shuffle

這個是在window環境下面安裝的kafka

  • 下載pom依賴

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>1.1.1</version>
      </dependency>
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.1.1</version>
        <!--  本地測試註釋集群運行打開 -->
        <!--  <scope>provided</scope>-->
      </dependency>

編寫代碼

  • 編寫SplitSentenceBolt
    public class SplitSentenceBolt extends BaseRichBolt {

    /**
     * Index of the record value in tuples emitted by the storm-kafka-client
     * KafkaSpout, whose default field layout is
     * (topic, partition, offset, key, value).
     */
    private static final int KAFKA_VALUE_INDEX = 4;

    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    /**
     * Splits the Kafka record value on single spaces and emits one tuple per word.
     * Acks every input tuple so the KafkaSpout can commit its offset; without the
     * ack the tuple would time out and be replayed indefinitely.
     */
    @Override
    public void execute(Tuple tuple) {
        // Equivalent to tuple.getStringByField("value") under the spout's default fields.
        String sentence = tuple.getString(KAFKA_VALUE_INDEX);
        // Guard against null record values (e.g. Kafka tombstone messages).
        if (sentence != null) {
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("words"));
    }

    }

  • 編寫WordCountBolt
    public class WordCountBolt extends BaseRichBolt {

    private OutputCollector collector;
    // Running totals per word; each update overwrites the previous value, so the
    // map always holds the latest count for every word seen by this instance.
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<>();
    }

    /** Increments the count for the incoming word and emits (word, count). */
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("words");
        // merge() replaces the manual get / null-check / put sequence in one call.
        Long count = this.counts.merge(word, 1L, Long::sum);
        this.collector.emit(new Values(word, count));
        // Ack so the tuple (and ultimately the KafkaSpout offset) can be retired.
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
    }
  • 編寫ReportBolt
    public class ReportBolt extends BaseRichBolt {
    private HashMap

    @Override
    public void execute(Tuple input) {
        String word=input.getStringByField("word");
        Long count=input.getLongByField("count");
        this.counts.put(word, count);
    
    
        System.out.println("--------FINAL COUNTS--------");
        List<String> keys=new ArrayList<String>();
        keys.addAll(this.counts.keySet());
        Collections.sort(keys);
        for(String key:keys){
            System.out.println(key+":"+this.counts.get(key));
        }
        System.out.println("----------------------------");
    }
    
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    
    }
    }
  • 編寫Topology
    public class MainTopology {
    public static void main(String[] args)throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    KafkaSpoutConfig.Builder

        //設置kafka屬於哪個組
        kafkabuilder.setGroupId("testgroup");
        //創建kafkaspoutConfig
        KafkaSpoutConfig<String, String> build = kafkabuilder.build();
        //通過kafkaspoutconfig獲取kafkaspout
        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(build);
        //設置四個線程接收數據
        builder.setSpout("kafkaspout",kafkaSpout,4);

    // builder.setBolt("printBolt", new PrintBolt(),2).localOrShuffleGrouping("kafkaspout");

        builder.setBolt("split-bolt",new SplitSentenceBolt(),2).setNumTasks(4).shuffleGrouping("kafkaspout");
        // 有時候我們需要將特定數據的tuple路由到特殊的bolt實例中,在此我們使用fieldsGrouping
        // 來保證所有"word"字段值相同的tuple會被路由到同一個WordCountBolt實例中
        builder.setBolt("count-bolt",new WordCountBolt(),2).fieldsGrouping("split-bolt",new Fields("words"));
        builder.setBolt("report-bolt",new ReportBolt()).globalGrouping("count-bolt");
    
        Config config=new Config();
        config.setDebug(false);
        config.setNumWorkers(2);
        LocalCluster cluster =new LocalCluster();
        cluster.submitTopology("kafkaspout",config,builder.createTopology());
    
    }

storm和kafka的wordCount