JStorm源代碼閱讀——消息的確認機制
阿新 • • 發佈:2018-10-11
閱讀 mut 就是 核心數 execute integer comment etl 格式
Acker
//Acker相當於一個bolt,用於處理事件
public class Acker implements IBolt {
private RotatingMap<Object, AckObject> pending = null;
@Override
public void execute(Tuple input) {
Object id = input.getValue(0);
AckObject curr = pending.get(id);
String stream_id = input.getSourceStreamId();
if (Acker.ACKER_INIT_STREAM_ID.equals(stream_id)) {//處理開始追蹤事件,放進自己的pending隊列。只有spout會發送該事件
if (curr == null) {
curr = new AckObject();
curr.val = input.getLong(1);
curr.spout_task = input.getInteger(2);
pending.put(id, curr);
} else {
// bolt's ack first come
curr.update_ack(input.getValue(1));
curr.spout_task = input.getInteger(2);
}
} else if (Acker.ACKER_ACK_STREAM_ID.equals(stream_id)) {//bolt發送過來的ack事件
if (curr != null) {
curr.update_ack(input.getValue(1));//bolt發送過來的值是它要ack的tup的ID和它產生的tup的ID的異或值。
} else {
// two case
// one is timeout
// the other is bolt's ack first come
curr = new AckObject();
curr.val = input.getLong(1);
pending.put(id, curr);
}
} else if (Acker.ACKER_FAIL_STREAM_ID.equals(stream_id)) {//bolt發送過來的失敗
if (curr == null) {
// do nothing
// already timeout, should go fail
return;
}
curr.failed = true;
} else {
LOG.info("Unknow source stream, " + stream_id + " from task-" + input.getSourceTask());
return;
}
Integer task = curr.spout_task;
if (task != null) {
if (curr.val == 0) {//如果校驗值為0,則證明發送成功
pending.remove(id);
List values = JStormUtils.mk_list(id);
collector.emitDirect(task, Acker.ACKER_ACK_STREAM_ID, values);
} else