cluster模式下storm kill topology時做cleanup的解決方法
阿新 • • 發佈:2019-01-04
public class MySpout extends BaseRichSpout { private static final Logger logger = LoggerFactory.getLogger(MySpout.class); private SpoutOutputCollector _collector; public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { logger.info("shutdown spout open function called"); _collector = collector; } public void activate() { logger.info("shutdown spout activate function called"); } public void deactivate() { logger.info("shutdown deaactivate to spout and bolt"); try { String mes = "shutDown"; long id = 11111111111111111L; _collector.emit("stop", new Values(mes), id); //Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } } public void nextTuple() { try { Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer message) { message.declareStream("stop", new Fields("stop")); } public void ack(Object msgId) { logger.info("shutDown spout ack, msId " + msgId); } public void fail(Object msgId) { logger.error("shutDown spout fail, msId " + msgId); } }