Storm叢集模式下cleanup解決方法
阿新 • • 發佈:2019-01-04
背景
由於cleanup方法並不可靠,它只在local mode下生效,Storm叢集模式下cleanup不會被呼叫執行。很多資源得不到釋放解決方案
在kill topology之前,先deactivate相應的topology。在spout中實現deactivate()方法,deactivate()方法中給bolt emit特殊的資料(如:emit “shutDown”字串給bolt),bolt中判斷接收的資料為”shutDown”就呼叫cleanup()方法。在cleanup()方法中釋放需要釋放的資源。流程圖
相關程式碼
spout重寫deactivate()方法程式碼如下:
@Override
public void deactivate(){
LOGGER.info("deaactivate to spout and bolt");
try {
//storm deactivate時發一條訊息給bolt
collector.emit(new Values("shutDown"));
} catch (Exception e) {
e.printStackTrace();
}
}
bolt中execute()方法程式碼如下:
@Override
public void execute(Tuple input) {
String message = input.getStringByField("loan_message");
//判斷是不是spout的deactivate傳過來的訊息
if("shutDown".equals(message )) {
cleanup();
}
}
注意事項: deactivate topology時,建議等待時間儘量長,時間過短訊息來不及處理,會導致資料丟失