Storm集群組件和編程模型
Storm工作原理:
Storm是一個開源的分布式實時計算系統,常被稱為流式計算框架。什麽是流式計算呢?通俗來講,流式計算顧名思義:數據流源源不斷的來,一邊來,一邊計算結果,再進入下一個流。
比如一般金融系統一直不斷的執行,金融交易、用戶全部行為都記錄進日誌裏,日誌分析出站點運維、獵戶信息。海量數據使得單節點處理只是來。所以就用到分布式計算機型,storm 是當中的典型代表之中的一個,一般應用場景是:中間使用一個消息隊列系統如kafka,先將消息緩存起來,storm 中有非常多的節點,分布式並行執行處理程序,進行數據處理。
Storm 集群組件:
集群角色:
Nimbus:集群主節點。主要負責
Supervisor:集群從節點。主要負責啟動、停止業務邏輯組件程序進程
主從節點之間通過zookeeper集群進行連接,主從節點之間是fail-fast(java的一種錯誤機制)、無狀態的,主從節點的狀態信息均保存到zookeeper中或者本地硬盤裏。
這種優點就在於,哪怕是主節點kill掉了,storm會自己主動起一個備份主節點。由於無狀態的關系,所以隨意一個節點都能夠充當Nimbus一角。
這種設計使得storm十分穩定。【譯自apache storm官網】
Storm 編程模型
Topology
業務處理模型
Spout
數據源組件。用於獲取數據,可通過文件或者消息隊列【kafka、activeMQ】中獲取數據
Bolt
邏輯處理組件
簡單理解,topology【拓撲結構】就是包括了數據源、邏輯處理組件的一個外在集合框架,使用storm能夠定義一個topology裏set多少個數據源組件。多少個邏輯處理組件。
以下通過demo來詳細解釋Storm編程模型的幾個主要元組
比如如今須要對一組數據進行處理,將數據中全部的英文轉成大寫,再加上標識後綴,最後保存到本地文本中。當然這僅僅是一個特別簡單的數據處理邏輯。僅用於幫助大家理解Storm編程模型。 那依據Storm的編程模型。實現這個數據處理需求須要建立1個數據源Spout組件。2個業務邏輯組件Bolt,以及一個Topology結構,將這3個組件增加到這個topology結構中。
public class RandomSpout extends BaseRichSpout{ SpoutOutputCollector collector=null; String[] goods={"iphone","xiaomi","meizu","zhongxing","huawei","moto","sumsung","simens"}; /* * 獲取消息並發送給下一個組件的方法。會被storm 不斷地調用 * 從goods 數組中隨機獲取一個商品名封裝到tuple中去 */ @Override public void nextTuple() { Random random=new Random(); String good=goods[random.nextInt(goods.length)]; //封裝到tuple中發送給下一個組件 collector.emit(new Values(good)); } //進行初始化,僅僅在開始時調用一次 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector=collector; } /* * 定義tunple的schema * */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("src_word")); } }
數據源Spout組件通過繼承Storm基類。重寫三個最核心的方法,各自是open、nextTuple、和delcare方法。open是在將運行數據傳遞之前所運行的方法,用於初始化數據。nextTuple中核心方法就是collector的emit方法,用於將數據傳遞給下一個元組。delcare用於成名元組傳遞、接收數據的格式,能夠簡單的理解為給傳遞的數據加上一個標識鍵。
public class UpperBolt extends BaseBasicBolt { //每來一個消息元組tuple,都會被運行一次該方法 @Override public void execute(Tuple tuple,BasicOutputCollector collector) { //從tuple 中拿到數據--原始商品名 String src_word=tuple.getString(0);//獲取下標第一個消息 String upper=src_word.toUpperCase(); //發送出去 collector.emit(new Values(upper)); } //給消息申明一個字段名 @Override public void declareOutputFields(OutputFieldsDeclarer declare) { declare.declare(new Fields("upper")); } }
這個邏輯處理bolt 用於將spout數據源組件中傳遞的元組轉成大寫格式,先獲取tuple的數據,然後emit發送給下一個元組。
/* * 給商品名稱加入後綴。然後寫入文件裏 */ public class SuffixBolt extends BaseBasicBolt{ FileWriter file =null; @Override public void prepare(Map stormConf, TopologyContext context) { try { file = new FileWriter("D://eclipse_plugin"+UUID.randomUUID()); } catch (IOException e) { e.printStackTrace(); } } //每一次執行都去new 一個writer 。應該在調用excute 之前先把writer 初始化好==持續執行 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { //從消息元組中拿到上一個組件發送過來的數據 String upper=tuple.getString(0); String result=upper +"_suffix"; try { file.append(result); file.append("/n"); } catch (IOException e) { e.printStackTrace(); } } //聲明該組件要發送出去的tuple的字段定義 @Override public void declareOutputFields(OutputFieldsDeclarer declare) { } }
bolt和spout一樣,繼承storm基類之後,也會有prepare方法用於準備數據,初始化一些對象;excute方法則是每每傳遞過來一個元組。便會觸發運行一次。這個bolt的作用在於將上一個元組傳遞過來的數據加上後綴處理,然後寫入本地文件裏。
那麽。寫好了這些基礎的數據源和業務邏輯處理元組,怎樣組織他們的數據傳遞關系。這就是Topology類的職責。
/* * 描寫敘述topology的結構,以及創建topology並提交給集群 */ public class TopoMain { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { TopologyBuilder builder=new TopologyBuilder(); //設置消息源組件 4表示spout進程個數 builder.setSpout("randomSpout", new RandomSpout(),4); //設置邏輯處理組件 //shuffleGrouping 指定接收哪個組件傳過來的消息 builder.setBolt("upper", new UpperBolt(),4).shuffleGrouping("randomSpout"); builder.setBolt("result", new SuffixBolt(),4).shuffleGrouping("upper"); //創建一個topology StormTopology topology=builder.createTopology(); Config config=new Config(); config.setNumWorkers(4);//設置進程個數 config.setDebug(true);//設置調試狀態 config.setNumAckers(0);//消息應答器,事務性不是非常強。可設置為0 //提交topology到storm 定義一個名稱。好在集群裏去標識;通過配置對象傳遞參數給集群,集群依據這些參數,任務調度進行調整 StormSubmitter.submitTopology("demotopo", config, topology); } }
Topology類便將之前編寫的1個spout 和2個bolt組裝到一個topology中。並通過追加shuffleGrouping方法設置了他們之間的數據傳遞方向,以及進程個數。
通過這個實例應該對storm的編程模型和編碼流程有了簡單的認識。
但這僅僅是storm的大山一小角,比如zookeeper對storm集群主從節點的管理、storm與消息中間件的結合處理海量數據。復雜的數據處理流程。這些才是storm真正大展身手的地方。
Storm集群組件和編程模型