Storm雜談之Topology的啟動過程(二)
在一中講到了topology提交給nimbus
nimbus
Nimbus可以 說是storm中最核心的部分,它的主要功能有兩個:
- 對Topology的任務進行分配資源
- 接收使用者的命令並做相應的處理,如Topology的提交,殺死,啟用等等
服務介面的定義都在storm.thrift檔案中定義,貼下部分程式碼:
service Nimbus { void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite); void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite); void killTopology(1: string name) throws (1: NotAliveException e); void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e); void activate(1: string name) throws (1: NotAliveException e); void deactivate(1: string name) throws (1: NotAliveException e); void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite); // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs string beginFileUpload(); void uploadChunk(1: string location, 2: binary chunk); void finishFileUpload(1: string location); string beginFileDownload(1: string file); //can stop downloading chunks when receive 0-length byte array back binary downloadChunk(1: string id); // returns json string getNimbusConf(); // stats functions ClusterSummary getClusterInfo(); TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e); //returns json string getTopologyConf(1: string id) throws (1: NotAliveException e); StormTopology getTopology(1: string id) throws (1: NotAliveException e); StormTopology getUserTopology(1: string id) throws (1: NotAliveException e); }
當執行命令 nohup ${STORM_HOME}/bin/storm nimbus & 時,會啟動nimbus服務,具體的程式碼執行:
storm python指令碼程式碼,預設啟動backtype.storm.daemon.nimbus程式:
def nimbus(klass="backtype.storm.daemon.nimbus"): """Syntax: [storm nimbus] Launches the nimbus daemon. This command should be run under supervision with a tool like daemontools or monit. See Setting up a Storm cluster for more information. (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster) """ cppaths = [CLUSTER_CONF_DIR] jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [ "-Dlogfile.name=nimbus.log", "-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml", ] exec_storm_class( klass, jvmtype="-server", extrajars=cppaths, jvmopts=jvmopts)
然後執行nimbus.clj 指令碼,主要涉及兩個方法——launch-server!(nimbus的啟動入口)和service-handler(真正定義處理邏輯的地方)。
nimbus啟動後,對外提供了一些服務,topology的提交,UI資訊,topology的kill,rebalance等等。在文章一中講到提交topology給nimbus,這些服務的處理邏輯全部在service-handler方法中。以下擷取service-handler裡面處理提交Topology的邏輯
檢查Topology的DAG圖是否是有效連線圖、以及該topology Name是否已經存在,然後分配資源和任務排程(mk-assignments )方法,等分配好資源之後,把資料寫入到zookeeper,watcher發現有資料,就通知supervisor讀取資料啟動新的worker,一個worker就是一個JVM程序,worker啟動後就會按照使用者事先定好的task數來啟動task,一個task就是一個thread(reify Nimbus$Iface (^void submitTopologyWithOpts [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology ^SubmitOptions submitOptions] (try (assert (not-nil? submitOptions)) (validate-topology-name! storm-name) (check-storm-active! nimbus storm-name false) (let [topo-conf (from-json serializedConf)] (try (validate-configs-with-schemas topo-conf) (catch IllegalArgumentException ex (throw (InvalidTopologyException. (.getMessage ex))))) (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) storm-name topo-conf topology)) (swap! (:submitted-count nimbus) inc) (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs)) storm-conf (normalize-conf conf (-> serializedConf from-json (assoc STORM-ID storm-id) (assoc TOPOLOGY-NAME storm-name)) topology) total-storm-conf (merge conf storm-conf) topology (normalize-topology total-storm-conf topology) storm-cluster-state (:storm-cluster-state nimbus)] (system-topology! total-storm-conf topology) ;; this validates the structure of the topology (log-message "Received topology submission for " storm-name " with conf " storm-conf) ;; lock protects against multiple topologies being submitted at once and ;; cleanup thread killing topology in b/w assignment and starting the topology (locking (:submit-lock nimbus) (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology) (.setup-heartbeats! storm-cluster-state storm-id) (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive TopologyInitialStatus/ACTIVE :active}] (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions)))) (mk-assignments nimbus))) (catch Throwable e (log-warn-error e "Topology submission exception. (topology name='" storm-name "')") (throw e)))) (^void submitTopology [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology] (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology (SubmitOptions. TopologyInitialStatus/ACTIVE)))
在executor.clj中mk-threads: spout ,mk-threads: bolt方法就是啟動task,而task就是對應的spout或bolt 元件,而且這時Spout的open,nextTuple方法,以及bolt的preapre,execute方法都是在這裡被呼叫的,結合文章一中提到的,對於
Spout 方法呼叫順序:
declareOutputFields-> open -> nextTuple -> fail/ack or other
Bolt 方法呼叫順序:
declareOutputFields-> prepare -> execute
需要的注意的是在Spout中fail、ack方法和nextTuple是在同一執行緒中被順序呼叫的,所以在nextTuple中不要做延遲很大的操作。
至此,一個topology算是可以正式啟動工作了。
相關推薦
Storm雜談之Topology的啟動過程(二)
在一中講到了topology提交給nimbus nimbus Nimbus可以 說是storm中最核心的部分,它的主要功能有兩個: 對Topology的任務進行分配資源接收使用者的命令並做相應的處理,如Topology的提交,殺死,啟用等等Nimbus本身是基於Thr
elasticsearch原始碼分析之啟動過程(二)
最近開始廣泛的使用elasticsearch,也開始寫一些java程式碼了,為了提高java程式碼能力,也為了更加深入一點了解elasticsearch的內部運作機制,所以開始看一些elasticsearch的原始碼了。對於這種廣受追捧的開源專案,細細品讀一定會受益匪淺,
hadoop啟動過程(二)secondNameNode
作用:定期將namenode的fsimage和edits合併(資料或者操作不多的時候可以關閉 ),可加速hdfs啟動(如果edits很多的話,開啟會很難) SecondNameNode: 它會定期的和namenode就行通訊來完成整個的備份操作(????更
AndroidO audio系統之audioflinger啟動分析(二)
1. audioflinger建立過程 在Android8.0的音訊系統中,AudioFlinger是一個C++的Binder服務,執行在HAL程序中,它是在audioserver.c //frameworks/av/media/audioserver/
Spring啟動流程(二)之Spring載入Bean Definition的流程
繼上篇Spring啟動流程(一) prepareRefresh() prepareRefresh();//初始化配置和環境 obtainFreshBeanFactory() // Tell the subclass to refresh the internal bean fac
Node.js之在服務端啟動網頁(二)
現在我們學習一下關於檔案路徑的相關服務。 http://127.0.0.1 這是網路路徑 var http = require('http');//建立伺服器的 var fs = require('fs'); var path = require('path'); //引
大資料叢集搭建之節點的網路配置過程(二)
緊接著上一章來設定windows的vmnet8的ip地址和虛擬機器中centos的ip地址。 NAT虛擬網路的配置圖如下圖所示: 1、這裡根據VMware中得到的閘道器地址去設定vmnet8的ip地址。 閘道器地址檢視: 2、得到的閘道器地址後去
Android系統啟動流程(二)解析Zygote程序啟動過程
前言 上一篇文章我們分析了init程序的啟動過程,啟動過程中主要做了三件事,其中一件就是建立了Zygote程序,那麼Zygote程序是什麼,它做了哪些事呢?這篇文章會給你這些問題的答案。 1.Zygote簡介 在Android系統中,DVM(D
Linux載入啟動可執行程式的過程(二)直譯器完成動態連結
接著上一篇部落格。前面的工作都是在核心完成的,接下來會回到使用者空間。第一步,直譯器(也可以叫動態連結器)首先檢查可執行程式所依賴的共享庫,並在需要的時候對其進行載入。ELF 檔案有一個特別的節區: .dynamic,它存放了和動態連結相關的很多資訊,例如動態連結器通過它找到
Oracle儲存過程之merge into 函式(二)
今天主要說明的是merger into 的各個層次關係,這個對於寫儲存過程非常重要!!!希望對大家有所幫助。 首先貼出的是今天寫的一些錯誤SQL: 執行提示無效的SQL語句!!! 第一個錯誤,單獨寫儲存過程來測試的話,第一句是不需要的,這個大家切
從壹開始微服務 [ DDD ] 之十一 ║ 基於原始碼分析,命令分發的過程(二)
緣起 哈嘍小夥伴週三好,老張又來啦,DDD領域驅動設計的第二個D也快說完了,下一個系列我也在考慮之中,是 Id4 還是 Dockers 還沒有想好,甚至昨天我還想,下一步是不是可以寫一個簡單的Angular 入門教程,本來是想來個前後端分離的教學視訊的,簡單試了試,發現自己的聲音不好聽,真心不好聽那種,就作
OpenStack原始碼分析之Nova-Compute服務啟動過程(icehouse)
學習OpenStack有半年多了,一直都停留在使用和trouble shooting的階段,最近有時間來好好研究了一下程式碼,因為以前是C++/Windows出生的,所以對Linux下面的Python開發不是很熟悉,本文適合一些已經使用過OpenStack並且想要初步瞭解程
神奇的 SQL 之 聯表細節 → MySQL JOIN 的執行過程(二)
開心一刻 一頭母牛在吃草,突然一頭公牛從遠處狂奔而來說:“快跑啊!!樓主來了!” 母牛說:“樓主來了關我屁事啊?” 公牛急忙說:“樓主吹牛逼呀!” 母牛大驚,拔腿就跑,邊跑邊問:“你是公牛你怕什麼啊?&
精盡MyBatis原始碼分析 - SQL執行過程(二)之 StatementHandler
> 該系列文件是本人在學習 Mybatis 的原始碼過程中總結下來的,可能對讀者不太友好,請結合我的原始碼註釋([Mybatis原始碼分析 GitHub 地址](https://github.com/liu844869663/mybatis-3)、[Mybatis-Spring 原始碼分析 GitHub 地址
STL之set具體解釋(二)
基本操作 二叉樹 mono itl 自己 pair leading 左右子樹 ews 首先來看看set集合容器: set集合容器實現了紅黑樹的平衡二叉樹數據結構。在插入元素時它會自己主動調整二叉樹的排列,把該元素放到適當的位置,而且 保證左右子樹平衡。平衡二
性能測試從0到1的過程(二)
sleep efi python腳本 size sel 應用服務器 完全 展示 服務 本人,從畢業開始接觸測試,但是性能方面一無所知。之前在第一份工作,測過安卓客戶端,當時寫過一個非常簡單的shell腳本,push到手機系統內,用於手機硬件信息。但是在服務端的性能方面,
LIVE555研究之五:RTPServer(二)
tpch live555 循環調用 family 每一個 函數 計算 ack close LIVE555研究之五:RTPServer(二) 接上文,main函數的幾行代碼創建了RTSPSe
springboot啟動過程(1)-初始化
好的 事件監聽 spa 兩個 包括 servlet 實例對象 ice 機制 1 springboot啟動時,[email protected]/* */函數,執行SpringApplication.run(DemoApplication.class, arg
jenkins實戰之jenkins安裝部署(二)
自動化運維 上一小節介紹了Jenkins安裝(Linux/uninx平臺),這節我們講講Jenkins界面操作(包括系統設置,工具安裝,插件管理,系統升級,安全設置等等操作); 登錄jenkins首頁,分別有以下選項欄,從左側看起,點擊Jenkins系統管理我們會看到右側list欄,內
Jmeter之Bean shell使用(二)
.get 方法 tro 邏輯 麻煩 str www title jar 上一篇Jmeter之Bean shell使用(一)簡單介紹了下Jmeter中的Bean shell,本文是對上文的一個補充,主要總結下常用的幾種場景和方法,相信這些基本可以涵蓋大部分的需求。本節內容如