1. 程式人生 > >Storm的WordCount案例spout bolt詳細總結 實現介面IRich IBASE區別

Storm的WordCount案例spout bolt詳細總結 實現介面IRich IBASE區別

spout介紹

一個spout是由流組成的資料來源在storm的拓撲裡,通常情況下會讀取外部的資料來源 
然後emit(發射)到拓撲裡面,比如是kafka,MySQL或者redis等等,Spout有兩種實現一種是可靠的訊息實現,如果傳送失敗則會重試,另外一種是不可靠的訊息實現可能會出現訊息丟失,spout可以一次宣告多個數據流通過OutputFieldsDeclarer類的declareStream方法,當然前提是你的SpoutOutputCollector裡的emit也是多個流 

Spout裡面主要的方法是nextTuple,它裡面可以發射新的tuple到拓撲,或者當沒有訊息的時候就return,需要注意,這個方法裡面不能阻塞,因為storm呼叫spout方法是單執行緒的,其他的主要方法是ack和fail,如果使用了可靠的spout,可以使用ack和fail來確定訊息傳送狀態 

相關擴充套件: 
IRichSpout:spout類必須實現的介面 
BaseRichSpout :可靠的spout有ack確保 
BaseBasicSpout :不可靠的spout 

1.Spout元件:建立Spout(WordCountSpout)元件採集資料,作為整個Topology的資料來源

WordCountSpout.java

package storm;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.Random;

public class WordCountSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    //模擬產生一些資料
    private String[] data = {"I I love Beijing","I love love love  China","Beijing is id is is the the capital of China"};

    /**
     * open方法的作用主要是將collector進行初始化
     * collector的作用:將採集到的資料傳送給下一個元件
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {
        this.collector=collector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(3000);
        int random = (new Random()).nextInt(3);
        String value = data[random];
        System.out.println("產生的隨機值是"+value);
        //傳送給下一個元件
        collector.emit(new Values(value));
    }


   //申明發送給下一個元件的tuple的schema(結構)
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}

bolt介紹

Bolts 業務處理單元 
所有的拓撲處理都會在bolt中進行,bolt裡面可以做任何etl,比如過濾,函式,聚合,連線,寫入資料庫系統或快取等,一個bolt可以做簡單的事件流轉換,如果是複雜的流轉化,往往需要多個bolt參與,這就是流計算,每個bolt都進行一個業務邏輯處理,bolt也可以emit多個流到下游,通過declareStream方法宣告輸出的schema。 

Bolt裡面主要的方法是execute方法,每次處理一個輸入的tuple,bolt裡面也可以發射新的tuple使用OutputCollector類,bolt裡面每處理一個tuple必須呼叫ack方法以便於storm知道某個tuple何時處理完成。Strom裡面的IBasicBolt介面可以自動 
呼叫ack。 

相關拓展: 
IRichBolt:bolts的通用介面 
IBasicBolt:擴充套件的bolt介面,可以自動處理ack 
OutputCollector:bolt發射tuple到下游bolt裡面 

2.Bolt元件1:建立Bolt(WordCountSplitBolt)元件進行分詞操作

WordCountSplitBolt.java

package storm;

import com.google.common.collect.Maps;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class WordCountSplitBolt extends BaseRichBolt{

    //bolt元件的收集器 用於將資料傳送給下一個bolt
    private OutputCollector collector;


    //初始化
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        //處理上一級發來的資料
        String value = tuple.getStringByField("sentence");
        String[] data= value.split(" ");
        //輸出
        for (String word : data){
            collector.emit(new Values(word,1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //申明發送給下一個元件的tuple schema結構
        declarer.declare(new Fields("word","count"));
    }
}

3.Bolt元件2:建立Bolt(WordCountTotalBolt)元件進行單詞統計操作

WordCountTotalBolt.java

package storm;

import com.google.common.collect.Maps;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.Set;

public class WordCountTotalBolt extends BaseRichBolt{

    private OutputCollector collector;
    Map<String,Integer> result=Maps.newHashMap();
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Integer count = tuple.getIntegerByField("count");

        if (result.get(word) == null){
            result.put(word,count);
        }else {
            result.put(word,count + result.get(word));
        }
        result.entrySet().forEach(enty-> System.out.println("單詞:"+enty.getKey()+" " + "數量:"+ enty.getValue()));

        collector.emit(new Values(word,result.get(word)));

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","total"));
    }
}

4.Topology主程式:(WordCountTopology

WordCountTopology.java

package storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        //1 指定任務的spout元件
        builder.setSpout("1",new WordCountSpout());

        //2 指定任務的第一個bolt元件
        builder.setBolt("2",new WordCountSplitBolt()).shuffleGrouping("1");

        //3 指定任務的第二個bolt元件
        builder.setBolt("3",new WordCountTotalBolt()).fieldsGrouping("2",new Fields("word"));

        //建立任務
        StormTopology job = builder.createTopology();

        Config config = new Config();

        //執行任務有兩種模式
        //1 本地模式   2 叢集模式

        //1、本地模式
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("MyWordCount",config,job);

        //2、叢集模式:用於打包jar,並放到storm執行
//        StormSubmitter.submitTopology(args[0], conf, job);
    }
}

pom.xml

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.3</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.0.3</version>
</dependency>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hbase</artifactId>
<version>1.0.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>1.0.3</version>
</dependency>

相關推薦

Storm的WordCount案例spout bolt詳細總結 實現介面IRich IBASE區別

spout介紹 一個spout是由流組成的資料來源在storm的拓撲裡,通常情況下會讀取外部的資料來源  然後emit(發射)到拓撲裡面,比如是kafka,MySQL或者redis等等,Spout有兩種實現一種是可靠的訊息實現,如果傳送失敗則會重試,另外一種是不可靠的訊息實

Thread和實現介面Runnable的區別

首先,我更喜歡用Runnable的方式。Runnable的方式更接近共享同一資料的問題。1,Thread執行緒實現是靠繼承,我們知道java是單繼承的。而Runnable是現實介面。這樣Runnable可以“多繼承”;2,資料資源問題。Runnable實現多執行緒,是通過建立

十大經典排序算法詳細總結(含JAVA代碼實現

出現的次數 完全 放置 累加 有時 經典 整數 eap 分割 原文出處:http://mp.weixin.qq.com/s/feQDjby4uYGRLbYUJq7Lpg 0、排序算法說明 0.1 排序的定義 對一序列對象根據某個關鍵字進行排序。 0.2 術

十大經典排序演算法詳細總結(含JAVA程式碼實現)

文章目錄 十大經典排序演算法詳細總結(含JAVA程式碼實現) 0、排序演算法說明 1、氣泡排序(Bubble Sort) 2、選擇排序(Selection Sort) 3、插入排序(Insertion Sort) 4、希爾

java.io.Serializable(序列化)介面詳細總結

一、前言   在參加工作後,做的第一個專案是電商專案。當時不會做專案,只能照貓畫虎。其中一個VO類為何要實現Serializable介面一直沒有理解,不實現這個Serializable,會報錯。如下是隨手寫的一個VO類Person.java: import

萬字長文詳細總結!關於繼承、重寫與過載、封裝、介面的硬核乾貨

![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20210131200205437.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmN

chrome谷歌瀏覽器-DevTool開發者工具-詳細總結

相關 tail justify 任務管理器 log 兩個 停用 表格 需要 chrome的開發者工具可以說是十分強大了,是web開發者的一大利器,作為我個人而言平時用到的幾率很大,相信大家也很常見,但是不要僅僅停留在點選元素看看樣式的層面上哦,跟著我的總結一起學習實踐一下

python3、ipython3、setup-tools、pip等環境搭建詳細總結

python第一個python腳本:[[email protected] ~]# cat helloworld.py print("hello world")[[email protected] ~]# python helloworld.py hello world 安裝python3及

MyBatis學習總結——實現關聯表查詢(轉)

得到 into primary 字符串 student prim oci ssr ret 原文鏈接:孤傲蒼狼 一、一對一關聯 1.1、提出需求   根據班級id查詢班級信息(帶老師的信息) 1.2、創建表和數據   創建一張教師表和班級表,這裏我們假設一個老師只負責教一個

數據庫的備份與還原系列——單表備份和恢復詳細完整實現

單表備份 單表還原 表定義備份還原 表數據備份還原 參考實現:https://www.percona.com/doc/percona-xtrabackup/LATEST/innobackupex/innobackupex_script.htmlRestoring Individual Tabl

《c++ const 詳細總結》--轉載

一次 public 語法 family end 函數 就是 類型 不變   C++中的const關鍵字的用法非常靈活,而使用const將大大改善程序的健壯性,本人根據各方面查到的資料進行總結如下,期望對朋友們有所幫助。   const 是C++中常用的類型修飾符,常類型是

關於post與get的詳細總結

而是 然而 傳遞 長度 史記 是我 com post 真的   近日,對爬蟲的使用增多,總結在日常使用中的一些問題,其中get與post的問題是最需要重點總結的。   首先,在以往的使用經驗中,get與post是最常用的兩種請求方式,最直觀的區別就是get會將參數放在url

Python函數詳細總結

代碼塊 lan BE ssi python 命名 param 不定 更改 函數定義 函數代碼塊以 def 關鍵詞開頭,後接函數標識符名稱和圓括號()。 任何傳入參數和自變量必須放在圓括號中間。圓括號之間可以用於定義參數。 函數的第一行語句可以選擇性地使用文檔字符串

Python dict(字典) 詳細總結

one dump TE port 數字 class 轉換 PE 持久 示例: d={ ‘name‘:‘yy‘ } key值判斷 d.has_key(‘name‘) #如果有key返回True d.get(‘name‘) #如果沒有key返回None 添加與更

Python運算符詳細總結

true 位運算 兩個 次方 等於 邏輯運算符 進制 位與 計算 算術運算符 運算符 描述 實例 + 加 - 兩個對象相加 a + b 輸出結果 30 - 減 - 得到負數或是一個數減去另一個數 a - b 輸出結果 -10 * 乘 - 兩個數相乘或是

MyISAM與InnoDB兩者之間區別與選擇,詳細總結,性能對比

執行過程 val 性能問題 全部 marked rain row 演示 也會 1、MyISAM:默認表類型,它是基於傳統的ISAM類型,ISAM是Indexed Sequential Access Method (有索引的順序訪問方法) 的縮寫,它是存儲記錄和文件的標準方法

java集合(List,Set,Map)詳細總結

麻煩 array map接口 安全 content 想要 鍵值 more san 集合的由來:   數組是長度是固定的,當添加的元素超過數組的長度時需要對數組重新定義,太麻煩了,java內部給我們提供了集合類,能存儲任意對象,長度是可以改變的,隨著元素的增加而增加,隨著元素

Java異常超詳細總結

ace 垃圾回收器 通過 代碼 異常 執行 int 隧道 面試 什麽是異常:   異常就是Java程序在運行過程中出現的錯誤。 騷話:   世界上最真情的相依就是你在try我在catch,無論你發什麽脾氣,我都靜靜接受,默默處理(這個可以不記) 異常繼承體系圖:   

java操作樹莓派GPIO控制LED燈--結合springboot實現介面呼叫

1、概述 本文使用java結合springboot實現了對樹莓派GPIO介面的操作以達到控制LED燈的功能 2、pom檔案如下: <project xmlns="http://maven.apache.org/POM/4.0.0"      &nb

Springmvc藉助SimpleUrlHandlerMapping實現介面開關功能

一、介面開關功能   1、可配置化,依賴配置中心   2、介面訪問許可權可控   3、springmvc不會掃描到,即不會直接的將介面暴露出去 二、介面開關使用場景   和業務沒什麼關係,主要方便查詢系統中的一些狀態資訊。比如系統的配置資訊,中介軟體的狀態資訊。這就需要寫一些特定的介面,不能對外直接