Storm拓撲,元件之spout、bolt,並行策略
軟體版本:Storm:0.9.3 ,Redis:2.8.19;jedis:2.6.2;
參考:http://storm.apache.org/documentation/Understanding-the-parallelism-of-a-Storm-topology.html
一、Storm原理
Storm簡述:Storm中有兩個元件:nimbus和supervisor,nimbus主要負責分配資源和schedule和協調任務,supervisor主要啟動worker,每個worker可以啟動一個到多個executor,一個executor可以啟動一個到多個task,(預設一個executor對應一個task)實際執行任務的是task。
二、Storm程式設計
1. Topology (拓撲)
1.1 定義spout、bolt以及其關係
//Topology definition TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("wc-spout",new WCSpout(),Integer.parseInt(args[2])); builder.setBolt("split-bolt", new SplitBolt(),Integer.parseInt(args[3])) .shuffleGrouping("wc-spout"); builder.setBolt("count-bolt", new CountBolt(),Integer.parseInt(args[4])) .fieldsGrouping("split-bolt", new Fields("word"));
在設定spout和bolt的時候還可以設定並行的個數,即executor的個數,當然也可以設定task的個數,如下程式碼,兩個executor,四個task,則每個executor配置兩個task。
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
.setNumTasks(4)
.shuffleGrouping("blue-spout);
<程式碼來自:http://storm.apache.org/documentation/Understanding-the-parallelism-of-a-Storm-topology.html1.2 配置引數項(在spout和bolt中可通過此項設定獲取引數值)
// 定義Configuration
Config conf = new Config();
conf.put("storeFrequent", Long.parseLong(args[0]));
conf.put("slow_fast", args[5]);
conf.put("printWC", args[6]);
conf.setNumWorkers(Integer.parseInt(args[1]));
上面的程式碼除了設定三個變數值之外,還設定了worker的數量;1.3 提交Topology
叢集提交方式:
// 提交任務
StormSubmitter.submitTopology("wc-redis", conf,builder. createTopology());
其中的“wc-redis”是Topology的名字,後面兩個是基本的模式寫法,可以就按照上面的寫即可。
單機提交方式:
LocalCluster cluster = new LocalCluster();
cluster. submitTopology("wc-redis", conf,
builder. createTopology());
2. Spout 2.1 繼承BasiRichSpout
繼承這個類後,定義一個域變數SpoutOutputCollector collector,這個用於輸出;
2.2 覆寫open方法
public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context,
SpoutOutputCollector collector) {
在這裡首先使用collector來初始化之前定義的域變數,如果有需要獲取的引數值可以從conf中獲取,context裡面是當前的spout的相關資訊上下文;2.3 覆寫nextTuple
在這個方法裡面使用collector.emit方法即可進行輸出,一般使用下面的方式即可:
public List<Integer> emit(List<Object> tuple) {
return emit(tuple, null);
}
如果要求容錯基本較高,可以使用一個streamid的方式進行輸出,如下: public List<Integer> emit(String streamId, List<Object> tuple) {
return emit(streamId, tuple, null);
}
2.4 覆寫declareOutputFields方法一般定義一個field名字即可,如下:
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
3. Bolt3.1 繼承BaseBasicBolt類
3.2 覆寫prepare方法
public void prepare(@SuppressWarnings("rawtypes") Map conf,TopologyContext context)
這個方法類似Spout的open方法,進行一些初始化或獲取引數值的操作;3.3 覆寫exec方法
public void execute(Tuple input, BasicOutputCollector collector)
這個方法的input即是從Spout中輸出的資料,通過對這個資料進行處理,然後使用collector.emit方法進行輸出,可以是輸出到下一個Bolt的處理,作為下一個Bolt的輸入。3.4 覆寫declareOutputFields方法
這個方法和Spout的declareOutputFields方法類似,宣告field的名字。
4. 提交執行
storm jar wc.jar test.TopologyMain
三、Storm並行策略驗證1. worker只是用來分配各個元件的,包括Spout和Bolt。
比如分配了一個worker,然後一個Spout S分配兩個executor,一個Bolt A分配一個executor,另一個Bolt B分配一個executor,那麼這些executor一共有4個task(使用預設一個executor對應一個task),就會全部分配在一個worker上。
如果分配了兩個worker,還按上的元件分配,那麼可能worker 1上面分配了一個Spout S的executor和Bolt A的executor,worker 2 上面分配了一個Spout S的另一個executor,和Bolt B的executor。
2. Spout executor並行
Spout如果單單設定executor的並行個數,那麼其輸出可能是有重複的,這樣的並行策略是有問題的。
比如下面的Spout:
package wc.redis.spout;
import java.util.Map;
import redis.clients.jedis.Jedis;
import wc.redis.util.RedisUtils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WCSpout extends BaseRichSpout {
/**
*
*/
private static final long serialVersionUID = 1L;
private SpoutOutputCollector collector;
private Jedis jedis;
Integer taskId;
String conponentId;
String slow_fast;
@Override
public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
slow_fast = (String)conf.get("slow_fast");
jedis = RedisUtils.connect(RedisUtils.HOSTNAME, RedisUtils.PORT, RedisUtils.INSERT_DB);
taskId = context.getThisTaskId();
conponentId = context.getThisComponentId();
context.getThisTaskIndex();
System.out.println(RedisUtils.getCurrDateWithInfo(conponentId, taskId, " WCSpout初始化完成!"));
}
@Override
public void nextTuple() {
long interval =0;
while(true){// 獲取資料
interval++;
String zero = getItem("0");
String one = getItem("1");
String two = getItem("2");
try {
Thread.sleep(200);// 每200毫秒傳送一次資料
} catch (InterruptedException e) {
e.printStackTrace();
}
if(zero==null||one==null||two==null){
// do nothing
// 沒有資料
if(interval%15==0){
// System.out.println(new java.util.Date()+":ConponentId:"+conponentId+",taskID:"+taskId+
// ",spout:No Data...");
// System.out.println(RedisUtils.getCurrDateWithInfo(conponentId, taskId, "spout:No data..."));
}
}else{
this.collector.emit(new Values(zero+","+one+","+two));
if(interval%15==0&&"fast".equals(slow_fast)){
// System.out.println(new java.util.Date()+":ConponentId:"+conponentId+",taskID:"+taskId+
// ",spout:["+zero+","+one+","+two+"]");
System.out.println(RedisUtils.getCurrDateWithInfo(conponentId, taskId, "Spout:["+zero+","+one+","+two+"]"));
}else if("slow".equals(slow_fast)){
System.out.println(RedisUtils.getCurrDateWithInfo(conponentId, taskId, "Spout:["+zero+","+one+","+two+"]"));
}else{
new RuntimeException("Wrong argument!");
}
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
/**
* Redis中獲取鍵值並刪除對應的鍵
* @param index
*/
private String getItem(String index){
if(!jedis.exists(index)){
return null;
}
String val = jedis.get(index);
// if(val==null||"null".equals("null")){
// return ;
// }
jedis.del(index);
return val;
}
}
這個Spout從Redis伺服器中獲取資料,獲取後把對應的資料刪除。兩個Spout都同時讀取了資料,然後進行了輸出,同時只能有一個Spout刪除了Redis中的資料,這樣就會有重複資料輸出了,類似圖1:
圖1
從圖1紅色區域可以看到Spout的輸出,從時間可以看出兩個輸出只相差了1毫秒;從藍色的框也可以看出Spout的下一個Bolt獲取了兩條相同的資料,這就說明Spout輸出了重複的資料;
所以Spout的並行策略應該是獲取taskid,根據資料的特徵來選擇(可以隨機)需要處理的executor,程式碼如下:
package wc.redis.spout;
import java.util.Map;
import redis.clients.jedis.Jedis;
import wc.redis.util.RedisUtils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WCSpout extends BaseRichSpout {
/**
*
*/
private static final long serialVersionUID = 1L;
private SpoutOutputCollector collector;
private Jedis jedis;
Integer taskId;
String componentId;
String slow_fast;
int numTasks ;
int thisTaskId;
@Override
public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
slow_fast = (String)conf.get("slow_fast");
jedis = RedisUtils.connect(RedisUtils.HOSTNAME, RedisUtils.PORT, RedisUtils.INSERT_DB);
taskId = context.getThisTaskId();
componentId = context.getThisComponentId();
numTasks = context.getComponentTasks(componentId).size();
thisTaskId = context.getThisTaskIndex();
System.out.println(RedisUtils.getCurrDateWithInfo(componentId, taskId, " WCSpout初始化完成!"));
}
@Override
public void nextTuple() {
long interval =0;
while(true){// 獲取資料
interval++;
String zero = getItem("0");
String one = getItem("1");
String two = getItem("2");
try {
Thread.sleep(200);// 每200毫秒傳送一次資料
} catch (InterruptedException e) {
e.printStackTrace();
}
if(zero==null||one==null||two==null){
// do nothing
// 沒有資料
// if(interval%15==0){
// }
}else{
String tmpStr =zero+","+one+","+two;
if(thisTaskId==tmpStr.hashCode()%numTasks){ // spout負載均衡
this.collector.emit(new Values(tmpStr));
if(interval%15==0&&"fast".equals(slow_fast)){
System.out.println(RedisUtils.getCurrDateWithInfo(String.valueOf(thisTaskId),
taskId, "Spout:["+zero+","+one+","+two+"]"));
}else if("slow".equals(slow_fast)){
System.out.println(RedisUtils.getCurrDateWithInfo(String.valueOf(thisTaskId),
taskId, "Spout:["+zero+","+one+","+two+"]"));
}else{
new RuntimeException("Wrong argument!");
}
}
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
/**
* Redis中獲取鍵值並刪除對應的鍵
* @param index
*/
private String getItem(String index){
if(!jedis.exists(index)){
return null;
}
String val = jedis.get(index);
// if(val==null||"null".equals("null")){
// return ;
// }
jedis.del(index);
return val;
}
}
使用上面的程式碼後,Spout的輸出就不會重複了,同時也達到了distribution的目的,如圖2
圖2
從圖2中紅色框中可以看到從時間21:58 883 taskId5 Spout輸出後,接著到了taskID6 21:59 494 Spout輸出,然後又到taskID5 21:59 905 的Spout輸出,並沒有重複記錄;
3. Bolt的並行
Bolt的並行只要設定了多個executor即可。
3.1 使用shuffle進行grouping
使用shuffle進行grouping,多個task的輸入中同樣的記錄可能被分到了任何一個taskid中,如圖3所示。
圖3
從圖3中的藍色框中可以看到Spout輸出了兩條相同的記錄,的那是一條記錄被送到了taskID為5的Bolt中(紅色框),一條被送到了taskID為6的Bolt中(紅色框),當然從下面的描述中也可以知道,事實就是這樣。
上圖引自《Getting Started with Storm》
3.2 使用field進行grouping
使用field進行grouping其實是和shuffle一樣的,但是有一點不一樣,就是相同的記錄只會被送往同一個taskid中,比如上面圖3中,如果使用field進行grouping,那麼Spout輸出的兩條相同的記錄就只會被送往taskid為5的task中(或者為6)。
分享,成長,快樂
腳踏實地,專注
轉載請註明blog地址:http://blog.csdn.net/fansy1990