大資料Storm流式處理的ACK機制
阿新 • 發佈:2018-12-19
package com.neusoft.storm.ack; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class MyBolt implements IRichBolt { private static final long serialVersionUID = 1L; OutputCollector collector = null; @Override public void cleanup() { } int num = 0; String valueString = null; @Override public void execute(Tuple input) { try { valueString = input.getStringByField("log") ; if(valueString != null) { num ++ ; System.err.println(Thread.currentThread().getName()+" lines :"+num +" session_id:"+valueString.split("\t")[1]); } collector.emit(input, new Values(valueString)); // collector.emit(new Values(valueString)); collector.ack(input); Thread.sleep(2000); } catch (Exception e) { collector.fail(input); e.printStackTrace(); } } @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector ; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("session_id")) ; } @Override public Map<String, Object> getComponentConfiguration() { return null; } } package com.neusoft.storm.ack; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.InputStreamReader; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class MySpout implements IRichSpout{ private static final long serialVersionUID = 1L; int index = 0; FileInputStream fis; InputStreamReader isr; BufferedReader br; SpoutOutputCollector collector = null; String str = null; @Override public void 
nextTuple() { try { if ((str = this.br.readLine()) != null) { // 過濾動作 index++; collector.emit(new Values(str), index); // collector.emit(new Values(str)); } } catch (Exception e) { } } @Override public void close() { try { br.close(); isr.close(); fis.close(); } catch (Exception e) { e.printStackTrace(); } } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.collector = collector; this.fis = new FileInputStream("track.log"); this.isr = new InputStreamReader(fis, "UTF-8"); this.br = new BufferedReader(isr); } catch (Exception e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("log")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } @Override public void ack(Object msgId) { System.err.println(" [" + Thread.currentThread().getName() + "] "+ " spout ack:"+msgId.toString()); } @Override public void activate() { } @Override public void deactivate() { } @Override public void fail(Object msgId) { System.err.println(" [" + Thread.currentThread().getName() + "] "+ " spout fail:"+msgId.toString()); } } package com.neusoft.storm.ack; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.topology.TopologyBuilder; public class Main { /** * @param args */ public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new MySpout(), 1); builder.setBolt("bolt", new MyBolt(), 2).shuffleGrouping("spout"); // Map conf = new HashMap(); // conf.put(Config.TOPOLOGY_WORKERS, 4); Config conf = new Config() ; conf.setDebug(true); conf.setMessageTimeoutSecs(conf, 100); conf.setNumAckers(4); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, 
builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } }else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } } }