
Reliability in Storm


Message acknowledgment mechanism:


A tuple can be lost in transit and never be fully processed. To cover this case, the spout keeps a timeout for every emitted tuple (30 seconds by default); if no ack arrives within that window, the tuple is also treated as a processing failure and the spout's fail() method is invoked.
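This per-tuple timeout is configurable at the topology level. A minimal sketch, assuming the standard org.apache.storm.Config API (the 60-second value is purely illustrative):

import org.apache.storm.Config;

Config config = new Config();
// Raise the per-tuple timeout from the default 30 s to 60 s (illustrative value).
// A tuple that is not fully acked within this window is reported back to the
// spout through fail(), just like an explicit failure.
config.setMessageTimeoutSecs(60);

Setting config.setNumAckers(0) disables acking altogether, in which case neither ack() nor fail() is ever called.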


The run output shows every tuple being acknowledged as processed successfully.


Reference code: StormTopologyAcker.java

package yehua.storm;

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class StormTopologyAcker {

    public static class MySpout extends BaseRichSpout {
        private Map conf;
        private TopologyContext context;
        private SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.collector = collector;
            this.context = context;
        }

        int num = 0;

        @Override
        public void nextTuple() {
            num++;
            System.out.println("spout:" + num);
            int messageid = num;
            // Enable the acking mechanism by passing a message id along with the tuple.
            // The message id works much like a primary-key id in a MySQL table: the
            // mapping between message id and tuple must be unique, and maintaining
            // that mapping is the programmer's responsibility.
            this.collector.emit(new Values(num), messageid);
            Utils.sleep(1000);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("num"));
        }

        @Override
        public void ack(Object msgId) {
            System.out.println("Processed successfully! " + msgId);
        }

        @Override
        public void fail(Object msgId) {
            System.out.println("Processing failed! " + msgId);
            // TODO: the failed message could be recorded separately for replay.
        }
    }

    public static class MyBolt extends BaseRichBolt {
        private Map stormConf;
        private TopologyContext context;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.stormConf = stormConf;
            this.context = context;
            this.collector = collector;
        }

        int sum = 0;

        @Override
        public void execute(Tuple input) {
            try {
                Integer num = input.getIntegerByField("num");
                sum += num;
                System.out.println("sum=" + sum);
                this.collector.ack(input);
            } catch (Exception e) {
                this.collector.fail(input);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        String spout_id = MySpout.class.getSimpleName();
        String bolt_id = MyBolt.class.getSimpleName();
        topologyBuilder.setSpout(spout_id, new MySpout());
        topologyBuilder.setBolt(bolt_id, new MyBolt()).shuffleGrouping(spout_id);

        Config config = new Config();
        // Limits how many tuples may be pending (emitted but not yet acked) at once.
        // This setting only takes effect when the acker mechanism is enabled.
        config.setMaxSpoutPending(1000);

        String topology_name = StormTopologyAcker.class.getSimpleName();
        if (args.length == 0) {
            // Run in local mode.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
        } else {
            // Run on the cluster.
            try {
                StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        }
    }
}
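The bolt in this example only acks or fails the tuple it receives and never emits anything downstream. When a bolt does emit new tuples, they should be anchored to the input tuple so that a failure anywhere in the resulting tuple tree propagates back to the spout's fail(). The following is only a sketch of such an anchored emit; MyForwardBolt and its doubling logic are illustrative and not part of the original example:

public static class MyForwardBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            Integer num = input.getIntegerByField("num");
            // Anchored emit: passing `input` as the first argument links the new tuple
            // to the incoming one, so the whole tree must be acked before the spout
            // sees the message as successfully processed.
            this.collector.emit(input, new Values(num * 2));
            this.collector.ack(input);
        } catch (Exception e) {
            this.collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("doubled"));
    }
}

An unanchored emit(new Values(...)) would break the chain: the spout would consider the message complete as soon as this bolt acks its input, regardless of what happens further downstream.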
