1. 程式人生 > >Storm拓撲,元件之spout、bolt,並行策略

Storm拓撲,元件之spout、bolt,並行策略

軟體版本:Storm:0.9.3 ,Redis:2.8.19;jedis:2.6.2;

參考:http://storm.apache.org/documentation/Understanding-the-parallelism-of-a-Storm-topology.html

一、Storm原理

   Storm簡述:Storm中有兩個元件:nimbus和supervisor,nimbus主要負責分配資源和schedule和協調任務,supervisor主要啟動worker,每個worker可以啟動一個到多個executor,一個executor可以啟動一個到多個task,(預設一個executor對應一個task)實際執行任務的是task。

二、Storm程式設計

1. Topology (拓撲)

 1.1 定義spout、bolt以及其關係

 //Topology definition
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("wc-spout",new WCSpout(),Integer.parseInt(args[2]));
		builder.setBolt("split-bolt", new SplitBolt(),Integer.parseInt(args[3]))
			.shuffleGrouping("wc-spout");
		builder.setBolt("count-bolt", new CountBolt(),Integer.parseInt(args[4]))
			.fieldsGrouping("split-bolt", new Fields("word"));

在設定spout和bolt的時候還可以設定並行的個數,即executor的個數,當然也可以設定task的個數,如下程式碼,兩個executor,四個task,則每個executor配置兩個task。

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout);
<程式碼來自:http://storm.apache.org/documentation/Understanding-the-parallelism-of-a-Storm-topology.html
>

1.2 配置引數項(在spout和bolt中可通過此項設定獲取引數值)
// 定義Configuration
		Config conf = new Config();
		conf.put("storeFrequent", Long.parseLong(args[0]));
		conf.put("slow_fast", args[5]);
		conf.put("printWC", args[6]);
		conf.setNumWorkers(Integer.parseInt(args[1]));
上面的程式碼除了設定三個變數值之外,還設定了worker的數量;

1.3 提交Topology
叢集提交方式:

// 提交任務
		StormSubmitter.submitTopology("wc-redis", conf,builder. createTopology());

其中的“wc-redis”是Topology的名字,後面兩個是基本的模式寫法,可以就按照上面的寫即可。

單機提交方式:

LocalCluster cluster = new LocalCluster();
 cluster. submitTopology("wc-redis", conf,
 builder. createTopology());
2. Spout 

2.1 繼承BasiRichSpout

     繼承這個類後,定義一個域變數SpoutOutputCollector collector,這個用於輸出;

2.2 覆寫open方法
     

public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
在這裡首先使用collector來初始化之前定義的域變數,如果有需要獲取的引數值可以從conf中獲取,context裡面是當前的spout的相關資訊上下文;

2.3 覆寫nextTuple

在這個方法裡面使用collector.emit方法即可進行輸出,一般使用下面的方式即可:

public List<Integer> emit(List<Object> tuple) {
        return emit(tuple, null);
    }
如果要求容錯基本較高,可以使用一個streamid的方式進行輸出,如下:
 public List<Integer> emit(String streamId, List<Object> tuple) {
        return emit(streamId, tuple, null);
    }
2.4 覆寫declareOutputFields方法

一般定義一個field名字即可,如下:

@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("line"));
	}
3. Bolt

3.1 繼承BaseBasicBolt類

3.2 覆寫prepare方法

public void prepare(@SuppressWarnings("rawtypes") Map conf,TopologyContext context)
這個方法類似Spout的open方法,進行一些初始化或獲取引數值的操作;

3.3 覆寫exec方法

public void execute(Tuple input, BasicOutputCollector collector) 
這個方法的input即是從Spout中輸出的資料,通過對這個資料進行處理,然後使用collector.emit方法進行輸出,可以是輸出到下一個Bolt的處理,作為下一個Bolt的輸入。

3.4 覆寫declareOutputFields方法

這個方法和Spout的declareOutputFields方法類似,宣告field的名字。

4. 提交執行

storm jar wc.jar test.TopologyMain
三、Storm並行策略驗證
1. worker只是用來分配各個元件的,包括Spout和Bolt。

比如分配了一個worker,然後一個Spout S分配兩個executor,一個Bolt A分配一個executor,另一個Bolt B分配一個executor,那麼這些executor一共有4個task(使用預設一個executor對應一個task),就會全部分配在一個worker上。

如果分配了兩個worker,還按上的元件分配,那麼可能worker 1上面分配了一個Spout S的executor和Bolt A的executor,worker 2 上面分配了一個Spout S的另一個executor,和Bolt B的executor。

2. Spout executor並行

Spout如果單單設定executor的並行個數,那麼其輸出可能是有重複的,這樣的並行策略是有問題的。

比如下面的Spout:

package wc.redis.spout;

import java.util.Map;

import redis.clients.jedis.Jedis;
import wc.redis.util.RedisUtils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WCSpout extends BaseRichSpout {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	private SpoutOutputCollector collector;
	private Jedis jedis;
	Integer taskId;
	String conponentId;
	String slow_fast;
	@Override
	public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		this.collector = collector;
		slow_fast = (String)conf.get("slow_fast");
		jedis = RedisUtils.connect(RedisUtils.HOSTNAME, RedisUtils.PORT, RedisUtils.INSERT_DB);
		taskId = context.getThisTaskId();
		conponentId = context.getThisComponentId();
		context.getThisTaskIndex();
		System.out.println(RedisUtils.getCurrDateWithInfo(conponentId, taskId, " WCSpout初始化完成!"));
	}

	@Override
	public void nextTuple() {
		long interval =0;
		while(true){// 獲取資料
			interval++;
			String zero = getItem("0");
			String one = getItem("1");
			String two =  getItem("2");
			
			try {
				Thread.sleep(200);// 每200毫秒傳送一次資料
			} catch (InterruptedException e) {
				e.printStackTrace();
			} 
			if(zero==null||one==null||two==null){
				// do nothing
				// 沒有資料
				if(interval%15==0){
//					System.out.println(new java.util.Date()+":ConponentId:"+conponentId+",taskID:"+taskId+
//						",spout:No Data...");
//					System.out.println(RedisUtils.getCurrDateWithInfo(conponentId, taskId, "spout:No data..."));
				}
			}else{
				this.collector.emit(new Values(zero+","+one+","+two));
				if(interval%15==0&&"fast".equals(slow_fast)){
//					System.out.println(new java.util.Date()+":ConponentId:"+conponentId+",taskID:"+taskId+
//						",spout:["+zero+","+one+","+two+"]");
					System.out.println(RedisUtils.getCurrDateWithInfo(conponentId, taskId, "Spout:["+zero+","+one+","+two+"]"));
				}else if("slow".equals(slow_fast)){
					System.out.println(RedisUtils.getCurrDateWithInfo(conponentId, taskId, "Spout:["+zero+","+one+","+two+"]"));
				}else{
					new RuntimeException("Wrong argument!");
				}
			}
			
		}	
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("line"));
	}
	
	/**
	 * Redis中獲取鍵值並刪除對應的鍵
	 * @param index
	 */
	private String getItem(String index){
		if(!jedis.exists(index)){
			return null;
		}
		String val = jedis.get(index);
//		if(val==null||"null".equals("null")){
//			return ;
//		}
		
		jedis.del(index);
		return val;
	}

}

這個Spout從Redis伺服器中獲取資料,獲取後把對應的資料刪除。兩個Spout都同時讀取了資料,然後進行了輸出,同時只能有一個Spout刪除了Redis中的資料,這樣就會有重複資料輸出了,類似圖1:


圖1

從圖1紅色區域可以看到Spout的輸出,從時間可以看出兩個輸出只相差了1毫秒;從藍色的框也可以看出Spout的下一個Bolt獲取了兩條相同的資料,這就說明Spout輸出了重複的資料;

所以Spout的並行策略應該是獲取taskid,根據資料的特徵來選擇(可以隨機)需要處理的executor,程式碼如下:

package wc.redis.spout;

import java.util.Map;

import redis.clients.jedis.Jedis;
import wc.redis.util.RedisUtils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WCSpout extends BaseRichSpout {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	private SpoutOutputCollector collector;
	private Jedis jedis;
	Integer taskId;
	String componentId;
	String slow_fast;
	int numTasks ;
	int thisTaskId;
	@Override
	public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		this.collector = collector;
		slow_fast = (String)conf.get("slow_fast");
		jedis = RedisUtils.connect(RedisUtils.HOSTNAME, RedisUtils.PORT, RedisUtils.INSERT_DB);
		taskId = context.getThisTaskId();
		componentId = context.getThisComponentId();
		numTasks = context.getComponentTasks(componentId).size();
		thisTaskId = context.getThisTaskIndex();
		System.out.println(RedisUtils.getCurrDateWithInfo(componentId, taskId, " WCSpout初始化完成!"));
	}

	@Override
	public void nextTuple() {
		long interval =0;
		while(true){// 獲取資料
			interval++;
			String zero = getItem("0");
			String one = getItem("1");
			String two =  getItem("2");
			
			try {
				Thread.sleep(200);// 每200毫秒傳送一次資料
			} catch (InterruptedException e) {
				e.printStackTrace();
			} 
			if(zero==null||one==null||two==null){
				// do nothing
				// 沒有資料
//				if(interval%15==0){
//				}
			}else{
				String tmpStr =zero+","+one+","+two;
				if(thisTaskId==tmpStr.hashCode()%numTasks){ // spout負載均衡
					this.collector.emit(new Values(tmpStr));
				
					if(interval%15==0&&"fast".equals(slow_fast)){
						System.out.println(RedisUtils.getCurrDateWithInfo(String.valueOf(thisTaskId),
								taskId, "Spout:["+zero+","+one+","+two+"]"));
					}else if("slow".equals(slow_fast)){
						System.out.println(RedisUtils.getCurrDateWithInfo(String.valueOf(thisTaskId),
								taskId, "Spout:["+zero+","+one+","+two+"]"));
					}else{
						new RuntimeException("Wrong argument!");
					}
				}
			}
			
		}	
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("line"));
	}
	
	/**
	 * Redis中獲取鍵值並刪除對應的鍵
	 * @param index
	 */
	private String getItem(String index){
		if(!jedis.exists(index)){
			return null;
		}
		String val = jedis.get(index);
//		if(val==null||"null".equals("null")){
//			return ;
//		}
		
		jedis.del(index);
		return val;
	}

}
使用上面的程式碼後,Spout的輸出就不會重複了,同時也達到了distribution的目的,如圖2


圖2

從圖2中紅色框中可以看到從時間21:58 883 taskId5 Spout輸出後,接著到了taskID6 21:59 494 Spout輸出,然後又到taskID5 21:59 905 的Spout輸出,並沒有重複記錄;

3. Bolt的並行

Bolt的並行只要設定了多個executor即可。

3.1 使用shuffle進行grouping

使用shuffle進行grouping,多個task的輸入中同樣的記錄可能被分到了任何一個taskid中,如圖3所示。


圖3

從圖3中的藍色框中可以看到Spout輸出了兩條相同的記錄,的那是一條記錄被送到了taskID為5的Bolt中(紅色框),一條被送到了taskID為6的Bolt中(紅色框),當然從下面的描述中也可以知道,事實就是這樣。

上圖引自《Getting Started with Storm》

3.2 使用field進行grouping

使用field進行grouping其實是和shuffle一樣的,但是有一點不一樣,就是相同的記錄只會被送往同一個taskid中,比如上面圖3中,如果使用field進行grouping,那麼Spout輸出的兩條相同的記錄就只會被送往taskid為5的task中(或者為6)。

分享,成長,快樂

腳踏實地,專注

轉載請註明blog地址:http://blog.csdn.net/fansy1990