A look at Storm's ICommitterTridentSpout
Preface
This article looks at how Storm's ICommitterTridentSpout works.
ICommitterTridentSpout
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/ICommitterTridentSpout.java
public interface ICommitterTridentSpout<X> extends ITridentSpout<X> {
public interface Emitter extends ITridentSpout.Emitter {
void commit(TransactionAttempt attempt);
}
@Override
public Emitter getEmitter(String txStateId, Map conf, TopologyContext context);
}
- ICommitterTridentSpout extends ITridentSpout. Its main change is to override getEmitter so that it returns an extended Emitter, which extends ITridentSpout.Emitter and adds one method, commit.
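To make the role of the extra commit callback more concrete, here is a minimal sketch. It does not use the Storm classes: BatchEmitter and CommitterEmitter are simplified stand-ins for ITridentSpout.Emitter and ICommitterTridentSpout.Emitter, and OffsetCommitter, its offset arithmetic, and runOneBatch are hypothetical names invented for illustration. The idea shown is only that an emitter can stage something while emitting a batch and publish it when commit is called.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CommitterEmitterSketch {
    /** Stand-in for ITridentSpout.Emitter, reduced to two callbacks keyed by txid. */
    interface BatchEmitter {
        void emitBatch(long txId, List<String> out);
        void success(long txId);
    }

    /** Stand-in for ICommitterTridentSpout.Emitter: the same contract plus commit. */
    interface CommitterEmitter extends BatchEmitter {
        void commit(long txId);
    }

    /** Hypothetical emitter: stages the last offset of each batch, publishes it on commit. */
    static class OffsetCommitter implements CommitterEmitter {
        private final Map<Long, Long> staged = new HashMap<>();
        long committedOffset = -1L;

        @Override
        public void emitBatch(long txId, List<String> out) {
            long first = txId * 10;                 // assumption: each batch covers 10 records
            out.add("records " + first + ".." + (first + 9));
            staged.put(txId, first + 9);            // remember the last offset of this batch
        }

        @Override
        public void commit(long txId) {
            Long last = staged.get(txId);
            if (last != null) {
                committedOffset = last;             // publish only when the commit arrives
            }
        }

        @Override
        public void success(long txId) {
            staged.remove(txId);                    // the batch is finished, drop staged state
        }
    }

    /** Runs one batch through emitBatch, commit and success; returns the committed offset. */
    static long runOneBatch(long txId) {
        OffsetCommitter emitter = new OffsetCommitter();
        List<String> out = new ArrayList<>();
        emitter.emitBatch(txId, out);
        emitter.commit(txId);
        emitter.success(txId);
        return emitter.committedOffset;
    }

    public static void main(String[] args) {
        System.out.println(runOneBatch(1L));
    }
}
```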
TridentTopologyBuilder.buildTopology
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java
public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
TopologyBuilder builder = new TopologyBuilder();
Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false );
Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);
Map<String, List<String>> batchesToCommitIds = new HashMap<>();
Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();
for(String id: _spouts.keySet()) {
TransactionalSpoutComponent c = _spouts.get(id);
if (c.spout instanceof IRichSpout) {
//TODO: wrap this to set the stream name
builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
} else {
String batchGroup = c.batchGroupId;
if(!batchesToCommitIds.containsKey(batchGroup)) {
batchesToCommitIds.put(batchGroup, new ArrayList<String>());
}
batchesToCommitIds.get(batchGroup).add(c.commitStateId);
if(!batchesToSpouts.containsKey(batchGroup)) {
batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
}
batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);
BoltDeclarer scd =
builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
.globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
.globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
for(Map<String, Object> m: c.componentConfs) {
scd.addConfigurations(m);
}
Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
specs.put(c.batchGroupId, new CoordSpec());
BoltDeclarer bd = builder.setBolt(id,
new TridentBoltExecutor(
new TridentSpoutExecutor(
c.commitStateId,
c.streamName,
((ITridentSpout) c.spout)),
batchIdsForSpouts,
specs),
c.parallelism);
bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
if(c.spout instanceof ICommitterTridentSpout) {
bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
}
for(Map<String, Object> m: c.componentConfs) {
bd.addConfigurations(m);
}
}
}
//......
return builder.createTopology();
}
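- In TridentTopologyBuilder.buildTopology, the user's spout is checked: if it is an ICommitterTridentSpout, the spout's bolt is additionally configured with allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID), so it subscribes to the commit stream.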
MasterBatchCoordinator
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java
@Override
public void nextTuple() {
sync();
}
private void sync() {
// note that sometimes the tuples active may be less than max_spout_pending, e.g.
// max_spout_pending = 3
// tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
// and there won't be a batch for tx 4 because there's max_spout_pending tx active
TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
maybeCommit.status = AttemptStatus.COMMITTING;
_collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
}
if(_active) {
if(_activeTx.size() < _maxTransactionActive) {
Long curr = _currTransaction;
for(int i=0; i<_maxTransactionActive; i++) {
if(!_activeTx.containsKey(curr) && isReady(curr)) {
// by using a monotonically increasing attempt id, downstream tasks
// can be memory efficient by clearing out state for old attempts
// as soon as they see a higher attempt id for a transaction
Integer attemptId = _attemptIds.get(curr);
if(attemptId==null) {
attemptId = 0;
} else {
attemptId++;
}
_attemptIds.put(curr, attemptId);
for(TransactionalState state: _states) {
state.setData(CURRENT_ATTEMPTS, _attemptIds);
}
TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
_activeTx.put(curr, newTransactionStatus);
_collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);
_throttler.markEvent();
}
curr = nextTransactionId(curr);
}
}
}
}
@Override
public void ack(Object msgId) {
TransactionAttempt tx = (TransactionAttempt) msgId;
TransactionStatus status = _activeTx.get(tx.getTransactionId());
LOG.debug("Ack. [tx_attempt = {}], [tx_status = {}], [{}]", tx, status, this);
if(status!=null && tx.equals(status.attempt)) {
if(status.status==AttemptStatus.PROCESSING) {
status.status = AttemptStatus.PROCESSED;
LOG.debug("Changed status. [tx_attempt = {}] [tx_status = {}]", tx, status);
} else if(status.status==AttemptStatus.COMMITTING) {
_activeTx.remove(tx.getTransactionId());
_attemptIds.remove(tx.getTransactionId());
_collector.emit(SUCCESS_STREAM_ID, new Values(tx));
_currTransaction = nextTransactionId(tx.getTransactionId());
for(TransactionalState state: _states) {
state.setData(CURRENT_TX, _currTransaction);
}
LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", SUCCESS_STREAM_ID, tx, status, this);
}
sync();
}
}
- When MasterBatchCoordinator receives an ack for the attempt it is tracking, it changes the status to AttemptStatus.PROCESSED if the status was AttemptStatus.PROCESSING; if the status was AttemptStatus.COMMITTING, it emits a tuple on SUCCESS_STREAM_ID. It then calls sync.
- nextTuple also calls sync. If the current transaction is in AttemptStatus.PROCESSED, sync changes its status to AttemptStatus.COMMITTING and emits a tuple on COMMIT_STREAM_ID.
- So the status moves from AttemptStatus.PROCESSING to AttemptStatus.PROCESSED (on the first ack), then from AttemptStatus.PROCESSED to AttemptStatus.COMMITTING (in sync, which emits on COMMIT_STREAM_ID); an ack that arrives while the status is AttemptStatus.COMMITTING causes a tuple to be emitted on SUCCESS_STREAM_ID.
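The status transitions described above can be traced with a small self-contained model. This is a sketch, not the Storm class: CoordinatorStateSketch, startBatch and the string labels in emitted are hypothetical, and details such as attempt ids, max pending and persisted state are left out. It assumes transaction ids simply increase by one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class CoordinatorStateSketch {
    enum Status { PROCESSING, PROCESSED, COMMITTING }

    private final TreeMap<Long, Status> activeTx = new TreeMap<>();
    private long currTransaction = 1L;
    final List<String> emitted = new ArrayList<>();

    /** Starts a batch, standing in for the emit on the batch stream. */
    void startBatch(long txId) {
        activeTx.put(txId, Status.PROCESSING);
        emitted.add("batch:" + txId);
    }

    /** Only the current transaction may move from PROCESSED to COMMITTING. */
    void sync() {
        Status s = activeTx.get(currTransaction);
        if (s == Status.PROCESSED) {
            activeTx.put(currTransaction, Status.COMMITTING);
            emitted.add("commit:" + currTransaction);
        }
    }

    /** First ack marks the batch PROCESSED; an ack while COMMITTING finishes it. */
    void ack(long txId) {
        Status s = activeTx.get(txId);
        if (s == null) {
            return;
        }
        if (s == Status.PROCESSING) {
            activeTx.put(txId, Status.PROCESSED);
        } else if (s == Status.COMMITTING) {
            activeTx.remove(txId);
            emitted.add("success:" + txId);
            currTransaction = txId + 1;
        }
        sync();
    }

    static List<String> demo() {
        CoordinatorStateSketch c = new CoordinatorStateSketch();
        c.startBatch(1L);
        c.startBatch(2L);
        c.ack(2L); // tx 2 is processed, but tx 1 is still current, so no commit yet
        c.ack(1L); // tx 1 is processed, sync emits commit:1
        c.ack(1L); // the commit tuple is acked: success:1, then sync emits commit:2
        c.ack(2L); // success:2
        return c.emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The demo mirrors the comment in sync: tx 2 can be fully processed before tx 1, yet its commit is only emitted once tx 1 has succeeded.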
TridentSpoutExecutor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutExecutor.java
public void execute(BatchInfo info, Tuple input) {
// there won't be a BatchInfo for the success stream
TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
_activeBatches.remove(attempt.getTransactionId());
} else {
throw new FailedException("Received commit for different transaction attempt");
}
} else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
// valid to delete before what's been committed since
// those batches will never be accessed again
_activeBatches.headMap(attempt.getTransactionId()).clear();
_emitter.success(attempt);
} else {
_collector.setBatch(info.batchId);
_emitter.emitBatch(attempt, input.getValue(1), _collector);
_activeBatches.put(attempt.getTransactionId(), attempt);
}
}
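- In execute, TridentSpoutExecutor checks whether the tuple came from MasterBatchCoordinator.COMMIT_STREAM_ID. If it did, and the TransactionAttempt equals the active attempt recorded for that txid, it calls ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt); otherwise it throws a FailedException.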
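The bookkeeping around _activeBatches can be illustrated with a sorted map. The following is a sketch under simplifying assumptions: ActiveBatchesSketch and its method names are hypothetical, attempts are reduced to an int attempt id per txid, and IllegalStateException stands in for Storm's FailedException.

```java
import java.util.TreeMap;

class ActiveBatchesSketch {
    // txid -> attempt id of the batch most recently emitted for that txid
    private final TreeMap<Long, Integer> activeBatches = new TreeMap<>();
    int commits = 0;

    /** Batch stream: remember which attempt was emitted for this txid. */
    void onBatch(long txId, int attemptId) {
        activeBatches.put(txId, attemptId);
    }

    /** Commit stream: only commit the attempt that is currently active. */
    void onCommit(long txId, int attemptId) {
        Integer active = activeBatches.get(txId);
        if (active == null || active != attemptId) {
            throw new IllegalStateException("Received commit for different transaction attempt");
        }
        commits++;                       // stands in for emitter.commit(attempt)
        activeBatches.remove(txId);
    }

    /** Success stream: everything strictly below this txid can be forgotten. */
    void onSuccess(long txId) {
        activeBatches.headMap(txId).clear();
    }

    int size() {
        return activeBatches.size();
    }

    /** Returns {size after commit, size after success, number of commits}. */
    static int[] demo() {
        ActiveBatchesSketch s = new ActiveBatchesSketch();
        s.onBatch(1L, 0);
        s.onBatch(2L, 0);
        s.onBatch(3L, 0);
        s.onCommit(1L, 0);               // removes tx 1
        int afterCommit = s.size();
        s.onSuccess(3L);                 // drops every entry below tx 3
        return new int[] { afterCommit, s.size(), s.commits };
    }

    /** A commit for an older attempt than the active one is rejected. */
    static boolean rejectsStaleCommit() {
        ActiveBatchesSketch s = new ActiveBatchesSketch();
        s.onBatch(5L, 1);
        try {
            s.onCommit(5L, 0);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + rejectsStaleCommit());
    }
}
```

TreeMap.headMap(key) returns a live view of the entries with keys strictly less than key, so clearing the view removes those entries from the underlying map.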
TridentBoltExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
public void execute(Tuple tuple) {
if (TupleUtils.isTick(tuple)) {
long now = System.currentTimeMillis();
if (now - _lastRotate > _messageTimeoutMs) {
_batches.rotate();
_lastRotate = now;
}
return;
}
String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
if (batchGroup == null) {
// this is so we can do things like have simple DRPC that doesn't need to use batch processing
_coordCollector.setCurrBatch(null);
_bolt.execute(null, tuple);
_collector.ack(tuple);
return;
}
IBatchID id = (IBatchID) tuple.getValue(0);
//get transaction id
//if it already exists and attempt id is greater than the attempt there
TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
// if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
// System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
// + " (" + _batches.size() + ")" +
// "\ntuple: " + tuple +
// "\nwith tracked " + tracked +
// "\nwith id " + id +
// "\nwith group " + batchGroup
// + "\n");
//
// }
//System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());
// this code here ensures that only one attempt is ever tracked for a batch, so when
// failures happen you don't get an explosion in memory usage in the tasks
if (tracked != null) {
if (id.getAttemptId() > tracked.attemptId) {
_batches.remove(id.getId());
tracked = null;
} else if (id.getAttemptId() < tracked.attemptId) {
// no reason to try to execute a previous attempt than we've already seen
return;
}
}
if (tracked == null) {
tracked =
new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup),
id.getAttemptId());
_batches.put(id.getId(), tracked);
}
_coordCollector.setCurrBatch(tracked);
//System.out.println("TRACKED: " + tracked + " " + tuple);
TupleType t = getTupleType(tuple, tracked);
if (t == TupleType.COMMIT) {
tracked.receivedCommit = true;
checkFinish(tracked, tuple, t);
} else if (t == TupleType.COORD) {
int count = tuple.getInteger(1);
tracked.reportedTasks++;
tracked.expectedTupleCount += count;
checkFinish(tracked, tuple, t);
} else {
tracked.receivedTuples++;
boolean success = true;
try {
_bolt.execute(tracked.info, tuple);
if (tracked.condition.expectedTaskReports == 0) {
success = finishBatch(tracked, tuple);
}
} catch (FailedException e) {
failBatch(tracked, e);
}
if (success) {
_collector.ack(tuple);
} else {
_collector.fail(tuple);
}
}
_coordCollector.setCurrBatch(null);
}
- Here, after calling _bolt.execute(tracked.info, tuple), the executor calls _collector.ack(tuple) to ack the tuple (the success flag is still true at that point).
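The comments in the listing above also note that only one attempt is ever tracked per batch. A minimal sketch of that rule follows; AttemptTrackingSketch, Tracked and onTuple are hypothetical names, and the model keeps only a tuple counter per batch rather than the real TrackedBatch state.

```java
import java.util.HashMap;
import java.util.Map;

class AttemptTrackingSketch {
    /** Reduced stand-in for a tracked batch: its attempt id and a tuple counter. */
    static class Tracked {
        final int attemptId;
        int receivedTuples = 0;

        Tracked(int attemptId) {
            this.attemptId = attemptId;
        }
    }

    private final Map<Long, Tracked> batches = new HashMap<>();

    /** Returns true if the tuple was processed, false if it belonged to a stale attempt. */
    boolean onTuple(long txId, int attemptId) {
        Tracked tracked = batches.get(txId);
        if (tracked != null) {
            if (attemptId > tracked.attemptId) {
                batches.remove(txId);    // a newer attempt replaces the old state
                tracked = null;
            } else if (attemptId < tracked.attemptId) {
                return false;            // older attempt than one already seen: ignore
            }
        }
        if (tracked == null) {
            tracked = new Tracked(attemptId);
            batches.put(txId, tracked);
        }
        tracked.receivedTuples++;
        return true;
    }

    int receivedFor(long txId) {
        Tracked t = batches.get(txId);
        return t == null ? 0 : t.receivedTuples;
    }

    static String demo() {
        AttemptTrackingSketch s = new AttemptTrackingSketch();
        boolean a = s.onTuple(1L, 0);    // first tuple of attempt 0
        boolean b = s.onTuple(1L, 0);    // second tuple of attempt 0
        boolean c = s.onTuple(1L, 1);    // attempt 1 arrives, attempt 0 state is dropped
        boolean d = s.onTuple(1L, 0);    // late tuple of attempt 0 is ignored
        return a + "," + b + "," + c + "," + d + "," + s.receivedFor(1L);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```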
SpoutOutputCollector
storm-core-1.2.2-sources.jar!/org/apache/storm/spout/SpoutOutputCollector.java
/**
* Emits a new tuple to the specified output stream with the given message ID.
* When Storm detects that this tuple has been fully processed, or has failed
* to be fully processed, the spout will receive an ack or fail callback respectively
* with the messageId as long as the messageId was not null. If the messageId was null,
* Storm will not track the tuple and no callback will be received.
* Note that Storm's event logging functionality will only work if the messageId
* is serializable via Kryo or the Serializable interface. The emitted values must be immutable.
*
* @return the list of task ids that this tuple was sent to
*/
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
return _delegate.emit(streamId, tuple, messageId);
}
- This simply delegates to _delegate.emit; the _delegate here is a SpoutOutputCollectorImpl.
SpoutOutputCollectorImpl
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
try {
return sendSpoutMsg(streamId, tuple, messageId, null);
} catch (InterruptedException e) {
LOG.warn("Spout thread interrupted during emit().");
throw new RuntimeException(e);
}
}
private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) throws
InterruptedException {
emittedCount.increment();
List<Integer> outTasks;
if (outTaskId != null) {
outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
} else {
outTasks = taskData.getOutgoingTasks(stream, values);
}
final boolean needAck = (messageId != null) && hasAckers;
final List<Long> ackSeq = needAck ? new ArrayList<>() : null;
final long rootId = needAck ? MessageId.generateId(random) : 0;
for (int i = 0; i < outTasks.size(); i++) { // perf critical path. don't use iterators.
Integer t = outTasks.get(i);
MessageId msgId;
if (needAck) {
long as = MessageId.generateId(random);
msgId = MessageId.makeRootId(rootId, as);
ackSeq.add(as);
} else {
msgId = MessageId.makeUnanchored();
}
final TupleImpl tuple =
new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, msgId);
AddressedTuple adrTuple = new AddressedTuple(t, tuple);
executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits());
}
if (isEventLoggers) {
taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
}
if (needAck) {
boolean sample = executor.samplerCheck();
TupleInfo info = new TupleInfo();
info.setTaskId(this.taskId);
info.setStream(stream);
info.setMessageId(messageId);
if (isDebug) {
info.setValues(values);
}
if (sample) {
info.setTimestamp(System.currentTimeMillis());
}
pending.put(rootId, info);
List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits());
} else if (messageId != null) {
// Reusing TupleInfo object as we directly call executor.ackSpoutMsg() & are not sending msgs. perf critical
if (isDebug) {
if (spoutExecutorThdId != Thread.currentThread().getId()) {
throw new RuntimeException("Detected background thread emitting tuples for the spout. " +
"Spout Output Collector should only emit from the main spout executor thread.");
}
}
globalTupleInfo.clear();
globalTupleInfo.setStream(stream);
globalTupleInfo.setValues(values);
globalTupleInfo.setMessageId(messageId);
globalTupleInfo.setTimestamp(0);
globalTupleInfo.setId("0:");
Long timeDelta = 0L;
executor.ackSpoutMsg(executor, taskData, timeDelta, globalTupleInfo);
}
return outTasks;
}
- When needAck is true, this calls taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits()).
- Note that ackInitTuple is Values(rootId, Utils.bitXorVals(ackSeq), this.taskId); its second value is the result of applying Utils.bitXorVals to the list ackSeq.
- When there are no outTasks, ackSeq is an empty list, and Utils.bitXorVals of an empty list is 0.
Utils
storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/Utils.java
public static long bitXorVals(List<Long> coll) {
long result = 0;
for (Long val : coll) {
result ^= val;
}
return result;
}
public static long bitXor(Long a, Long b) {
return a ^ b;
}
- The XOR operation is the core of Storm's ack mechanism.
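Why XOR works for tracking can be seen from two properties: a value XORed with itself is 0, and the order of operations does not matter. The sketch below reimplements bitXorVals locally so it runs without Storm; XorAckSketch and fullyAcked are hypothetical names, and the ids are small made-up numbers rather than the random 64-bit ids a real topology would use.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class XorAckSketch {
    /** Same logic as the bitXorVals listing: fold a list of longs with XOR. */
    static long bitXorVals(List<Long> vals) {
        long result = 0;
        for (long v : vals) {
            result ^= v;
        }
        return result;
    }

    /** The running value returns to 0 once every emitted id has also been acked. */
    static boolean fullyAcked(List<Long> emitted, List<Long> acked) {
        return (bitXorVals(emitted) ^ bitXorVals(acked)) == 0;
    }

    public static void main(String[] args) {
        List<Long> emitted = Arrays.asList(5L, 9L, 12L);
        // Acks may arrive in any order and still cancel out.
        System.out.println(fullyAcked(emitted, Arrays.asList(12L, 5L, 9L)));
        // One id is still outstanding, so the value is non-zero.
        System.out.println(fullyAcked(emitted, Arrays.asList(5L, 9L)));
        // Nothing emitted at all: the XOR of an empty list is 0.
        System.out.println(bitXorVals(Collections.<Long>emptyList()));
    }
}
```

The last case is the one that matters for the rest of this article: a tuple sent to no tasks starts with a tracking value of 0.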
Acker
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
public void execute(Tuple input) {
if (TupleUtils.isTick(input)) {
Map<Object, AckObject> tmp = pending.rotate();
LOG.debug("Number of timeout tuples:{}", tmp.size());
return;
}
boolean resetTimeout = false;
String streamId = input.getSourceStreamId();
Object id = input.getValue(0);
AckObject curr = pending.get(id);
if (ACKER_INIT_STREAM_ID.equals(streamId)) {
if (curr == null) {
curr = new AckObject();
pending.put(id, curr);
}
curr.updateAck(input.getLong(1));
curr.spoutTask = input.getInteger(2);
} else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
if (curr == null) {
curr = new AckObject();
pending.put(id, curr);
}
curr.updateAck(input.getLong(1));
} else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
// For the case that ack_fail message arrives before ack_init
if (curr == null) {
curr = new AckObject();
}
curr.failed = true;
pending.put(id, curr);
} else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
resetTimeout = true;
if (curr != null) {
pending.put(id, curr);
} //else if it has not been added yet, there is no reason time it out later on
} else if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
collector.flush();
return;
} else {
LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
return;
}
int task = curr.spoutTask;
if (task >= 0 && (curr.val == 0 || curr.failed || resetTimeout)) {
Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime));
if (curr.val == 0) {
pending.remove(id);
collector.emitDirect(task, ACKER_ACK_STREAM_ID, tuple);
} else if (curr.failed) {
pending.remove(id);
collector.emitDirect(task, ACKER_FAIL_STREAM_ID, tuple);
} else if (resetTimeout) {
collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, tuple);
} else {
throw new IllegalStateException("The checks are inconsistent we reach what should be unreachable code.");
}
}
collector.ack(input);
}
private static class AckObject {
public long val = 0L;
public long startTime = Time.currentTimeMillis();
public int spoutTask = -1;
public boolean failed = false;
// val xor value
public void updateAck(Long value) {
val = Utils.bitXor(val, value);
}
}
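- When Acker receives a tuple on ACKER_INIT_STREAM_ID and there is no AckObject for the id yet, it creates one, whose val defaults to 0; it then calls curr.updateAck(input.getLong(1)), which updates the AckObject's val from the tuple's second value.
- The tuple sent by SpoutOutputCollectorImpl is Values(rootId, Utils.bitXorVals(ackSeq), this.taskId), so its second value is Utils.bitXorVals(ackSeq). ackSeq is a List<Long>; when there are no outgoing tasks the list is empty and Utils.bitXorVals returns 0, so in that case curr.updateAck(0) leaves curr.val at 0.
- At the end of execute, Acker checks curr.val; if curr.val == 0, it calls collector.emitDirect(task, ACKER_ACK_STREAM_ID, tuple).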
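The consequence of an init value of 0 can be reproduced with a reduced model of the acker. This is a sketch only: AckerSketch, init, ack and ackedRoots are hypothetical, and fail handling, timeouts and the real tuple streams are omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AckerSketch {
    /** Reduced AckObject: the XOR tracking value and the owning spout task. */
    static class AckObject {
        long val = 0L;
        int spoutTask = -1;
    }

    private final Map<Long, AckObject> pending = new HashMap<>();
    final List<Long> ackedRoots = new ArrayList<>();

    /** Models a tuple on the init stream: (rootId, XOR of edge ids, spout task). */
    void init(long rootId, long xorOfEdges, int spoutTask) {
        AckObject curr = pending.computeIfAbsent(rootId, k -> new AckObject());
        curr.val ^= xorOfEdges;
        curr.spoutTask = spoutTask;
        check(rootId, curr);
    }

    /** Models a tuple on the ack stream: (rootId, XOR value reported by a bolt). */
    void ack(long rootId, long xorVal) {
        AckObject curr = pending.computeIfAbsent(rootId, k -> new AckObject());
        curr.val ^= xorVal;
        check(rootId, curr);
    }

    /** Once the spout task is known and val is 0, the root counts as fully acked. */
    private void check(long rootId, AckObject curr) {
        if (curr.spoutTask >= 0 && curr.val == 0) {
            pending.remove(rootId);
            ackedRoots.add(rootId);      // stands in for emitDirect on the ack stream
        }
    }

    static List<Long> demo() {
        AckerSketch acker = new AckerSketch();
        acker.init(100L, 0L, 7);         // no outgoing tasks: value is 0, acked at once
        acker.init(200L, 42L, 7);        // one outgoing edge with id 42: still pending
        acker.ack(200L, 42L);            // the edge is acked, value returns to 0
        return acker.ackedRoots;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Root 100 is reported as acked by the init call alone, which is the automatic ack discussed in the summary.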
SpoutExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
String streamId = tuple.getSourceStreamId();
if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
spoutOutputCollector.flush();
} else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
pending.rotate();
} else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
} else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
if (spoutObj instanceof ICredentialsListener) {
((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
}
} else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
Long id = (Long) tuple.getValue(0);
TupleInfo pendingForId = pending.get(id);
if (pendingForId != null) {
pending.put(id, pendingForId);
}
} else {
Long id = (Long) tuple.getValue(0);
Long timeDeltaMs = (Long) tuple.getValue(1);
TupleInfo tupleInfo = pending.remove(id);
if (tupleInfo != null && tupleInfo.getMessageId() != null) {
if (taskId != tupleInfo.getTaskId()) {
throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
}
Long timeDelta = null;
if (hasAckers) {
long startTimeMs = tupleInfo.getTimestamp();
if (startTimeMs != 0) {
timeDelta = timeDeltaMs;
}
}
if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
} else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
}
}
}
}
public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
try {
ISpout spout = (ISpout) taskData.getTaskObject();
int taskId = taskData.getTaskId();
if (executor.getIsDebug()) {
LOG.info("SPOUT Acking message {} {}", tupleInfo.getId(), tupleInfo.getMessageId());
}
spout.ack(tupleInfo.getMessageId());
if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary
new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
}
if (hasAckers && timeDelta != null) {
executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta,
taskData.getTaskMetrics().getAcked(tupleInfo.getStream()));
}
} catch (Exception e) {
throw Utils.wrapInRuntime(e);
}
}
- When SpoutExecutor receives a tuple on Acker.ACKER_ACK_STREAM_ID, it calls ackSpoutMsg, which calls back the original spout's ack method: spout.ack(tupleInfo.getMessageId()).
Summary
- When MasterBatchCoordinator receives the first ack for a given msgId (the first call to ack), the status changes from the initial AttemptStatus.PROCESSING to AttemptStatus.PROCESSED. In the following sync call, AttemptStatus.PROCESSED becomes AttemptStatus.COMMITTING, and a tuple is emitted on MasterBatchCoordinator.COMMIT_STREAM_ID.
- When the user's spout is an ICommitterTridentSpout, TridentTopologyBuilder.buildTopology configures allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID). TridentSpoutExecutor receives the tuples from MasterBatchCoordinator.COMMIT_STREAM_ID and calls ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt). After TridentSpoutExecutor.execute returns, TridentBoltExecutor acks the tuple, which leads to MasterBatchCoordinator's ack method being called (the second call) and triggers _collector.emit(SUCCESS_STREAM_ID, new Values(tx)).
- When the user's spout is not an ICommitterTridentSpout, no component in the topology receives the tuples emitted on MasterBatchCoordinator.COMMIT_STREAM_ID, so outgoingTasks is empty. With needAck true, SpoutOutputCollectorImpl sends a tuple on Acker.ACKER_INIT_STREAM_ID whose second value is Utils.bitXorVals(ackSeq); ackSeq is computed from outgoingTasks and is therefore an empty list, so the value is 0. When Acker receives the tuple on ACKER_INIT_STREAM_ID, curr.val is 0 after curr.updateAck(input.getLong(1)). At the end of execute, Acker sees that curr.val is 0 and emits a tuple on Acker.ACKER_ACK_STREAM_ID. On receiving it, SpoutExecutor calls ackSpoutMsg, which calls back the original spout's ack method, spout.ack(tupleInfo.getMessageId()). In other words, a tuple emitted on a stream that no component consumes is acked automatically. So for a spout that is not an ICommitterTridentSpout, once the tuple has been emitted on MasterBatchCoordinator.COMMIT_STREAM_ID, MasterBatchCoordinator's ack method is called (the second call), which triggers _collector.emit(SUCCESS_STREAM_ID, new Values(tx)).
The difference between the two cases is this: for a spout that is not an ICommitterTridentSpout, the tuple emitted on MasterBatchCoordinator.COMMIT_STREAM_ID is acked automatically by the Acker, which leads to the second call of MasterBatchCoordinator's ack method. For an ICommitterTridentSpout, ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt) runs first, then TridentBoltExecutor acks the tuple, which leads to the second call of MasterBatchCoordinator's ack method. In the success case, both end by sending a tuple on SUCCESS_STREAM_ID.