Storm之Spout、Bolt、Topology元件
Strom使用元組作為資料模型,元組就是一組命名的值,元組中的每個欄位都可以是任何型別的物件。Storm支援所有基本型別,string和byte陣列作為元組欄位值。如果要使用自己定義的型別,也只需要為你自己定義的型別實現並且註冊一個serializer即可。每個節點還必須要為輸出的元組定義欄位名稱。
Spout要麼繼承BaseRichSpout要麼實現IRichSpout和IComponent介面,對與實現來說主要是實現以下這些函式(參考TestWordSpout的實現):
void open(java.util.Map conf,
TopologyContext context,
SpoutOutputCollector collector)
當一個Supervisor初始化該Spout元件時呼叫,提供Spout執行所必需的環境
引數
conf - Storm關於這個Spout的配置
context - 這個配置被用來獲取該Spout任務的資訊,包括任務id,元件id,輸入輸出資訊等等
collector - 用來從這個Spout裡傳送元組,元組可以在任何時間裡傳送,包括open和close函式裡。collector是執行緒安全的,應該被作為一
個例項物件儲存到Spout物件裡
void declareOutputFields(OutputFieldsDeclarer declarer)
定義topology裡的Stream的schema
declarer - 定義輸出stream的ids,輸出的欄位,輸出stream是不是直接stream(direct stream)
java.util.Map<java.lang.String,java.lang.Object> getComponentConfiguration()
定義該元件的配置
void ack(java.lang.Object msgId)
以msgId訊息告訴Storm這個Spout已經成功輸出了該元組
void activate()
啟用Spout,Spout從deactivate模式轉化為activate模式,Spout開始呼叫nextTuple輸出資料。
void close()
關閉Spout
void deactivate()
解除啟用Spout,Spout從activate模式轉化為deactivate模式,Spout停止呼叫nextTuple輸出資料
void fail(java.lang.Object msgId)
以msgId訊息告訴Storm這個Spout輸出該元組失敗,主要用於將該元組重新放回訊息佇列,以在一段時間後重發該元組
void nextTuple()
呼叫該函式請求Storm傳送元組到Output Collector,這個函式不應該是阻塞的,當沒有元組傳送時,一般呼叫sleep,以充分利用CPU
2.Bolt的實現
最主要的三個函式是,其餘的關於元件介面的函式和Spout的實現是一樣的,這裡就不說了
void prepare(java.util.Map stormConf,
TopologyContext context,
OutputCollector collector)
和Spout的open函式的作用類似,在Bolt元件初始化的時候呼叫,提供Bolt所必需的環境
void execute(Tuple input)
處理單個輸入的元組,元組物件包含了從元件/流/任務得來的元資料。元組的值通過Tuple#getValue訪問,Bolt並不需要馬上處理元組,可以先將資料儲存在合適的時間處理。Bolt使用在prepare函式中得到的OutputCollector物件輸出元組,必須在這個函式裡面確保使用OutputCollector#ack或者OutputCollector#fail告知Storm已經處理成功或者處理失敗,否則Storm將無法確定Spout裡元組是否已經被處理完成。
void cleanup()
當Bolt要關閉的時候呼叫,但是不能保證該函式一定可以被呼叫,當使用kill -9命令殺死工作程序時該函式就無法呼叫,一般用於local mode下清理使用
3.Topology構建
構建相當直接,使用TopologyBuilder構建,如例子中的main函式的程式碼所示。TopologyBuilder#setSpout設定Topology的Spout,使用TopologyBuilder#setBolt設定Topology的Bolt。
其中
public BoltDeclarer setBolt(java.lang.String id,
IBasicBolt bolt,
java.lang.Number parallelism_hint)
id-需要消費該元件輸出的流的元件用來識別該元件的唯一標識
bolt-該節點處理資料的Bolt
parallelism_hint-用來執行該Bolt的任務的數量,每個任務會在叢集的某個程序的某個執行緒裡面執行
其中BoltDeclarer中包含了很多元組從一個節點怎麼對映到另一個節點的規則,例子中的builder.setBolt("exclaim1", new ExclamationBolt(), 3)
.shuffleGrouping("words")表示設定exclaim1節點的Bolt為ExclamationBolt,並行度為3,從words節點到exclaim1節點使用隨機散發規則。
public SpoutDeclarer setSpout(java.lang.String id,
IRichSpout spout,
java.lang.Number parallelism_hint)
id-需要消費該元件輸出的流的元件用來識別該元件的唯一標識
spout-Spout類
parallelism_hint-用來執行該Bolt的任務的數量,每個任務會在叢集的某個程序的某個執行緒裡面執行
相關推薦
Storm之Spout、Bolt、Topology元件
1.Spout實現 Strom使用元組作為資料模型,元組就是一組命名的值,元組中的每個欄位都可以是任何型別的物件。Storm支援所有基本型別,string和byte陣列作為元組欄位值。如果要使用自己定義的型別,也只需要為你自己定義的型別實現並且註冊一個serializer
Storm中Spout和Bolt的生命週期
1、在定義Topology例項過程中,定義好Spout例項和Bolt例項2、在提交Topology例項給Nimbus的過程中,會呼叫TopologyBuilder例項的createTopology()方法,以獲取定義的Topology例項。在執行createTopology
Storm拓撲,元件之spout、bolt,並行策略
軟體版本:Storm:0.9.3 ,Redis:2.8.19;jedis:2.6.2;參考:http://storm.apache.org/documentation/Understanding-the-parallelism-of-a-Storm-topology.html
Storm應用系列之——Spout、Bolt API
前言: 昨天有朋友聊天說,我寫的前面三篇太簡單了,沒有太多深入的東西。好吧,這說明我的目的達到了。我寫這個系列的原因就是為了面向應用,進一步細化為兩點: 1. 以例子說話,由簡入深,一步步瞭解如何在Storm上開發應用,不會讀起來吃力;
小程式學習之旅----圖片image媒體元件camera、audio、video、live-player、live-pusher
<!--pages/image/image.wxml--> <text>這是一個image元件</text> <!-- <image src='../../images/0.jpg'></image> <image src='
小程式學習之旅----基礎內容元件icon 、text 、rich-text、progress
Page({ data: { motto: 'Hello World', nodes: [{ name: 'h2', attrs: { class: 'h2_class', style: 'line-height: 60p
元件傳值之父傳子、子傳父
父元件傳值給子元件 父元件 <template> <div id="app"> <h1>props使用方式</h1> <hello txt='元件txt' v-bind:ddd="btn
C#元件之errorProvider 、eventLog、helpProvider、performanceCounter
errorProvider 給控制元件繫結錯誤訊息 errorProvider1.SetError(textBox1, "錯誤提示訊息"); eventLog 寫入Windows事件日誌 EventLog類在System.Diagnostics名稱空間中。可以在“管理工具” > "
Android 四大元件之Service的啟動、繫結小述
一、概述 學習過Android的小夥伴就不可能不知道Service是什麼,因為Service是Android四大元件之一,聲名赫赫有木有,所以在這裡我就不詳細介紹了,本節主要還是充當筆記的作用,因為我待記性如初戀,記性虐我千百遍。 二、Service的建立 Service是一
Vue之父子元件間通訊例項講解(props、$ref、$emit)
元件是 vue.js 最強大的功能之一,而元件例項的作用域是相互獨立的,這就意味著不同元件之間的資料無法相互引用。那麼元件間如何通訊,也就成為了vue中重點知識了。這篇文章將會通過props、$ref和 $emit 這幾個知識點,來講解如何實現父子元件間通訊。 在說如何實現通訊
Android開發學習筆記(十二)基礎UI控制元件之ImageView、CheckBox、RadioButton
一、ImageView:直接繼承自View,它的作用是在介面上顯示Drawable物件。 ImageView在佈局檔案(如main_activity.xml)中常用的屬性 有 scaleType ,s
Rest_Framework之認證、許可權、頻率元件原始碼剖析
一:使用RestFramwork,定義一個檢視 from rest_framework.viewsets import ModelViewSet class BookView(ModelViewSet): queryset = Book.objects.all()
DRF之版本控制、認證和許可權元件
一、版本控制組件 1、為什麼要使用版本控制 首先我們開發專案是有多個版本的當我們專案越來越更新,版本就越來越多,我們不可能新的版本出了,以前舊的版本就不進行維護了像bootstrap有2、3、4版本的,每個版本都有它對應的url,https://v2.bootcss.com/ 、 https://v3.b
Android Jetpack架構元件之 Paging(使用、原始碼篇)
1、前言 最近簡單看了下google推出的框架Jetpack,感覺此框架的內容可以對平時的開發有很大的幫助,也可以解決很多開發中的問題,對程式碼的資料邏輯和UI介面深層解耦,實現資料驅動型的ui。 Android Architecture元件是Android Jetpac
vue之父子元件傳值、以及驗證父子元件傳值的合法性
vue之父子元件傳值 一、父元件給子元件傳值 1. 父元件給子元件傳值 注意:傳值時,傳遞的引數最好不要和子元件裡已有的引數名衝突。 並且也可以傳方法,傳方法時不要加擴號,擴號意味著執行。 而且還可以把父元件整個例項傳遞過去 1.1 傳遞屬性 a.父元件呼叫子元件的時候,繫結動態屬性;
多執行緒下的其它元件之CyclicBarrier、Callable、Future、FutureTask
public static class CyclicBarrierThread extends Thread { private CyclicBarrier cb; private int sleepSecond; public CyclicBarrierThre
Java多執行緒20:多執行緒下的其他元件之CyclicBarrier、Callable、Future和FutureTask
CyclicBarrier 接著講多執行緒下的其他元件,第一個要講的就是CyclicBarrier。CyclicBarrier從字面理解是指迴圈屏障,它可以協同多個執行緒,讓多個執行緒在這個屏障前等待,直到所有執行緒都達到了這個屏障時,再一
Java多執行緒19:多執行緒下的其他元件之CountDownLatch、Semaphore、Exchanger
前言 在多執行緒環境下,JDK給開發者提供了許多的元件供使用者使用(主要在java.util.concurrent下),使得使用者不需要再去關心在具體場景下要如何寫出同時兼顧執行緒安全性與高效率的程式碼。之前講過的執行緒池、BlockingQueue都是
何為面向元件程式設計?與之相比面向物件、面向服務又是什麼?
件技術和思想的出現都是為了解決所在的那個年代軟體開發的複雜性,物件技術和元件技術也不例外。當然還有很多其它技術,像DBC(契約式程式設計)、AOP、MDA等,這些思想都在影響程式的設計與實現方式。 面向物件、面向服務、面向元件,他們真實的叫法應該是:“面向物件程式設計”“
下拉框、下拉控制元件之Select2
一、Select2的功能簡介select2外掛給我們帶來了更加友好的互動方式,比如查詢控制元件展開後可通過關鍵字進行檢索例如: Select2也可以選擇帶查詢控制元件的選擇框... Select2更是支援多值選擇框... 二、如何使用