Storm的WordCount案例spout bolt詳細總結 實現介面IRich IBASE區別
spout介紹
一個spout是由流組成的資料來源在storm的拓撲裡,通常情況下會讀取外部的資料來源
然後emit(發射)到拓撲裡面,比如是kafka,MySQL或者redis等等,Spout有兩種實現一種是可靠的訊息實現,如果傳送失敗則會重試,另外一種是不可靠的訊息實現可能會出現訊息丟失,spout可以一次宣告多個數據流通過OutputFieldsDeclarer類的declareStream方法,當然前提是你的SpoutOutputCollector裡的emit也是多個流
Spout裡面主要的方法是nextTuple,它裡面可以發射新的tuple到拓撲,或者當沒有訊息的時候就return,需要注意,這個方法裡面不能阻塞,因為storm呼叫spout方法是單執行緒的,其他的主要方法是ack和fail,如果使用了可靠的spout,可以使用ack和fail來確定訊息傳送狀態
相關擴充套件:
IRichSpout:spout類必須實現的介面
BaseRichSpout :可靠的spout有ack確保
BaseBasicSpout :不可靠的spout
1.Spout元件:建立Spout(WordCountSpout)元件採集資料,作為整個Topology的資料來源
WordCountSpout.java
package storm; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import java.util.Map; import java.util.Random; public class WordCountSpout extends BaseRichSpout { private SpoutOutputCollector collector; //模擬產生一些資料 private String[] data = {"I I love Beijing","I love love love China","Beijing is id is is the the capital of China"}; /** * open方法的作用主要是將collector進行初始化 * collector的作用:將採集到的資料傳送給下一個元件 */ @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) { this.collector=collector; } @Override public void nextTuple() { Utils.sleep(3000); int random = (new Random()).nextInt(3); String value = data[random]; System.out.println("產生的隨機值是"+value); //傳送給下一個元件 collector.emit(new Values(value)); } //申明發送給下一個元件的tuple的schema(結構) @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } }
bolt介紹
Bolts 業務處理單元
所有的拓撲處理都會在bolt中進行,bolt裡面可以做任何etl,比如過濾,函式,聚合,連線,寫入資料庫系統或快取等,一個bolt可以做簡單的事件流轉換,如果是複雜的流轉化,往往需要多個bolt參與,這就是流計算,每個bolt都進行一個業務邏輯處理,bolt也可以emit多個流到下游,通過declareStream方法宣告輸出的schema。
Bolt裡面主要的方法是execute方法,每次處理一個輸入的tuple,bolt裡面也可以發射新的tuple使用OutputCollector類,bolt裡面每處理一個tuple必須呼叫ack方法以便於storm知道某個tuple何時處理完成。Strom裡面的IBasicBolt介面可以自動
呼叫ack。
相關拓展:
IRichBolt:bolts的通用介面
IBasicBolt:擴充套件的bolt介面,可以自動處理ack
OutputCollector:bolt發射tuple到下游bolt裡面
2.Bolt元件1:建立Bolt(WordCountSplitBolt)元件進行分詞操作
WordCountSplitBolt.java
package storm;
import com.google.common.collect.Maps;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
public class WordCountSplitBolt extends BaseRichBolt{
//bolt元件的收集器 用於將資料傳送給下一個bolt
private OutputCollector collector;
//初始化
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
//處理上一級發來的資料
String value = tuple.getStringByField("sentence");
String[] data= value.split(" ");
//輸出
for (String word : data){
collector.emit(new Values(word,1));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//申明發送給下一個元件的tuple schema結構
declarer.declare(new Fields("word","count"));
}
}
3.Bolt元件2:建立Bolt(WordCountTotalBolt)元件進行單詞統計操作
WordCountTotalBolt.java
package storm;
import com.google.common.collect.Maps;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
import java.util.Set;
public class WordCountTotalBolt extends BaseRichBolt{
private OutputCollector collector;
Map<String,Integer> result=Maps.newHashMap();
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Integer count = tuple.getIntegerByField("count");
if (result.get(word) == null){
result.put(word,count);
}else {
result.put(word,count + result.get(word));
}
result.entrySet().forEach(enty-> System.out.println("單詞:"+enty.getKey()+" " + "數量:"+ enty.getValue()));
collector.emit(new Values(word,result.get(word)));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","total"));
}
}
4.Topology主程式:(WordCountTopology)
WordCountTopology.java
package storm;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
public class WordCountTopology {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
//1 指定任務的spout元件
builder.setSpout("1",new WordCountSpout());
//2 指定任務的第一個bolt元件
builder.setBolt("2",new WordCountSplitBolt()).shuffleGrouping("1");
//3 指定任務的第二個bolt元件
builder.setBolt("3",new WordCountTotalBolt()).fieldsGrouping("2",new Fields("word"));
//建立任務
StormTopology job = builder.createTopology();
Config config = new Config();
//執行任務有兩種模式
//1 本地模式 2 叢集模式
//1、本地模式
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("MyWordCount",config,job);
//2、叢集模式:用於打包jar,並放到storm執行
// StormSubmitter.submitTopology(args[0], conf, job);
}
}
pom.xml
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.3</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hbase</artifactId>
<version>1.0.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>1.0.3</version>
</dependency>
相關推薦
Storm的WordCount案例spout bolt詳細總結 實現介面IRich IBASE區別
spout介紹 一個spout是由流組成的資料來源在storm的拓撲裡,通常情況下會讀取外部的資料來源 然後emit(發射)到拓撲裡面,比如是kafka,MySQL或者redis等等,Spout有兩種實現一種是可靠的訊息實現,如果傳送失敗則會重試,另外一種是不可靠的訊息實
Thread和實現介面Runnable的區別
首先,我更喜歡用Runnable的方式。Runnable的方式更接近共享同一資料的問題。1,Thread執行緒實現是靠繼承,我們知道java是單繼承的。而Runnable是現實介面。這樣Runnable可以“多繼承”;2,資料資源問題。Runnable實現多執行緒,是通過建立
十大經典排序算法詳細總結(含JAVA代碼實現)
出現的次數 完全 放置 累加 有時 經典 整數 eap 分割 原文出處:http://mp.weixin.qq.com/s/feQDjby4uYGRLbYUJq7Lpg 0、排序算法說明 0.1 排序的定義 對一序列對象根據某個關鍵字進行排序。 0.2 術
十大經典排序演算法詳細總結(含JAVA程式碼實現)
文章目錄 十大經典排序演算法詳細總結(含JAVA程式碼實現) 0、排序演算法說明 1、氣泡排序(Bubble Sort) 2、選擇排序(Selection Sort) 3、插入排序(Insertion Sort) 4、希爾
java.io.Serializable(序列化)介面詳細總結
一、前言 在參加工作後,做的第一個專案是電商專案。當時不會做專案,只能照貓畫虎。其中一個VO類為何要實現Serializable介面一直沒有理解,不實現這個Serializable,會報錯。如下是隨手寫的一個VO類Person.java: import
萬字長文詳細總結!關於繼承、重寫與過載、封裝、介面的硬核乾貨
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20210131200205437.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmN
chrome谷歌瀏覽器-DevTool開發者工具-詳細總結
相關 tail justify 任務管理器 log 兩個 停用 表格 需要 chrome的開發者工具可以說是十分強大了,是web開發者的一大利器,作為我個人而言平時用到的幾率很大,相信大家也很常見,但是不要僅僅停留在點選元素看看樣式的層面上哦,跟著我的總結一起學習實踐一下
python3、ipython3、setup-tools、pip等環境搭建詳細總結
python第一個python腳本:[[email protected] ~]# cat helloworld.py print("hello world")[[email protected] ~]# python helloworld.py hello world 安裝python3及
MyBatis學習總結——實現關聯表查詢(轉)
得到 into primary 字符串 student prim oci ssr ret 原文鏈接:孤傲蒼狼 一、一對一關聯 1.1、提出需求 根據班級id查詢班級信息(帶老師的信息) 1.2、創建表和數據 創建一張教師表和班級表,這裏我們假設一個老師只負責教一個
數據庫的備份與還原系列——單表備份和恢復詳細完整實現
單表備份 單表還原 表定義備份還原 表數據備份還原 參考實現:https://www.percona.com/doc/percona-xtrabackup/LATEST/innobackupex/innobackupex_script.htmlRestoring Individual Tabl
《c++ const 詳細總結》--轉載
一次 public 語法 family end 函數 就是 類型 不變 C++中的const關鍵字的用法非常靈活,而使用const將大大改善程序的健壯性,本人根據各方面查到的資料進行總結如下,期望對朋友們有所幫助。 const 是C++中常用的類型修飾符,常類型是
關於post與get的詳細總結
而是 然而 傳遞 長度 史記 是我 com post 真的 近日,對爬蟲的使用增多,總結在日常使用中的一些問題,其中get與post的問題是最需要重點總結的。 首先,在以往的使用經驗中,get與post是最常用的兩種請求方式,最直觀的區別就是get會將參數放在url
Python函數詳細總結
代碼塊 lan BE ssi python 命名 param 不定 更改 函數定義 函數代碼塊以 def 關鍵詞開頭,後接函數標識符名稱和圓括號()。 任何傳入參數和自變量必須放在圓括號中間。圓括號之間可以用於定義參數。 函數的第一行語句可以選擇性地使用文檔字符串
Python dict(字典) 詳細總結
one dump TE port 數字 class 轉換 PE 持久 示例: d={ ‘name‘:‘yy‘ } key值判斷 d.has_key(‘name‘) #如果有key返回True d.get(‘name‘) #如果沒有key返回None 添加與更
Python運算符詳細總結
true 位運算 兩個 次方 等於 邏輯運算符 進制 位與 計算 算術運算符 運算符 描述 實例 + 加 - 兩個對象相加 a + b 輸出結果 30 - 減 - 得到負數或是一個數減去另一個數 a - b 輸出結果 -10 * 乘 - 兩個數相乘或是
MyISAM與InnoDB兩者之間區別與選擇,詳細總結,性能對比
執行過程 val 性能問題 全部 marked rain row 演示 也會 1、MyISAM:默認表類型,它是基於傳統的ISAM類型,ISAM是Indexed Sequential Access Method (有索引的順序訪問方法) 的縮寫,它是存儲記錄和文件的標準方法
java集合(List,Set,Map)詳細總結
麻煩 array map接口 安全 content 想要 鍵值 more san 集合的由來: 數組是長度是固定的,當添加的元素超過數組的長度時需要對數組重新定義,太麻煩了,java內部給我們提供了集合類,能存儲任意對象,長度是可以改變的,隨著元素的增加而增加,隨著元素
Java異常超詳細總結
ace 垃圾回收器 通過 代碼 異常 執行 int 隧道 面試 什麽是異常: 異常就是Java程序在運行過程中出現的錯誤。 騷話: 世界上最真情的相依就是你在try我在catch,無論你發什麽脾氣,我都靜靜接受,默默處理(這個可以不記) 異常繼承體系圖:
java操作樹莓派GPIO控制LED燈--結合springboot實現介面呼叫
1、概述 本文使用java結合springboot實現了對樹莓派GPIO介面的操作以達到控制LED燈的功能 2、pom檔案如下: <project xmlns="http://maven.apache.org/POM/4.0.0" &nb
Springmvc藉助SimpleUrlHandlerMapping實現介面開關功能
一、介面開關功能 1、可配置化,依賴配置中心 2、介面訪問許可權可控 3、springmvc不會掃描到,即不會直接的將介面暴露出去 二、介面開關使用場景 和業務沒什麼關係,主要方便查詢系統中的一些狀態資訊。比如系統的配置資訊,中介軟體的狀態資訊。這就需要寫一些特定的介面,不能對外直接