1. 程式人生 > >Storm集群組件和編程模型

Storm集群組件和編程模型

連接 上傳 應用程序 系統 本地文件 src == 基礎 字段名



Storm工作原理:

Storm是一個開源的分布式實時計算系統,常被稱為流式計算框架。什麽是流式計算呢?通俗來講,流式計算顧名思義:數據流源源不斷的來,一邊來,一邊計算結果,再進入下一個流。

比如一般金融系統一直不斷的執行,金融交易、用戶全部行為都記錄進日誌裏,日誌分析出站點運維、獵戶信息。海量數據使得單節點處理只是來。所以就用到分布式計算機型,storm 是當中的典型代表之中的一個,一般應用場景是:中間使用一個消息隊列系統如kafka,先將消息緩存起來,storm 中有非常多的節點,分布式並行執行處理程序,進行數據處理。

僅僅要不是人為幹預。storm 就一直實時不斷地進行數據處理。值得註意的是:並非storm去處理,而是它能夠將我們程序的非常多jar包。業務程序,同一時候放到不同的server中並發的執行, 終於得到的結果就是不同系統的海量數據就會分散到不同的server中並發的進行處理,負載能力非常強。 所以真正進行數據處理的是我們寫好的數據處理程序,storm的強大作用之中的一個就是它為這些程序提供了執行溫床,將應用程序上傳到storm 集群中,在多臺機器上並發執行,這樣就能夠擴展程序的負載處理能力實現流式計算。

技術分享

Storm 集群組件:

集群角色:

Nimbus:集群主節點。主要負責

任務分配、響應client提交topology請求以及任務失敗的調度

Supervisor:集群從節點。主要負責啟動、停止業務邏輯組件程序進程

主從節點之間通過zookeeper集群進行連接,主從節點之間是fail-fastjava的一種錯誤機制)、無狀態的,主從節點的狀態信息均保存到zookeeper中或者本地硬盤裏。

這種優點就在於,哪怕是主節點kill掉了,storm會自己主動起一個備份主節點。由於無狀態的關系,所以隨意一個節點都能夠充當Nimbus一角。

這種設計使得storm十分穩定。【譯自apache storm官網】

技術分享

Storm 編程模型

Topology

業務處理模型

Spout

數據源組件。用於獲取數據,可通過文件或者消息隊列【kafkaactiveMQ】中獲取數據

Bolt

邏輯處理組件

技術分享

簡單理解,topology【拓撲結構】就是包括了數據源、邏輯處理組件的一個外在集合框架,使用storm能夠定義一個topologyset多少個數據源組件。多少個邏輯處理組件。

以下通過demo來詳細解釋Storm編程模型的幾個主要元組

比如如今須要對一組數據進行處理,將數據中全部的英文轉成大寫,再加上標識後綴,最後保存到本地文本中。當然這僅僅是一個特別簡單的數據處理邏輯。僅用於幫助大家理解Storm編程模型。 那依據Storm的編程模型。實現這個數據處理需求須要建立1個數據源Spout組件。2個業務邏輯組件Bolt,以及一個Topology結構,將這3個組件增加到這個topology結構中。

public class RandomSpout extends BaseRichSpout{
	SpoutOutputCollector collector=null;
	String[] goods={"iphone","xiaomi","meizu","zhongxing","huawei","moto","sumsung","simens"};
	/*
	 * 獲取消息並發送給下一個組件的方法。會被storm 不斷地調用
	 * 從goods 數組中隨機獲取一個商品名封裝到tuple中去
	 */
	@Override
	public void nextTuple() {
		Random random=new Random();
		String good=goods[random.nextInt(goods.length)];
		
		//封裝到tuple中發送給下一個組件
		collector.emit(new Values(good));
	}

	//進行初始化,僅僅在開始時調用一次
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector=collector;		
	}
	/*
	 * 定義tunple的schema
	 * 
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("src_word"));
	}
}

數據源Spout組件通過繼承Storm基類。重寫三個最核心的方法,各自是open、nextTuple、和delcare方法。open是在將運行數據傳遞之前所運行的方法,用於初始化數據。nextTuple中核心方法就是collector的emit方法,用於將數據傳遞給下一個元組。delcare用於成名元組傳遞、接收數據的格式,能夠簡單的理解為給傳遞的數據加上一個標識鍵。

public class UpperBolt extends BaseBasicBolt {

	//每來一個消息元組tuple,都會被運行一次該方法
	@Override
	public void execute(Tuple tuple,BasicOutputCollector collector) {
		//從tuple 中拿到數據--原始商品名
		String src_word=tuple.getString(0);//獲取下標第一個消息
		String  upper=src_word.toUpperCase();
		//發送出去
		collector.emit(new Values(upper));		
	}
	//給消息申明一個字段名
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declare) {
		declare.declare(new Fields("upper"));
	}
}

這個邏輯處理bolt 用於將spout數據源組件中傳遞的元組轉成大寫格式,先獲取tuple的數據,然後emit發送給下一個元組。

/*
 * 給商品名稱加入後綴。然後寫入文件裏
 */
public class SuffixBolt extends BaseBasicBolt{
	FileWriter file =null;
	@Override
	public void prepare(Map stormConf, TopologyContext context) {
		try {
			 file = new FileWriter("D://eclipse_plugin"+UUID.randomUUID());
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	//每一次執行都去new 一個writer 。應該在調用excute 之前先把writer 初始化好==持續執行
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		//從消息元組中拿到上一個組件發送過來的數據
		String upper=tuple.getString(0);
		String result=upper +"_suffix";
			try {
				file.append(result);
				file.append("/n");
			} catch (IOException e) {
			
				e.printStackTrace();
			}		
	}
	//聲明該組件要發送出去的tuple的字段定義
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declare) {
	}
}

bolt和spout一樣,繼承storm基類之後,也會有prepare方法用於準備數據,初始化一些對象;excute方法則是每每傳遞過來一個元組。便會觸發運行一次。這個bolt的作用在於將上一個元組傳遞過來的數據加上後綴處理,然後寫入本地文件裏。

那麽。寫好了這些基礎的數據源和業務邏輯處理元組,怎樣組織他們的數據傳遞關系。這就是Topology類的職責。

/*
 * 描寫敘述topology的結構,以及創建topology並提交給集群
 */
public class TopoMain {
	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
		TopologyBuilder builder=new TopologyBuilder();
		
		//設置消息源組件  4表示spout進程個數
		builder.setSpout("randomSpout", new RandomSpout(),4);
		
		//設置邏輯處理組件
		//shuffleGrouping 指定接收哪個組件傳過來的消息
		builder.setBolt("upper", new UpperBolt(),4).shuffleGrouping("randomSpout");
		builder.setBolt("result", new SuffixBolt(),4).shuffleGrouping("upper");
				
		//創建一個topology
		StormTopology topology=builder.createTopology();
		
		Config config=new Config();
		config.setNumWorkers(4);//設置進程個數
		config.setDebug(true);//設置調試狀態
		config.setNumAckers(0);//消息應答器,事務性不是非常強。可設置為0
		
//提交topology到storm  定義一個名稱。好在集群裏去標識;通過配置對象傳遞參數給集群,集群依據這些參數,任務調度進行調整	
		StormSubmitter.submitTopology("demotopo", config, topology);
	}
}

Topology類便將之前編寫的1個spout 和2個bolt組裝到一個topology中。並通過追加shuffleGrouping方法設置了他們之間的數據傳遞方向,以及進程個數。

通過這個實例應該對storm的編程模型和編碼流程有了簡單的認識。

但這僅僅是storm的大山一小角,比如zookeeper對storm集群主從節點的管理、storm與消息中間件的結合處理海量數據。復雜的數據處理流程。這些才是storm真正大展身手的地方。



Storm集群組件和編程模型