A Look at Flink's SourceFunction
Preface
This article examines Flink's SourceFunction.
Example
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStreamSource = env.addSource(new RandomWordSource());
dataStreamSource.map(new UpperCaseMapFunc()).print();
env.execute("sourceFunctionDemo" );
- Here the custom SourceFunction is registered through the addSource method.
SourceFunction
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java
/**
* Base interface for all stream data sources in Flink. The contract of a stream source
* is the following: When the source should start emitting elements, the {@link #run} method
* is called with a {@link SourceContext} that can be used for emitting elements.
* The run method can run for as long as necessary. The source must, however, react to an
* invocation of {@link #cancel()} by breaking out of its main loop.
*
* <h3>CheckpointedFunction Sources</h3>
*
* <p>Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
* interface must ensure that state checkpointing, updating of internal state and emission of
* elements are not done concurrently. This is achieved by using the provided checkpointing lock
* object to protect update of state and emission of elements in a synchronized block.
*
* <p>This is the basic pattern one should follow when implementing a checkpointed source:
*
* <pre>{@code
* public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
* private long count = 0L;
* private volatile boolean isRunning = true;
*
* private transient ListState<Long> checkpointedCount;
*
* public void run(SourceContext<T> ctx) {
* while (isRunning && count < 1000) {
* // this synchronized block ensures that state checkpointing,
* // internal state updates and emission of elements are an atomic operation
* synchronized (ctx.getCheckpointLock()) {
* ctx.collect(count);
* count++;
* }
* }
* }
*
* public void cancel() {
* isRunning = false;
* }
*
* public void initializeState(FunctionInitializationContext context) {
* this.checkpointedCount = context
* .getOperatorStateStore()
* .getListState(new ListStateDescriptor<>("count", Long.class));
*
* if (context.isRestored()) {
* for (Long count : this.checkpointedCount.get()) {
* this.count = count;
* }
* }
* }
*
* public void snapshotState(FunctionSnapshotContext context) {
* this.checkpointedCount.clear();
* this.checkpointedCount.add(count);
* }
* }
* }</pre>
*
*
* <h3>Timestamps and watermarks:</h3>
* Sources may assign timestamps to elements and may manually emit watermarks.
* However, these are only interpreted if the streaming program runs on
* {@link TimeCharacteristic#EventTime}. On other time characteristics
* ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}),
* the watermarks from the source function are ignored.
*
* <h3>Gracefully Stopping Functions</h3>
* Functions may additionally implement the {@link org.apache.flink.api.common.functions.StoppableFunction}
* interface. "Stopping" a function, in contrast to "canceling" means a graceful exit that leaves the
* state and the emitted elements in a consistent state.
*
* <p>When a source is stopped, the executing thread is not interrupted, but expected to leave the
* {@link #run(SourceContext)} method in reasonable time on its own, preserving the atomicity
* of state updates and element emission.
*
* @param <T> The type of the elements produced by this source.
*
* @see org.apache.flink.api.common.functions.StoppableFunction
* @see org.apache.flink.streaming.api.TimeCharacteristic
*/
@Public
public interface SourceFunction<T> extends Function, Serializable {
/**
* Starts the source. Implementations can use the {@link SourceContext} emit
* elements.
*
* <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
* must lock on the checkpoint lock (using a synchronized block) before updating internal
* state and emitting elements, to make both an atomic operation:
*
* <pre>{@code
* public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
* private long count = 0L;
* private volatile boolean isRunning = true;
*
* private transient ListState<Long> checkpointedCount;
*
* public void run(SourceContext<T> ctx) {
* while (isRunning && count < 1000) {
* // this synchronized block ensures that state checkpointing,
* // internal state updates and emission of elements are an atomic operation
* synchronized (ctx.getCheckpointLock()) {
* ctx.collect(count);
* count++;
* }
* }
* }
*
* public void cancel() {
* isRunning = false;
* }
*
* public void initializeState(FunctionInitializationContext context) {
* this.checkpointedCount = context
* .getOperatorStateStore()
* .getListState(new ListStateDescriptor<>("count", Long.class));
*
* if (context.isRestored()) {
* for (Long count : this.checkpointedCount.get()) {
* this.count = count;
* }
* }
* }
*
* public void snapshotState(FunctionSnapshotContext context) {
* this.checkpointedCount.clear();
* this.checkpointedCount.add(count);
* }
* }
* }</pre>
*
* @param ctx The context to emit elements to and for accessing locks.
*/
void run(SourceContext<T> ctx) throws Exception;
/**
* Cancels the source. Most sources will have a while loop inside the
* {@link #run(SourceContext)} method. The implementation needs to ensure that the
* source will break out of that loop after this method is called.
*
* <p>A typical pattern is to have an {@code "volatile boolean isRunning"} flag that is set to
* {@code false} in this method. That flag is checked in the loop condition.
*
* <p>When a source is canceled, the executing thread will also be interrupted
* (via {@link Thread#interrupt()}). The interruption happens strictly after this
* method has been called, so any interruption handler can rely on the fact that
* this method has completed. It is good practice to make any flags altered by
* this method "volatile", in order to guarantee the visibility of the effects of
* this method to any interruption handler.
*/
void cancel();
// ------------------------------------------------------------------------
// source context
// ------------------------------------------------------------------------
/**
* Interface that source functions use to emit elements, and possibly watermarks.
*
* @param <T> The type of the elements produced by the source.
*/
@Public // Interface might be extended in the future with additional methods.
interface SourceContext<T> {
//......
}
}
- SourceFunction is the base interface for all stream data sources in Flink. It defines the run and cancel methods, as well as the nested SourceContext interface.
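The Javadoc above asks checkpointed sources to emit elements and update state inside one synchronized block on the checkpoint lock. The following is a minimal plain-Java sketch of why that matters; it does not use Flink, and the names CheckpointLockSketch, emitNext, snapshot and demo are hypothetical stand-ins (in a real source the lock comes from ctx.getCheckpointLock()).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for the checkpoint-lock pattern; not Flink API.
final class CheckpointLockSketch {
    private final Object checkpointLock = new Object();    // plays the role of ctx.getCheckpointLock()
    private final List<Long> emitted = new ArrayList<>();  // plays the role of the downstream output
    private long count = 0L;                               // the source's internal state

    // Emit one element and advance the state as a single atomic step.
    void emitNext() {
        synchronized (checkpointLock) {
            emitted.add(count);
            count++;
        }
    }

    // A "snapshot" taken under the same lock: returns {count, number of emitted elements}.
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {count, emitted.size()};
        }
    }

    // Runs an emitter thread while the caller takes snapshots.
    // Returns true if every snapshot saw state and output in agreement.
    static boolean demo(int elements, int snapshots) {
        final CheckpointLockSketch source = new CheckpointLockSketch();
        Thread emitter = new Thread(() -> {
            for (int i = 0; i < elements; i++) {
                source.emitNext();
            }
        });
        emitter.start();
        boolean consistent = true;
        for (int i = 0; i < snapshots; i++) {
            long[] s = source.snapshot();
            consistent &= (s[0] == s[1]);
        }
        try {
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long[] last = source.snapshot();
        return consistent && last[0] == elements && last[1] == elements;
    }

    public static void main(String[] args) {
        System.out.println(demo(10_000, 1_000));
    }
}
```

Because the snapshot takes the same lock as the emit step, it can never observe an element that was emitted without the counter having been advanced.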
SourceContext
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java
/**
* Interface that source functions use to emit elements, and possibly watermarks.
*
* @param <T> The type of the elements produced by the source.
*/
@Public // Interface might be extended in the future with additional methods.
interface SourceContext<T> {
/**
* Emits one element from the source, without attaching a timestamp. In most cases,
* this is the default way of emitting elements.
*
* <p>The timestamp that the element will get assigned depends on the time characteristic of
* the streaming program:
* <ul>
* <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
* <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
* current time as the timestamp.</li>
* <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
* It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
* operation (like time windows).</li>
* </ul>
*
* @param element The element to emit
*/
void collect(T element);
/**
* Emits one element from the source, and attaches the given timestamp. This method
* is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
* sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
* on the stream.
*
* <p>On certain time characteristics, this timestamp may be ignored or overwritten.
* This allows programs to switch between the different time characteristics and behaviors
* without changing the code of the source functions.
* <ul>
* <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
* because processing time never works with element timestamps.</li>
* <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
* system's current time, to realize proper ingestion time semantics.</li>
* <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
* </ul>
*
* @param element The element to emit
* @param timestamp The timestamp in milliseconds since the Epoch
*/
@PublicEvolving
void collectWithTimestamp(T element, long timestamp);
/**
* Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
* elements with a timestamp {@code t' <= t} will occur any more. If further such
* elements will be emitted, those elements are considered <i>late</i>.
*
* <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
* On {@link TimeCharacteristic#ProcessingTime},Watermarks will be ignored. On
* {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
* automatic ingestion time watermarks.
*
* @param mark The Watermark to emit
*/
@PublicEvolving
void emitWatermark(Watermark mark);
/**
* Marks the source to be temporarily idle. This tells the system that this source will
* temporarily stop emitting records and watermarks for an indefinite amount of time. This
* is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
* {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
* watermarks without the need to wait for watermarks from this source while it is idle.
*
* <p>Source functions should make a best effort to call this method as soon as they
* acknowledge themselves to be idle. The system will consider the source to resume activity
* again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
* or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
*/
@PublicEvolving
void markAsTemporarilyIdle();
/**
* Returns the checkpoint lock. Please refer to the class-level comment in
* {@link SourceFunction} for details about how to write a consistent checkpointed
* source.
*
* @return The object to use as the lock
*/
Object getCheckpointLock();
/**
* This method is called by the system to shut down the context.
*/
void close();
}
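- SourceContext defines the interface a source uses to emit data. The basic method is collect, which emits an element without attaching a timestamp: under TimeCharacteristic.ProcessingTime the element has no timestamp; under TimeCharacteristic.EventTime it must get one from a TimestampAssigner before any time-dependent operation; under TimeCharacteristic.IngestionTime nothing needs to be assigned, because the system attaches its current time automatically. Besides collect, collectWithTimestamp emits an element together with an explicit timestamp (intended for TimeCharacteristic.EventTime).
- It also defines emitWatermark. A watermark of value t declares that no more elements with a timestamp less than or equal to t will arrive, and any such element that still arrives is considered late; this is how out-of-order data is bounded. It only takes effect under TimeCharacteristic.EventTime: under TimeCharacteristic.ProcessingTime the watermark is ignored, and under TimeCharacteristic.IngestionTime it is replaced by the automatically generated ingestion time watermarks.
- markAsTemporarilyIdle tells the system that the source will stop emitting records and watermarks for an indefinite amount of time. It is only relevant under TimeCharacteristic.IngestionTime or TimeCharacteristic.EventTime. The system considers the source active again as soon as SourceContext.collect(T), SourceContext.collectWithTimestamp(T, long) or SourceContext.emitWatermark(Watermark) is called.
- getCheckpointLock returns the checkpoint lock, which a source uses to keep state updates and element emission consistent with checkpointing.
- close is called by the system to shut down the context and release its resources.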
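The timestamp rules in the SourceContext Javadoc can be summarized as a small decision table. The sketch below restates them in plain Java without Flink; TimestampRulesSketch and its methods are hypothetical names, and the enum only mirrors the constant names of Flink's TimeCharacteristic.

```java
import java.util.OptionalLong;

// Plain-Java restatement of the rules in the SourceContext Javadoc; not Flink code.
final class TimestampRulesSketch {
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    // Timestamp an element carries right after collect(element).
    static OptionalLong afterCollect(TimeCharacteristic tc, long systemTimeMillis) {
        switch (tc) {
            case IngestionTime:
                return OptionalLong.of(systemTimeMillis);
            default:
                // ProcessingTime: never has one. EventTime: none until a TimestampAssigner runs.
                return OptionalLong.empty();
        }
    }

    // Timestamp an element carries right after collectWithTimestamp(element, sourceTimestamp).
    static OptionalLong afterCollectWithTimestamp(
            TimeCharacteristic tc, long sourceTimestamp, long systemTimeMillis) {
        switch (tc) {
            case ProcessingTime:
                return OptionalLong.empty();               // ignored
            case IngestionTime:
                return OptionalLong.of(systemTimeMillis);  // overwritten
            default:
                return OptionalLong.of(sourceTimestamp);   // EventTime: used as given
        }
    }

    // Whether a watermark emitted by the source function is used as-is.
    static boolean sourceWatermarkUsed(TimeCharacteristic tc) {
        return tc == TimeCharacteristic.EventTime;
    }

    public static void main(String[] args) {
        for (TimeCharacteristic tc : TimeCharacteristic.values()) {
            System.out.println(tc + " collect=" + afterCollect(tc, 100L)
                    + " collectWithTimestamp=" + afterCollectWithTimestamp(tc, 7L, 100L)
                    + " watermarkUsed=" + sourceWatermarkUsed(tc));
        }
    }
}
```

This is also why a source can be written once with collectWithTimestamp and still run unchanged under any of the three time characteristics.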
Task.run (upstream)
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
/**
* The Task represents one execution of a parallel subtask on a TaskManager.
* A Task wraps a Flink operator (which may be a user function) and
* runs it, providing all services necessary for example to consume input data,
* produce its results (intermediate result partitions) and communicate
* with the JobManager.
*
* <p>The Flink operators (implemented as subclasses of
* {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
* The task connects those to the network stack and actor messages, and tracks the state
* of the execution and handles exceptions.
*
* <p>Tasks have no knowledge about how they relate to other tasks, or whether they
* are the first attempt to execute the task, or a repeated attempt. All of that
* is only known to the JobManager. All the task knows are its own runnable code,
* the task's configuration, and the IDs of the intermediate results to consume and
* produce (if any).
*
* <p>Each Task is run by one dedicated thread.
*/
public class Task implements Runnable, TaskActions, CheckpointListener {
//......
/**
* The core work method that bootstraps the task and executes its code.
*/
@Override
public void run() {
//......
// now load and instantiate the task's invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------
// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;
// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}
// notify everyone that we switched to running
notifyObservers(ExecutionState.RUNNING, null);
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader);
// run the invokable
invokable.invoke();
//......
}
}
- Task.run calls invokable.invoke(); here the invokable is a StreamTask.
StreamTask.invoke
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
/**
* Base class for all streaming tasks. A task is the unit of local processing that is deployed
* and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
* the Task's operator chain. Operators that are chained together execute synchronously in the
* same thread and hence on the same stream partition. A common case for these chains
* are successive map/flatmap/filter tasks.
*
* <p>The task chain contains one "head" operator and multiple chained operators.
* The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
* as well as for sources, iteration heads and iteration tails.
*
* <p>The Task class deals with the setup of the streams read by the head operator, and the streams
* produced by the operators at the ends of the operator chain. Note that the chain may fork and
* thus have multiple ends.
*
* <p>The life cycle of the task is set up as follows:
* <pre>{@code
* -- setInitialState -> provides state of all operators in the chain
*
* -- invoke()
* |
* +----> Create basic utils (config, etc) and load the chain of operators
* +----> operators.setup()
* +----> task specific init()
* +----> initialize-operator-states()
* +----> open-operators()
* +----> run()
* +----> close-operators()
* +----> dispose-operators()
* +----> common cleanup
* +----> task specific cleanup()
* }</pre>
*
* <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
* {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
* are called concurrently.
*
* @param <OUT>
* @param <OP>
*/
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
extends AbstractInvokable
implements AsyncExceptionHandler {
//......
@Override
public final void invoke() throws Exception {
boolean disposed = false;
try {
//......
// let the task do its work
isRunning = true;
run();
// if this left the run() method cleanly despite the fact that this was canceled,
// make sure the "clean shutdown" is not attempted
if (canceled) {
throw new CancelTaskException();
}
LOG.debug("Finished task {}", getName());
//......
}
finally {
// clean up everything we initialized
isRunning = false;
//......
}
}
}
- StreamTask.invoke calls the run method implemented by its subclass; here the subclass is SourceStreamTask.
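The relationship between the final invoke() and the subclass-specific run() is a template-method arrangement. Below is a minimal plain-Java sketch of that shape; MiniStreamTask and MiniSourceTask are hypothetical stand-ins, and the logged step names are a shortened version of the life cycle listed in the StreamTask Javadoc, not the real method calls.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the invoke()/run() template method; not Flink code.
final class LifecycleSketch {
    // Hypothetical stand-in for StreamTask: invoke() is final and fixes the order of steps.
    abstract static class MiniStreamTask {
        final List<String> log = new ArrayList<>();
        private volatile boolean isRunning;

        final void invoke() throws Exception {
            try {
                log.add("init");
                log.add("open-operators");
                isRunning = true;
                run();                       // the only step a subclass customizes
                log.add("close-operators");
            } finally {
                isRunning = false;           // always reset, as in the finally block above
                log.add("cleanup");
            }
        }

        protected abstract void run() throws Exception;
    }

    // Hypothetical stand-in for SourceStreamTask.
    static final class MiniSourceTask extends MiniStreamTask {
        @Override
        protected void run() {
            log.add("run:source");
        }
    }

    static List<String> demo() {
        MiniSourceTask task = new MiniSourceTask();
        try {
            task.invoke();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return task.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```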
SourceStreamTask.run
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@Override
protected void run() throws Exception {
headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
}
- SourceStreamTask.run simply calls the run method of headOperator; here headOperator is a StreamSource.
StreamSource.run
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
run(lockingObject, streamStatusMaintainer, output);
}
public void run(final Object lockingObject,
final StreamStatusMaintainer streamStatusMaintainer,
final Output<StreamRecord<OUT>> collector) throws Exception {
final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
? getExecutionConfig().getLatencyTrackingInterval()
: configuration.getLong(MetricOptions.LATENCY_INTERVAL);
LatencyMarksEmitter<OUT> latencyEmitter = null;
if (latencyTrackingInterval > 0) {
latencyEmitter = new LatencyMarksEmitter<>(
getProcessingTimeService(),
collector,
latencyTrackingInterval,
this.getOperatorID(),
getRuntimeContext().getIndexOfThisSubtask());
}
final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
this.ctx = StreamSourceContexts.getSourceContext(
timeCharacteristic,
getProcessingTimeService(),
lockingObject,
streamStatusMaintainer,
collector,
watermarkInterval,
-1);
try {
userFunction.run(ctx);
// if we get here, then the user function either exited after being done (finite source)
// or the function was canceled or stopped. For the finite source case, we should emit
// a final watermark that indicates that we reached the end of event-time
if (!isCanceledOrStopped()) {
ctx.emitWatermark(Watermark.MAX_WATERMARK);
}
} finally {
// make sure that the context is closed in any case
ctx.close();
if (latencyEmitter != null) {
latencyEmitter.close();
}
}
}
- StreamSource.run first builds the SourceFunction.SourceContext via StreamSourceContexts.getSourceContext and then calls userFunction.run; here userFunction is RandomWordSource, the user-defined SourceFunction. Note that before calling userFunction.run(ctx), it also creates a LatencyMarksEmitter if latencyTrackingInterval is greater than 0.
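The try/finally at the end of StreamSource.run has two visible effects: a finite source that returns normally gets a final watermark, and the context is closed in every case. The plain-Java sketch below reproduces only that control flow; MiniContext, MiniSource and runSource are hypothetical names, and unlike the real method the sketch records an exception instead of propagating it.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the control flow at the end of StreamSource.run; not Flink code.
final class SourceRunSketch {
    interface MiniContext {
        void collect(String element);
        void emitWatermark(long timestamp);
        void close();
    }

    interface MiniSource {
        void run(MiniContext ctx) throws Exception;
    }

    // Returns the sequence of events the downstream side and the context would observe.
    static List<String> runSource(MiniSource userFunction, boolean canceledOrStopped) {
        final List<String> events = new ArrayList<>();
        MiniContext ctx = new MiniContext() {
            @Override public void collect(String element) { events.add("record:" + element); }
            @Override public void emitWatermark(long timestamp) { events.add("watermark:" + timestamp); }
            @Override public void close() { events.add("ctx.close"); }
        };
        try {
            userFunction.run(ctx);
            // A finite source that finished on its own signals the end of event time.
            if (!canceledOrStopped) {
                ctx.emitWatermark(Long.MAX_VALUE);  // the value carried by Watermark.MAX_WATERMARK
            }
        } catch (Exception e) {
            events.add("failed:" + e.getMessage()); // sketch only: the real code lets this propagate
        } finally {
            ctx.close();                            // closed in any case
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runSource(ctx -> ctx.collect("a"), false));
        System.out.println(runSource(ctx -> ctx.collect("a"), true));
    }
}
```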
RandomWordSource.run
public class RandomWordSource implements SourceFunction<String> {
private static final Logger LOGGER = LoggerFactory.getLogger(RandomWordSource.class);
private volatile boolean isRunning = true;
private static final String[] words = new String[]{"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning) {
Thread.sleep(300);
int rnd = (int) (Math.random() * words.length); // uniform index in [0, words.length)
LOGGER.info("emit word: {}", words[rnd]);
ctx.collect(words[rnd]);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
- RandomWordSource.run keeps looping and emitting data until cancel() sets isRunning to false.
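The volatile isRunning flag is what lets cancel(), called from another thread, end the loop. A minimal plain-Java sketch of that interaction follows; LoopingSource and demo are hypothetical names, and no Flink classes are involved.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java sketch of the volatile-flag cancellation pattern; not Flink code.
final class CancelSketch {
    static final class LoopingSource {
        private volatile boolean isRunning = true;   // written by cancel(), read by the run loop
        final AtomicInteger emitted = new AtomicInteger();

        void run() throws InterruptedException {
            while (isRunning) {
                Thread.sleep(5);
                emitted.incrementAndGet();           // stands in for ctx.collect(...)
            }
        }

        void cancel() {
            isRunning = false;
        }
    }

    // Starts the source on its own thread, cancels it from the calling thread,
    // and returns true if the run loop exited afterwards.
    static boolean demo() {
        final LoopingSource source = new LoopingSource();
        Thread runner = new Thread(() -> {
            try {
                source.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        runner.start();
        try {
            while (source.emitted.get() == 0) {      // wait until the loop has emitted something
                Thread.sleep(1);
            }
            source.cancel();
            runner.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !runner.isAlive() && source.emitted.get() > 0;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Without volatile, the loop thread would have no visibility guarantee for the write made in cancel(), which is the reason the SourceFunction Javadoc recommends it.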
StreamSource.LatencyMarksEmitter
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
private static class LatencyMarksEmitter<OUT> {
private final ScheduledFuture<?> latencyMarkTimer;
public LatencyMarksEmitter(
final ProcessingTimeService processingTimeService,
final Output<StreamRecord<OUT>> output,
long latencyTrackingInterval,
final OperatorID operatorId,
final int subtaskIndex) {
latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
try {
// ProcessingTimeService callbacks are executed under the checkpointing lock
output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
} catch (Throwable t) {
// we catch the Throwables here so that we don't trigger the processing
// timer services async exception handler
LOG.warn("Error while emitting latency marker.", t);
}
}
},
0L,
latencyTrackingInterval);
}
public void close() {
latencyMarkTimer.cancel(true);
}
}
- LatencyMarksEmitter is created in StreamSource.run before userFunction.run is called, provided latencyTrackingInterval > 0. To determine latencyTrackingInterval, the code first calls getExecutionConfig().isLatencyTrackingConfigured() to check whether the ExecutionConfig sets the value. If it does, getExecutionConfig().getLatencyTrackingInterval() is used; otherwise configuration.getLong(MetricOptions.LATENCY_INTERVAL) is used, whose default is 2000L (this example takes the latter path, so the interval is 2000).
- The LatencyMarksEmitter constructor calls processingTimeService.scheduleAtFixedRate to register a fixed-rate timer task whose period is latencyTrackingInterval.
- The timer task's work lives in ProcessingTimeCallback.onProcessingTime, which calls output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)) to send a LatencyMarker. Here processingTimeService is a SystemProcessingTimeService and output is an AbstractStreamOperator.CountingOutput.
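The way the interval is resolved, and the rule that an emitter is only created for a positive interval, can be condensed into two small functions. This is a plain-Java sketch with hypothetical names (LatencyIntervalSketch, resolveInterval, emitterCreated); an empty OptionalLong stands in for "isLatencyTrackingConfigured() returned false".

```java
import java.util.OptionalLong;

// Plain-Java sketch of how StreamSource.run picks the latency tracking interval; not Flink code.
final class LatencyIntervalSketch {
    // jobLevelInterval: the value from the ExecutionConfig, empty if it was not configured.
    // clusterLevelInterval: the value read from the configuration (MetricOptions.LATENCY_INTERVAL).
    static long resolveInterval(OptionalLong jobLevelInterval, long clusterLevelInterval) {
        return jobLevelInterval.isPresent()
                ? jobLevelInterval.getAsLong()
                : clusterLevelInterval;
    }

    // A LatencyMarksEmitter is only created when the resolved interval is positive.
    static boolean emitterCreated(long resolvedInterval) {
        return resolvedInterval > 0;
    }

    public static void main(String[] args) {
        long fallback = resolveInterval(OptionalLong.empty(), 2000L);
        long disabled = resolveInterval(OptionalLong.of(0L), 2000L);
        System.out.println(fallback + " -> emitter created: " + emitterCreated(fallback));
        System.out.println(disabled + " -> emitter created: " + emitterCreated(disabled));
    }
}
```

One consequence worth noting: a job-level value of 0 wins over the configuration value, so under this logic it switches latency markers off for that job.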
SystemProcessingTimeService.scheduleAtFixedRate
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@Override
public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
long nextTimestamp = getCurrentProcessingTime() + initialDelay;
// we directly try to register the timer and only react to the status on exception
// that way we save unnecessary volatile accesses for each timer
try {
return timerService.scheduleAtFixedRate(
new RepeatedTriggerTask(status, task, checkpointLock, callback, nextTimestamp, period),
initialDelay,
period,
TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
final int status = this.status.get();
if (status == STATUS_QUIESCED) {
return new NeverCompleteFuture(initialDelay);
}
else if (status == STATUS_SHUTDOWN) {
throw new IllegalStateException("Timer service is shut down");
}
else {
// something else happened, so propagate the exception
throw e;
}
}
}
@Override
public long getCurrentProcessingTime() {
return System.currentTimeMillis();
}
- SystemProcessingTimeService.scheduleAtFixedRate delegates to timerService.scheduleAtFixedRate. Here timerService is a ScheduledThreadPoolExecutor with a corePoolSize of 1, and the task it schedules is a RepeatedTriggerTask.
RepeatedTriggerTask
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
/**
* Internal task which is repeatedly called by the processing time service.
*/
private static final class RepeatedTriggerTask implements Runnable {
private final AtomicInteger serviceStatus;
private final Object lock;
private final ProcessingTimeCallback target;
private final long period;
private final AsyncExceptionHandler exceptionHandler;
private long nextTimestamp;
private RepeatedTriggerTask(
final AtomicInteger serviceStatus,
final AsyncExceptionHandler exceptionHandler,
final Object lock,
final ProcessingTimeCallback target,
final long nextTimestamp,
final long period) {
this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
this.lock = Preconditions.checkNotNull(lock);
this.target = Preconditions.checkNotNull(target);
this.period = period;
this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);
this.nextTimestamp = nextTimestamp;
}
@Override
public void run() {
synchronized (lock) {
try {
if (serviceStatus.get() == STATUS_ALIVE) {
target.onProcessingTime(nextTimestamp);
}
nextTimestamp += period;
} catch (Throwable t) {
TimerException asyncException = new TimerException(t);
exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
}
}
}
}
- RepeatedTriggerTask calls ProcessingTimeCallback.onProcessingTime when serviceStatus is STATUS_ALIVE. The nextTimestamp initially passed in is getCurrentProcessingTime() + initialDelay, and period is added to it after every run.
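A detail that is easy to miss is that the timestamp handed to the callback is computed, not read from the clock on each firing. The plain-Java sketch below isolates that arithmetic; RepeatedTask, Callback and demo are hypothetical names, the status check and exception handling are left out, and run() is called directly where a scheduler would call it once per period.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of RepeatedTriggerTask's timestamp bookkeeping; not Flink code.
final class RepeatedTriggerSketch {
    interface Callback {
        void onProcessingTime(long timestamp);
    }

    static final class RepeatedTask implements Runnable {
        private final Object lock;
        private final Callback target;
        private final long period;
        private long nextTimestamp;

        RepeatedTask(Object lock, Callback target, long nextTimestamp, long period) {
            this.lock = lock;
            this.target = target;
            this.nextTimestamp = nextTimestamp;
            this.period = period;
        }

        @Override
        public void run() {
            synchronized (lock) {                  // callbacks run under the checkpoint lock
                target.onProcessingTime(nextTimestamp);
                nextTimestamp += period;           // advance by the period, independent of wall-clock drift
            }
        }
    }

    // Fires the task the given number of times and returns the timestamps the callback saw.
    static List<Long> demo(long now, long initialDelay, long period, int firings) {
        final List<Long> seen = new ArrayList<>();
        RepeatedTask task = new RepeatedTask(new Object(), ts -> seen.add(ts), now + initialDelay, period);
        for (int i = 0; i < firings; i++) {
            task.run();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo(1000L, 0L, 2000L, 3)); // prints [1000, 3000, 5000]
    }
}
```

For latency markers this means the marked time reflects when the marker was scheduled to be emitted, even if the timer thread actually ran a little later.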
AbstractStreamOperator.CountingOutput.emitLatencyMarker
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
/**
* Wrapping {@link Output} that updates metrics on the number of emitted elements.
*/
public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
private final Output<StreamRecord<OUT>> output;
private final Counter numRecordsOut;
public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
this.output = output;
this.numRecordsOut = counter;
}
@Override
public void emitWatermark(Watermark mark) {
output.emitWatermark(mark);
}
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
output.emitLatencyMarker(latencyMarker);
}
@Override
public void collect(StreamRecord<OUT> record) {
numRecordsOut.inc();
output.collect(record);
}
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
numRecordsOut.inc();
output.collect(outputTag, record);
}
@Override
public void close() {
output.close();
}
}
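- CountingOutput increments numRecordsOut only in its collect methods and forwards emitLatencyMarker unchanged; the output it wraps is actually a RecordWriterOutput.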
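CountingOutput is a decorator: it forwards every call and only adds a counter on the record path. A reduced plain-Java sketch of the same idea follows; MiniOutput and CountingMiniOutput are hypothetical names with a deliberately smaller interface than Flink's Output.

```java
// Plain-Java sketch of the counting decorator; not Flink code.
final class CountingOutputSketch {
    interface MiniOutput {
        void collect(String record);
        void emitLatencyMarker(long markedTime);
    }

    static final class CountingMiniOutput implements MiniOutput {
        private final MiniOutput delegate;
        private long numRecordsOut;

        CountingMiniOutput(MiniOutput delegate) {
            this.delegate = delegate;
        }

        @Override
        public void collect(String record) {
            numRecordsOut++;                  // records are counted...
            delegate.collect(record);
        }

        @Override
        public void emitLatencyMarker(long markedTime) {
            delegate.emitLatencyMarker(markedTime);  // ...latency markers are not
        }

        long getNumRecordsOut() {
            return numRecordsOut;
        }
    }

    // Sends two records and three latency markers, then returns the record counter.
    static long demo() {
        MiniOutput sink = new MiniOutput() {
            @Override public void collect(String record) { }
            @Override public void emitLatencyMarker(long markedTime) { }
        };
        CountingMiniOutput counting = new CountingMiniOutput(sink);
        counting.collect("The");
        counting.collect("fox");
        counting.emitLatencyMarker(1L);
        counting.emitLatencyMarker(2L);
        counting.emitLatencyMarker(3L);
        return counting.getNumRecordsOut();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2
    }
}
```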
RecordWriterOutput.emitLatencyMarker
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
/**
* Implementation of {@link Output} that sends data using a {@link RecordWriter}.
*/
@Internal
public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;
private SerializationDelegate<StreamElement> serializationDelegate;
//......
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
serializationDelegate.setInstance(latencyMarker);
try {
recordWriter.randomEmit(serializationDelegate);
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
- emitLatencyMarker calls StreamRecordWriter.randomEmit, which is implemented in the parent class RecordWriter, to send the LatencyMarker.
RecordWriter
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
/**
* This is used to send LatencyMarks to a random target channel.
*/
public void randomEmit(T record) throws IOException, InterruptedException {
sendToTarget(record, rng.nextInt(numChannels));
}
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
RecordSerializer<T> serializer = serializers[targetChannel];
SerializationResult result = serializer.addRecord(record);
while (result.isFullBuffer()) {
if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
// If this was a full record, we are done. Not breaking
// out of the loop at this point will lead to another
// buffer request before breaking out (that would not be
// a problem per se, but it can lead to stalls in the
// pipeline).
if (result.isFullRecord()) {
break;
}
}
BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
}
checkState(!serializer.hasSerializedData(), "All data should be written at once");
if (flushAlways) {
targetPartition.flush(targetChannel);
}
}
- RecordWriter.randomEmit picks a random targetChannel and sends the record to that channel only.
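Since each marker goes to exactly one randomly chosen channel, the markers are spread across the downstream channels rather than broadcast. The plain-Java sketch below shows only that selection step; RandomEmitSketch and its method are hypothetical names, and the seed parameter exists just to make runs repeatable.

```java
import java.util.Arrays;
import java.util.Random;

// Plain-Java sketch of random channel selection; not Flink code.
final class RandomEmitSketch {
    // Sends the given number of markers, each to one random channel,
    // and returns how many markers each channel received.
    static int[] randomEmit(int numChannels, int markers, long seed) {
        Random rng = new Random(seed);
        int[] perChannel = new int[numChannels];
        for (int i = 0; i < markers; i++) {
            int targetChannel = rng.nextInt(numChannels); // same call shape as rng.nextInt(numChannels) above
            perChannel[targetChannel]++;
        }
        return perChannel;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(randomEmit(4, 100, 42L)));
    }
}
```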
Task.run (downstream)
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
/**
* The Task represents one execution of a parallel subtask on a TaskManager.
* A Task wraps a Flink operator (which may be a user function) and
* runs it, providing all services necessary for example to consume input data,
* produce its results (intermediate result partitions) and communicate
* with the JobManager.
*
* <p>The Flink operators (implemented as subclasses of
* {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
* The task connects those to the network stack and actor messages, and tracks the state
* of the execution and handles exceptions.
*
* <p>Tasks have no knowledge about how they relate to other tasks, or whether they
* are the first attempt to execute the task, or a repeated attempt. All of that
* is only known to the JobManager. All the task knows are its own runnable code,
* the task's configuration, and the IDs of the intermediate results to consume and
* produce (if any).
*
* <p>Each Task is run by one dedicated thread.
*/
public class Task implements Runnable, TaskActions, CheckpointListener {
//......
/**
* The core work method that bootstraps the task and executes its code.
*/
@Override
public void run() {
//......
// now load and instantiate the task's invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------
// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;
// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}
// notify everyone that we switched to running
notifyObservers(ExecutionState.RUNNING, null);
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader);
// run the invokable
invokable.invoke();
//......
}
}
- The downstream Task.run calls invokable.invoke(); here the invokable is a OneInputStreamTask.
OneInputStreamTask
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@Override
protected void run() throws Exception {
// cache processor reference on the stack, to make the code more JIT friendly
final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
while (running && inputProcessor.processInput()) {
// all the work happens in the "processInput" method
}
}
- Task.run calls StreamTask.invoke, which in turn calls OneInputStreamTask.run. That method loops, calling inputProcessor.processInput() repeatedly; here inputProcessor is a StreamInputProcessor.
StreamInputProcessor
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
public boolean processInput() throws Exception {
if (isFinished) {
return false;
}
if (numRecordsIn == null) {
try {
numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
numRecordsIn = new SimpleCounter();
}
}
while (true) {
if (currentRecordDeserializer != null) {
DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
if (result.isBufferConsumed()) {
currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer = null;
}
if (result.isFullRecord()) {
StreamElement recordOrMark = deserializationDelegate.getInstance();
if (recordOrMark.isWatermark()) {
// handle watermark
statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
continue;
} else if (recordOrMark.isStreamStatus()) {
// handle stream status
statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
continue;
} else if (recordOrMark.isLatencyMarker()) {
// handle latency marker
synchronized (lock) {
streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
}
continue;
} else {
// now we can do the actual processing
StreamRecord<IN> record = recordOrMark.asRecord();
synchronized (lock) {
numRecordsIn.inc();
streamOperator.setKeyContextElement1(record);
streamOperator.processElement(record);
}
return true;
}
}
}
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
if (bufferOrEvent != null) {
if (bufferOrEvent.isBuffer()) {
currentChannel = bufferOrEvent.getChannelIndex();
currentRecordDeserializer = recordDeserializers[currentChannel];
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
}
else {
// Event received
final AbstractEvent event = bufferOrEvent.getEvent();
if (event.getClass() != EndOfPartitionEvent.class) {
throw new IOException("Unexpected event: " + event);
}
}
}
else {
isFinished = true;
if (!barrierHandler.isEmpty()) {
throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
}
return false;
}
}
}
- processInput first calls currentRecordDeserializer.getNextRecord(deserializationDelegate) to fetch the next record, and only processes it when result.isFullRecord() is true.
- Processing depends on the type of the StreamElement: watermark, stream status, latency marker, and ordinary record are each handled differently.
- For an ordinary record it calls streamOperator.processElement(record); here streamOperator is a StreamMap.
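The branching inside processInput amounts to a four-way dispatch in which only an ordinary record makes the method return to its caller. The sketch below captures that in plain Java; the Kind enum and both methods are hypothetical, and the returned strings simply name the calls seen in the listing above.

```java
// Plain-Java sketch of the per-element dispatch in processInput; not Flink code.
final class DispatchSketch {
    enum Kind { WATERMARK, STREAM_STATUS, LATENCY_MARKER, RECORD }

    // Names the handler that the listing above invokes for each kind of element.
    static String route(Kind kind) {
        switch (kind) {
            case WATERMARK:
                return "statusWatermarkValve.inputWatermark";
            case STREAM_STATUS:
                return "statusWatermarkValve.inputStreamStatus";
            case LATENCY_MARKER:
                return "streamOperator.processLatencyMarker";
            default:
                return "streamOperator.processElement";
        }
    }

    // Only a record ends the inner loop with "return true"; the other kinds hit "continue".
    static boolean returnsToCaller(Kind kind) {
        return kind == Kind.RECORD;
    }

    public static void main(String[] args) {
        for (Kind kind : Kind.values()) {
            System.out.println(kind + " -> " + route(kind) + ", returns: " + returnsToCaller(kind));
        }
    }
}
```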
StreamMap.processElement
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamMap.java
/**
* A {@link StreamOperator} for executing {@link MapFunction MapFunctions}.
*/
@Internal
public class StreamMap<IN, OUT>
extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
public StreamMap(MapFunction<IN, OUT> mapper) {
super(mapper);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
output.collect(element.replace(userFunction.map(element.getValue())));
}
}
- This calls userFunction.map(element.getValue()) to perform the map operation; here userFunction is UpperCaseMapFunc.
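The single line in processElement applies the user function to the value and keeps the rest of the record. A plain-Java sketch of that step is shown below. MiniRecord is a hypothetical stand-in for StreamRecord, and the upper-casing lambda is only a guess at what UpperCaseMapFunc does based on its name, since the article does not list that class. The sketch allocates a new record for simplicity instead of replacing the value in place.

```java
import java.util.function.Function;

// Plain-Java sketch of StreamMap.processElement; not Flink code.
final class StreamMapSketch {
    // Hypothetical stand-in for StreamRecord: a value plus its timestamp.
    static final class MiniRecord<T> {
        final T value;
        final long timestamp;

        MiniRecord(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Applies the user function to the value and carries the timestamp over.
    static <IN, OUT> MiniRecord<OUT> processElement(MiniRecord<IN> element, Function<IN, OUT> userFunction) {
        return new MiniRecord<>(userFunction.apply(element.value), element.timestamp);
    }

    static String demo() {
        MiniRecord<String> in = new MiniRecord<>("fox", 42L);
        // Assumed behavior of UpperCaseMapFunc: upper-case the word.
        MiniRecord<String> out = processElement(in, s -> s.toUpperCase());
        return out.value + "@" + out.timestamp;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints FOX@42
    }
}
```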
Summary
- SourceFunction is the base interface for all stream data sources in Flink. It defines the run and cancel methods and the nested SourceContext interface. SourceContext provides collect and collectWithTimestamp for emitting data, and emitWatermark for emitting a Watermark.
- On the emitting side the call order is Task.run --> StreamTask.invoke --> SourceStreamTask.run --> StreamSource.run --> userFunction.run(ctx) (RandomWordSource.run). Before calling userFunction.run, StreamSource.run checks whether latencyTrackingInterval is greater than 0; if so it creates a LatencyMarksEmitter, which registers a timer task that periodically invokes ProcessingTimeCallback.onProcessingTime and thereby triggers output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)).
- As a result, the downstream receives both the user data sent by userFunction.run and the LatencyMarker sent by the timer task. The downstream call order is Task.run --> StreamTask.invoke --> OneInputStreamTask.run --> StreamInputProcessor.processInput --> statusWatermarkValve.inputWatermark, statusWatermarkValve.inputStreamStatus, streamOperator.processLatencyMarker or streamOperator.processElement. StreamInputProcessor.processInput handles each element according to its type; for user data it calls streamOperator.processElement, that is StreamMap.processElement --> userFunction.map (UpperCaseMapFunc.map).