A Look at Flink's SourceFunction

This article examines Flink's SourceFunction.

Example

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> dataStreamSource = env.addSource(new RandomWordSource());

        dataStreamSource.map(new UpperCaseMapFunc()).print();

        env.execute("sourceFunctionDemo");
  • Here the addSource method is used to add a custom SourceFunction.

SourceFunction

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java

/**
 * Base interface for all stream data sources in Flink. The contract of a stream source
 * is the following: When the source should start emitting elements, the {@link #run} method
 * is called with a {@link SourceContext} that can be used for emitting elements.
 * The run method can run for as long as necessary. The source must, however, react to an
 * invocation of {@link #cancel()} by breaking out of its main loop.
 *
 * <h3>CheckpointedFunction Sources</h3>
 *
 * <p>Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
 * interface must ensure that state checkpointing, updating of internal state and emission of
 * elements are not done concurrently. This is achieved by using the provided checkpointing lock
 * object to protect update of state and emission of elements in a synchronized block.
 *
 * <p>This is the basic pattern one should follow when implementing a checkpointed source:
 *
 * <pre>{@code
 *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
 *      private long count = 0L;
 *      private volatile boolean isRunning = true;
 *
 *      private transient ListState<Long> checkpointedCount;
 *
 *      public void run(SourceContext<T> ctx) {
 *          while (isRunning && count < 1000) {
 *              // this synchronized block ensures that state checkpointing,
 *              // internal state updates and emission of elements are an atomic operation
 *              synchronized (ctx.getCheckpointLock()) {
 *                  ctx.collect(count);
 *                  count++;
 *              }
 *          }
 *      }
 *
 *      public void cancel() {
 *          isRunning = false;
 *      }
 *
 *      public void initializeState(FunctionInitializationContext context) {
 *          this.checkpointedCount = context
 *              .getOperatorStateStore()
 *              .getListState(new ListStateDescriptor<>("count", Long.class));
 *
 *          if (context.isRestored()) {
 *              for (Long count : this.checkpointedCount.get()) {
 *                  this.count = count;
 *              }
 *          }
 *      }
 *
 *      public void snapshotState(FunctionSnapshotContext context) {
 *          this.checkpointedCount.clear();
 *          this.checkpointedCount.add(count);
 *      }
 * }
 * }</pre>
 *
 *
 * <h3>Timestamps and watermarks:</h3>
 * Sources may assign timestamps to elements and may manually emit watermarks.
 * However, these are only interpreted if the streaming program runs on
 * {@link TimeCharacteristic#EventTime}. On other time characteristics
 * ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}),
 * the watermarks from the source function are ignored.
 *
 * <h3>Gracefully Stopping Functions</h3>
 * Functions may additionally implement the {@link org.apache.flink.api.common.functions.StoppableFunction}
 * interface. "Stopping" a function, in contrast to "canceling" means a graceful exit that leaves the
 * state and the emitted elements in a consistent state.
 *
 * <p>When a source is stopped, the executing thread is not interrupted, but expected to leave the
 * {@link #run(SourceContext)} method in reasonable time on its own, preserving the atomicity
 * of state updates and element emission.
 *
 * @param <T> The type of the elements produced by this source.
 *
 * @see org.apache.flink.api.common.functions.StoppableFunction
 * @see org.apache.flink.streaming.api.TimeCharacteristic
 */
@Public
public interface SourceFunction<T> extends Function, Serializable {

    /**
     * Starts the source. Implementations can use the {@link SourceContext} emit
     * elements.
     *
     * <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
     * must lock on the checkpoint lock (using a synchronized block) before updating internal
     * state and emitting elements, to make both an atomic operation:
     *
     * <pre>{@code
     *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
     *      private long count = 0L;
     *      private volatile boolean isRunning = true;
     *
     *      private transient ListState<Long> checkpointedCount;
     *
     *      public void run(SourceContext<T> ctx) {
     *          while (isRunning && count < 1000) {
     *              // this synchronized block ensures that state checkpointing,
     *              // internal state updates and emission of elements are an atomic operation
     *              synchronized (ctx.getCheckpointLock()) {
     *                  ctx.collect(count);
     *                  count++;
     *              }
     *          }
     *      }
     *
     *      public void cancel() {
     *          isRunning = false;
     *      }
     *
     *      public void initializeState(FunctionInitializationContext context) {
     *          this.checkpointedCount = context
     *              .getOperatorStateStore()
     *              .getListState(new ListStateDescriptor<>("count", Long.class));
     *
     *          if (context.isRestored()) {
     *              for (Long count : this.checkpointedCount.get()) {
     *                  this.count = count;
     *              }
     *          }
     *      }
     *
     *      public void snapshotState(FunctionSnapshotContext context) {
     *          this.checkpointedCount.clear();
     *          this.checkpointedCount.add(count);
     *      }
     * }
     * }</pre>
     *
     * @param ctx The context to emit elements to and for accessing locks.
     */
    void run(SourceContext<T> ctx) throws Exception;

    /**
     * Cancels the source. Most sources will have a while loop inside the
     * {@link #run(SourceContext)} method. The implementation needs to ensure that the
     * source will break out of that loop after this method is called.
     *
     * <p>A typical pattern is to have an {@code "volatile boolean isRunning"} flag that is set to
     * {@code false} in this method. That flag is checked in the loop condition.
     *
     * <p>When a source is canceled, the executing thread will also be interrupted
     * (via {@link Thread#interrupt()}). The interruption happens strictly after this
     * method has been called, so any interruption handler can rely on the fact that
     * this method has completed. It is good practice to make any flags altered by
     * this method "volatile", in order to guarantee the visibility of the effects of
     * this method to any interruption handler.
     */
    void cancel();

    // ------------------------------------------------------------------------
    //  source context
    // ------------------------------------------------------------------------

    /**
     * Interface that source functions use to emit elements, and possibly watermarks.
     *
     * @param <T> The type of the elements produced by the source.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface SourceContext<T> {
        //......
    }
}
  • SourceFunction is the base interface for stream data sources in Flink. It defines the run and cancel methods, as well as the nested SourceContext interface.
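
The run/cancel contract can be sketched without any Flink dependency. The following is only an illustration, not Flink code: MiniSourceContext, MiniSource and CountingSource are hypothetical stand-ins for SourceContext, SourceFunction and a user-defined source.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CancelPatternDemo {
    // Hypothetical stand-in for SourceFunction.SourceContext (not the Flink interface).
    interface MiniSourceContext<T> {
        void collect(T element);
    }

    // Hypothetical stand-in for SourceFunction: run() emits, cancel() ends the loop.
    interface MiniSource<T> {
        void run(MiniSourceContext<T> ctx) throws Exception;
        void cancel();
    }

    static class CountingSource implements MiniSource<Long> {
        // volatile so the write in cancel() is visible to the thread inside run()
        private volatile boolean isRunning = true;
        private long count = 0L;

        @Override
        public void run(MiniSourceContext<Long> ctx) throws Exception {
            while (isRunning) {
                ctx.collect(count++);
                Thread.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    // Runs the source on its own thread, cancels it, and returns what was emitted.
    static List<Long> runThenCancel(long runMillis) {
        List<Long> emitted = new CopyOnWriteArrayList<>();
        CountingSource source = new CountingSource();
        Thread worker = new Thread(() -> {
            try {
                source.run(emitted::add);
            } catch (Exception e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            Thread.sleep(runMillis);
            source.cancel(); // flips the flag, so run() leaves its loop
            worker.join();   // returns because run() reacted to cancel()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("emitted " + runThenCancel(50).size() + " elements before cancel");
    }
}
```

If the flag were not volatile, the loop thread would have no guarantee of ever seeing the update made by cancel().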

SourceContext

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java

	/**
	 * Interface that source functions use to emit elements, and possibly watermarks.
	 *
	 * @param <T> The type of the elements produced by the source.
	 */
	@Public // Interface might be extended in the future with additional methods.
	interface SourceContext<T> {

		/**
		 * Emits one element from the source, without attaching a timestamp. In most cases,
		 * this is the default way of emitting elements.
		 *
		 * <p>The timestamp that the element will get assigned depends on the time characteristic of
		 * the streaming program:
		 * <ul>
		 *     <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
		 *     <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
		 *         current time as the timestamp.</li>
		 *     <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
		 *         It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
		 *         operation (like time windows).</li>
		 * </ul>
		 *
		 * @param element The element to emit
		 */
		void collect(T element);

		/**
		 * Emits one element from the source, and attaches the given timestamp. This method
		 * is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
		 * sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
		 * on the stream.
		 *
		 * <p>On certain time characteristics, this timestamp may be ignored or overwritten.
		 * This allows programs to switch between the different time characteristics and behaviors
		 * without changing the code of the source functions.
		 * <ul>
		 *     <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
		 *         because processing time never works with element timestamps.</li>
		 *     <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
		 *         system's current time, to realize proper ingestion time semantics.</li>
		 *     <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
		 * </ul>
		 *
		 * @param element The element to emit
		 * @param timestamp The timestamp in milliseconds since the Epoch
		 */
		@PublicEvolving
		void collectWithTimestamp(T element, long timestamp);

		/**
		 * Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
		 * elements with a timestamp {@code t' <= t} will occur any more. If further such
		 * elements will be emitted, those elements are considered <i>late</i>.
		 *
		 * <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
		 * On {@link TimeCharacteristic#ProcessingTime},Watermarks will be ignored. On
		 * {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
		 * automatic ingestion time watermarks.
		 *
		 * @param mark The Watermark to emit
		 */
		@PublicEvolving
		void emitWatermark(Watermark mark);

		/**
		 * Marks the source to be temporarily idle. This tells the system that this source will
		 * temporarily stop emitting records and watermarks for an indefinite amount of time. This
		 * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
		 * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
		 * watermarks without the need to wait for watermarks from this source while it is idle.
		 *
		 * <p>Source functions should make a best effort to call this method as soon as they
		 * acknowledge themselves to be idle. The system will consider the source to resume activity
		 * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
		 * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
		 */
		@PublicEvolving
		void markAsTemporarilyIdle();

		/**
		 * Returns the checkpoint lock. Please refer to the class-level comment in
		 * {@link SourceFunction} for details about how to write a consistent checkpointed
		 * source.
		 *
		 * @return The object to use as the lock
		 */
		Object getCheckpointLock();

		/**
		 * This method is called by the system to shut down the context.
		 */
		void close();
	}
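  • SourceContext mainly defines the interface through which a source emits data. The basic method is collect, which emits an element without attaching a timestamp. Under TimeCharacteristic.EventTime, such an element has to get its timestamp from a TimestampAssigner before any time-dependent operation; under TimeCharacteristic.IngestionTime nothing needs to be specified, because the system assigns its current time as the timestamp. Besides collect, there is collectWithTimestamp, which emits an element together with an explicit timestamp (for use with TimeCharacteristic.EventTime).
  • It also defines emitWatermark, which is used to handle out-of-order data: a watermark of value t declares that no more elements with a timestamp t' <= t are expected, and any that still arrive are considered late. This only takes effect with TimeCharacteristic.EventTime; with TimeCharacteristic.ProcessingTime the watermark is ignored, and with TimeCharacteristic.IngestionTime it is replaced by the automatically generated ingestion time watermarks.
  • markAsTemporarilyIdle tells the system that the source will stop emitting records and watermarks for an indefinite amount of time. It is only relevant with TimeCharacteristic.IngestionTime or TimeCharacteristic.EventTime. Once SourceContext.collect(T), SourceContext.collectWithTimestamp(T, long) or SourceContext.emitWatermark(Watermark) is called again, the system considers the source active again.
  • getCheckpointLock returns the checkpoint lock, which the source uses to keep state updates and element emission consistent with checkpointing.
  • close is called by the system to shut down the context and release its resources.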
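
The timestamp rules quoted in the Javadoc can be condensed into a small standalone model. This is a sketch for illustration only: the TimeCharacteristic enum is redefined locally so the example compiles without Flink, and both helper methods are hypothetical names, not part of the Flink API.

```java
import java.util.OptionalLong;

class TimestampRules {
    // Local copy of the three time characteristics, so the sketch has no Flink dependency.
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    // Timestamp an element carries after collectWithTimestamp(element, userTimestamp).
    static OptionalLong afterCollectWithTimestamp(TimeCharacteristic tc, long userTimestamp, long now) {
        switch (tc) {
            case ProcessingTime:
                return OptionalLong.empty();           // timestamp is ignored
            case IngestionTime:
                return OptionalLong.of(now);           // overwritten with the current time
            default:
                return OptionalLong.of(userTimestamp); // EventTime: used as given
        }
    }

    // Timestamp an element carries after plain collect(element).
    static OptionalLong afterCollect(TimeCharacteristic tc, long now) {
        // Only ingestion time stamps the element; on event time a TimestampAssigner must do it later.
        return tc == TimeCharacteristic.IngestionTime ? OptionalLong.of(now) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        for (TimeCharacteristic tc : TimeCharacteristic.values()) {
            System.out.println(tc + ": collect -> " + afterCollect(tc, now)
                + ", collectWithTimestamp(42) -> " + afterCollectWithTimestamp(tc, 42L, now));
        }
    }
}
```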

Task.run (upstream)

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
  • Task's run method calls invokable.invoke(); here the invokable is a StreamTask.

StreamTask.invoke

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task's operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * <p>The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * <p>The life cycle of the task is set up as follows:
 * <pre>{@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }</pre>
 *
 * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

        //......

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            //......

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            //......
        }
        finally {
            // clean up everything we initialized
            isRunning = false;

            //......
        }
    }
}
  • StreamTask's invoke method calls the run method of its subclass; here the subclass is SourceStreamTask.
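
The relationship between invoke and run is a template method: the base class fixes the lifecycle and the subclass supplies only run. A heavily simplified sketch, assuming hypothetical MiniStreamTask and MiniSourceTask classes (these are not the Flink classes, and the real invoke does far more setup and cleanup):

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleDemo {
    // Hypothetical stand-in for StreamTask.
    abstract static class MiniStreamTask {
        protected final List<String> log = new ArrayList<>();
        protected volatile boolean isRunning;
        protected volatile boolean canceled;

        // final: subclasses customize run(), never the surrounding lifecycle
        public final void invoke() throws Exception {
            try {
                log.add("setup");
                isRunning = true;
                run();
                if (canceled) {
                    // run() returned cleanly although the task was canceled
                    throw new IllegalStateException("canceled");
                }
                log.add("finished");
            } finally {
                isRunning = false;
                log.add("cleanup");
            }
        }

        protected abstract void run() throws Exception;
    }

    // Hypothetical stand-in for SourceStreamTask.
    static class MiniSourceTask extends MiniStreamTask {
        @Override
        protected void run() {
            log.add("run");
        }
    }

    static List<String> lifecycle() {
        MiniSourceTask task = new MiniSourceTask();
        try {
            task.invoke();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return task.log;
    }

    public static void main(String[] args) {
        System.out.println(lifecycle());
    }
}
```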

SourceStreamTask.run

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

	@Override
	protected void run() throws Exception {
		headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
	}
  • SourceStreamTask's run method mainly calls the run method of headOperator; here the headOperator is a StreamSource.

StreamSource.run

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

	public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
		run(lockingObject, streamStatusMaintainer, output);
	}

	public void run(final Object lockingObject,
			final StreamStatusMaintainer streamStatusMaintainer,
			final Output<StreamRecord<OUT>> collector) throws Exception {

		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

		final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
		final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
			? getExecutionConfig().getLatencyTrackingInterval()
			: configuration.getLong(MetricOptions.LATENCY_INTERVAL);

		LatencyMarksEmitter<OUT> latencyEmitter = null;
		if (latencyTrackingInterval > 0) {
			latencyEmitter = new LatencyMarksEmitter<>(
				getProcessingTimeService(),
				collector,
				latencyTrackingInterval,
				this.getOperatorID(),
				getRuntimeContext().getIndexOfThisSubtask());
		}

		final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

		this.ctx = StreamSourceContexts.getSourceContext(
			timeCharacteristic,
			getProcessingTimeService(),
			lockingObject,
			streamStatusMaintainer,
			collector,
			watermarkInterval,
			-1);

		try {
			userFunction.run(ctx);

			// if we get here, then the user function either exited after being done (finite source)
			// or the function was canceled or stopped. For the finite source case, we should emit
			// a final watermark that indicates that we reached the end of event-time
			if (!isCanceledOrStopped()) {
				ctx.emitWatermark(Watermark.MAX_WATERMARK);
			}
		} finally {
			// make sure that the context is closed in any case
			ctx.close();
			if (latencyEmitter != null) {
				latencyEmitter.close();
			}
		}
	}
  • StreamSource's run method first builds a SourceFunction.SourceContext via StreamSourceContexts.getSourceContext and then calls userFunction's run method. Here userFunction is RandomWordSource, the user-defined SourceFunction. (Note that before userFunction.run(ctx) is called, a LatencyMarksEmitter is also created if latencyTrackingInterval is greater than 0.)
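
The try/finally shape of this method is worth isolating: a source that returns on its own gets a final maximum watermark, a canceled one does not, and the context is closed either way. The sketch below models that with plain strings. Body and runSource are hypothetical names, and MAX_WATERMARK is assumed here to correspond to Long.MAX_VALUE.

```java
import java.util.ArrayList;
import java.util.List;

class FinalWatermarkDemo {
    // Assumption: stands in for the timestamp carried by Watermark.MAX_WATERMARK.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    // Hypothetical stand-in for the body of a user SourceFunction.
    interface Body {
        void run(List<String> out) throws Exception;
    }

    // Simplified model of the control flow in StreamSource.run.
    static List<String> runSource(Body userFunction, boolean canceledOrStopped) {
        List<String> out = new ArrayList<>();
        try {
            userFunction.run(out);
            // a finite source that finished by itself marks the end of event time
            if (!canceledOrStopped) {
                out.add("watermark:" + MAX_WATERMARK);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            // the context is closed in any case
            out.add("context closed");
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runSource(out -> out.add("a"), false));
        System.out.println(runSource(out -> out.add("a"), true));
    }
}
```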

RandomWordSource.run

public class RandomWordSource implements SourceFunction<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RandomWordSource.class);

    private volatile boolean isRunning = true;

    private static final String[] words = new String[]{"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            Thread.sleep(300);
            int rnd = (int) (Math.random() * 10 % words.length);
            LOGGER.info("emit word: {}", words[rnd]);
            ctx.collect(words[rnd]);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
  • RandomWordSource's run method keeps emitting data in a loop until cancel sets isRunning to false.
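
One detail of the index expression: (int) (Math.random() * 10 % words.length) always yields a valid index for the seven words, but not a uniform one, since random values in [0.7, 1.0) wrap around onto indices 0 to 2. That is harmless for a demo. If a uniform pick is wanted, ThreadLocalRandom is one option, as in this sketch (WordPicker and its methods are hypothetical helper names):

```java
import java.util.concurrent.ThreadLocalRandom;

class WordPicker {
    static final String[] WORDS = {"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};

    // The article's expression, with the random value passed in so it can be inspected.
    static int originalIndex(double random) {
        return (int) (random * 10 % WORDS.length);
    }

    // Uniform alternative: nextInt(bound) returns a value in [0, bound).
    static int uniformIndex() {
        return ThreadLocalRandom.current().nextInt(WORDS.length);
    }

    public static void main(String[] args) {
        // 0.05 and 0.75 both land on index 0, which is where the skew comes from
        System.out.println(originalIndex(0.05) + " " + originalIndex(0.75));
        System.out.println(WORDS[uniformIndex()]);
    }
}
```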

StreamSource.LatencyMarksEmitter

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

	private static class LatencyMarksEmitter<OUT> {
		private final ScheduledFuture<?> latencyMarkTimer;

		public LatencyMarksEmitter(
				final ProcessingTimeService processingTimeService,
				final Output<StreamRecord<OUT>> output,
				long latencyTrackingInterval,
				final OperatorID operatorId,
				final int subtaskIndex) {

			latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
				new ProcessingTimeCallback() {
					@Override
					public void onProcessingTime(long timestamp) throws Exception {
						try {
							// ProcessingTimeService callbacks are executed under the checkpointing lock
							output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
						} catch (Throwable t) {
							// we catch the Throwables here so that we don't trigger the processing
							// timer services async exception handler
							LOG.warn("Error while emitting latency marker.", t);
						}
					}
				},
				0L,
				latencyTrackingInterval);
		}

		public void close() {
			latencyMarkTimer.cancel(true);
		}
	}
  • LatencyMarksEmitter is created in StreamSource's run method before userFunction's run method is called (only if latencyTrackingInterval > 0). To determine latencyTrackingInterval, getExecutionConfig().isLatencyTrackingConfigured() first checks whether the ExecutionConfig sets the value. If it does, the value returned by getExecutionConfig().getLatencyTrackingInterval() is used; otherwise the value returned by configuration.getLong(MetricOptions.LATENCY_INTERVAL) is used, which defaults to 2000L (this example falls back to the latter, i.e. 2000).
  • The LatencyMarksEmitter constructor calls processingTimeService.scheduleAtFixedRate to register a fixed-rate timer task whose period is latencyTrackingInterval.
  • The timer task's work lives in ProcessingTimeCallback's onProcessingTime method, which calls output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)) to send a LatencyMarker. Here the processingTimeService is a SystemProcessingTimeService and the output is an AbstractStreamOperator.CountingOutput.
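
The interval lookup boils down to "job-level setting first, cluster-level setting otherwise, and no emitter at all unless the result is positive". A minimal sketch of that decision, with hypothetical method names and the 2000 ms default taken from the description above:

```java
class LatencyIntervalResolution {
    // Default of the cluster-level option as stated in the text above (assumed for this Flink version).
    static final long CLUSTER_DEFAULT_MS = 2000L;

    // Job-level value wins when it was explicitly configured.
    static long resolve(boolean configuredOnJob, long jobInterval, long clusterInterval) {
        return configuredOnJob ? jobInterval : clusterInterval;
    }

    // The emitter is only created for a positive interval.
    static boolean emitterEnabled(long interval) {
        return interval > 0;
    }

    public static void main(String[] args) {
        long fallback = resolve(false, 0L, CLUSTER_DEFAULT_MS);
        System.out.println(fallback + " ms, enabled=" + emitterEnabled(fallback));
        long disabled = resolve(true, 0L, CLUSTER_DEFAULT_MS);
        System.out.println(disabled + " ms, enabled=" + emitterEnabled(disabled));
    }
}
```

Under this logic, a job that explicitly configures a non-positive interval ends up without a LatencyMarksEmitter even when the cluster default is positive.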

SystemProcessingTimeService.scheduleAtFixedRate

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

	@Override
	public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
		long nextTimestamp = getCurrentProcessingTime() + initialDelay;

		// we directly try to register the timer and only react to the status on exception
		// that way we save unnecessary volatile accesses for each timer
		try {
			return timerService.scheduleAtFixedRate(
				new RepeatedTriggerTask(status, task, checkpointLock, callback, nextTimestamp, period),
				initialDelay,
				period,
				TimeUnit.MILLISECONDS);
		} catch (RejectedExecutionException e) {
			final int status = this.status.get();
			if (status == STATUS_QUIESCED) {
				return new NeverCompleteFuture(initialDelay);
			}
			else if (status == STATUS_SHUTDOWN) {
				throw new IllegalStateException("Timer service is shut down");
			}
			else {
				// something else happened, so propagate the exception
				throw e;
			}
		}
	}

	@Override
	public long getCurrentProcessingTime() {
		return System.currentTimeMillis();
	}
  • SystemProcessingTimeService's scheduleAtFixedRate method delegates to timerService's scheduleAtFixedRate. Here timerService is a ScheduledThreadPoolExecutor with a corePoolSize of 1, and the task it schedules is a RepeatedTriggerTask.
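
The underlying JDK behaviour can be tried in isolation. The sketch below uses only java.util.concurrent. It shows a single-threaded ScheduledThreadPoolExecutor firing a fixed-rate task, and the RejectedExecutionException that the catch block in the listing reacts to once the executor has been shut down. TimerServiceSketch and its methods are hypothetical names.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class TimerServiceSketch {
    // Returns true if scheduling on a shut-down executor is rejected.
    static boolean rejectedAfterShutdown() {
        ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
        timerService.shutdownNow();
        try {
            timerService.scheduleAtFixedRate(() -> { }, 0L, 100L, TimeUnit.MILLISECONDS);
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    // Counts how often a fixed-rate task fires within roughly waitMillis.
    static int countRuns(long periodMillis, long waitMillis) {
        ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();
        timerService.scheduleAtFixedRate(runs::incrementAndGet, 0L, periodMillis, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timerService.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + rejectedAfterShutdown());
        System.out.println("runs in ~100 ms at a 10 ms period: " + countRuns(10L, 100L));
    }
}
```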

RepeatedTriggerTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

	/**
	 * Internal task which is repeatedly called by the processing time service.
	 */
	private static final class RepeatedTriggerTask implements Runnable {

		private final AtomicInteger serviceStatus;
		private final Object lock;
		private final ProcessingTimeCallback target;
		private final long period;
		private final AsyncExceptionHandler exceptionHandler;

		private long nextTimestamp;

		private RepeatedTriggerTask(
				final AtomicInteger serviceStatus,
				final AsyncExceptionHandler exceptionHandler,
				final Object lock,
				final ProcessingTimeCallback target,
				final long nextTimestamp,
				final long period) {

			this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
			this.lock = Preconditions.checkNotNull(lock);
			this.target = Preconditions.checkNotNull(target);
			this.period = period;
			this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);

			this.nextTimestamp = nextTimestamp;
		}

		@Override
		public void run() {
			synchronized (lock) {
				try {
					if (serviceStatus.get() == STATUS_ALIVE) {
						target.onProcessingTime(nextTimestamp);
					}

					nextTimestamp += period;
				} catch (Throwable t) {
					TimerException asyncException = new TimerException(t);
					exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
				}
			}
		}
	}
  • RepeatedTriggerTask calls ProcessingTimeCallback's onProcessingTime when serviceStatus is STATUS_ALIVE. The nextTimestamp passed in initially is computed as getCurrentProcessingTime() + initialDelay, and period is added to it after each run.
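
A consequence of this bookkeeping is that the callback receives the scheduled timestamp (start + k * period), not the wall-clock time at which it happens to run. A reduced sketch, assuming a hypothetical RepeatedTriggerSketch class that records the timestamps instead of invoking a real ProcessingTimeCallback and that leaves out the status check and exception handling:

```java
import java.util.ArrayList;
import java.util.List;

class RepeatedTriggerSketch implements Runnable {
    private final Object lock;
    private final List<Long> seen;
    private final long period;
    private long nextTimestamp;

    RepeatedTriggerSketch(Object lock, List<Long> seen, long nextTimestamp, long period) {
        this.lock = lock;
        this.seen = seen;
        this.nextTimestamp = nextTimestamp;
        this.period = period;
    }

    @Override
    public void run() {
        // the callback runs under the lock, so it cannot interleave with other holders of it
        synchronized (lock) {
            seen.add(nextTimestamp);  // stands in for target.onProcessingTime(nextTimestamp)
            nextTimestamp += period;
        }
    }

    // Runs the task a fixed number of times and returns the timestamps handed to the callback.
    static List<Long> timestampsAfter(int runs, long start, long period) {
        List<Long> seen = new ArrayList<>();
        RepeatedTriggerSketch task = new RepeatedTriggerSketch(new Object(), seen, start, period);
        for (int i = 0; i < runs; i++) {
            task.run();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(timestampsAfter(3, 1000L, 200L));
    }
}
```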

AbstractStreamOperator.CountingOutput.emitLatencyMarker

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

	/**
	 * Wrapping {@link Output} that updates metrics on the number of emitted elements.
	 */
	public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
		private final Output<StreamRecord<OUT>> output;
		private final Counter numRecordsOut;

		public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
			this.output = output;
			this.numRecordsOut = counter;
		}

		@Override
		public void emitWatermark(Watermark mark) {
			output.emitWatermark(mark);
		}

		@Override
		public void emitLatencyMarker(LatencyMarker latencyMarker) {
			output.emitLatencyMarker(latencyMarker);
		}

		@Override
		public void collect(StreamRecord<OUT> record) {
			numRecordsOut.inc();
			output.collect(record);
		}

		@Override
		public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
			numRecordsOut.inc();
			output.collect(outputTag, record);
		}

		@Override
		public void close() {
			output.close();
		}
	}
  • CountingOutput actually wraps a RecordWriterOutput.
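
CountingOutput is a decorator: every call is forwarded to the wrapped output, and only the collect calls bump the counter, so latency markers and watermarks do not show up in numRecordsOut. A standalone sketch of the same idea with hypothetical MiniOutput, Recording and Counting types:

```java
import java.util.ArrayList;
import java.util.List;

class CountingDecoratorSketch {
    // Hypothetical, reduced stand-in for Flink's Output interface.
    interface MiniOutput {
        void collect(String record);
        void emitLatencyMarker(long markedTime);
    }

    // Innermost output: just remembers what it received.
    static class Recording implements MiniOutput {
        final List<String> events = new ArrayList<>();

        @Override
        public void collect(String record) {
            events.add("record:" + record);
        }

        @Override
        public void emitLatencyMarker(long markedTime) {
            events.add("latency:" + markedTime);
        }
    }

    // Decorator: forwards everything, counts only records.
    static class Counting implements MiniOutput {
        private final MiniOutput inner;
        long numRecordsOut;

        Counting(MiniOutput inner) {
            this.inner = inner;
        }

        @Override
        public void collect(String record) {
            numRecordsOut++;
            inner.collect(record);
        }

        @Override
        public void emitLatencyMarker(long markedTime) {
            inner.emitLatencyMarker(markedTime); // forwarded, not counted
        }
    }

    // Sends two records and one latency marker, returns the record count.
    static long countAfterMixedTraffic() {
        Counting out = new Counting(new Recording());
        out.collect("a");
        out.emitLatencyMarker(1L);
        out.collect("b");
        return out.numRecordsOut;
    }

    public static void main(String[] args) {
        System.out.println("numRecordsOut = " + countAfterMixedTraffic());
    }
}
```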

RecordWriterOutput.emitLatencyMarker

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java

/**
 * Implementation of {@link Output} that sends data using a {@link RecordWriter}.
 */
@Internal
public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {

	private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;

	private SerializationDelegate<StreamElement> serializationDelegate;

	//......

	@Override
	public void emitLatencyMarker(LatencyMarker latencyMarker) {
		serializationDelegate.setInstance(latencyMarker);

		try {
			recordWriter.randomEmit(serializationDelegate);
		}
		catch (Exception e) {
			throw new RuntimeException(e.getMessage(), e);
		}
	}
}
  • Here emitLatencyMarker calls StreamRecordWriter's randomEmit (which is actually implemented by its parent class RecordWriter) to emit the LatencyMarker.

RecordWriter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

	/**
	 * This is used to send LatencyMarks to a random target channel.
	 */
	public void randomEmit(T record) throws IOException, InterruptedException {
		sendToTarget(record, rng.nextInt(numChannels));
	}

	private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
		RecordSerializer<T> serializer = serializers[targetChannel];

		SerializationResult result = serializer.addRecord(record);

		while (result.isFullBuffer()) {
			if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
				// If this was a full record, we are done. Not breaking
				// out of the loop at this point will lead to another
				// buffer request before breaking out (that would not be
				// a problem per se, but it can lead to stalls in the
				// pipeline).
				if (result.isFullRecord()) {
					break;
				}
			}
			BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

			result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
		}
		checkState(!serializer.hasSerializedData(), "All data should be written at once");

		if (flushAlways) {
			targetPartition.flush(targetChannel);
		}
	}
  • RecordWriter's randomEmit picks a random targetChannel and sends the record to it.
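
The channel selection itself is a single nextInt(numChannels) call, so each marker goes to exactly one channel. The sketch below tallies where a batch of markers would land. It uses java.util.Random with a fixed seed purely for illustration, which is an assumption and not necessarily the generator RecordWriter uses, and RandomChannelSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.Random;

class RandomChannelSketch {
    // Counts how many of `markers` emissions land on each of `numChannels` channels.
    static int[] distribute(int numChannels, int markers, long seed) {
        Random rng = new Random(seed);
        int[] perChannel = new int[numChannels];
        for (int i = 0; i < markers; i++) {
            int targetChannel = rng.nextInt(numChannels); // value in [0, numChannels)
            perChannel[targetChannel]++;
        }
        return perChannel;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distribute(4, 1000, 42L)));
    }
}
```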

Task.run (downstream)

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
  • The downstream Task's run method calls invokable.invoke(); here the invokable is a OneInputStreamTask

OneInputStreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java

	@Override
	protected void run() throws Exception {
		// cache processor reference on the stack, to make the code more JIT friendly
		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

		while (running && inputProcessor.processInput()) {
			// all the work happens in the "processInput" method
		}
	}
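  • Task.run calls StreamTask.invoke, which in turn calls OneInputStreamTask.run. That method does little more than loop on inputProcessor.processInput() until it returns false or the task stops running; here the inputProcessor is a StreamInputProcessor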
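
The loop described above can be sketched in plain Java without any Flink dependency. This is only an illustration: QueueInputProcessor and MiniInputTask are hypothetical stand-ins for StreamInputProcessor and OneInputStreamTask, and the in-memory queue is an assumption that replaces the real network input.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for StreamInputProcessor: handles one record per call. */
class QueueInputProcessor {
    private final Queue<String> input;
    private final List<String> processed = new ArrayList<>();

    QueueInputProcessor(List<String> records) {
        this.input = new ArrayDeque<>(records);
    }

    /** Returns true if a record was processed, false once the input is exhausted. */
    boolean processInput() {
        String next = input.poll();
        if (next == null) {
            return false;
        }
        processed.add(next);
        return true;
    }

    List<String> processed() {
        return processed;
    }
}

/** Hypothetical stand-in for OneInputStreamTask. */
class MiniInputTask {
    private volatile boolean running = true;
    private final QueueInputProcessor inputProcessor;

    MiniInputTask(QueueInputProcessor inputProcessor) {
        this.inputProcessor = inputProcessor;
    }

    void run() {
        // keep a local reference, as the original run() does
        final QueueInputProcessor processor = this.inputProcessor;
        while (running && processor.processInput()) {
            // all the work happens inside processInput()
        }
    }

    void cancel() {
        running = false;
    }
}
```

Calling run() on a task built over three queued records drains all three and then returns; calling cancel() first makes run() return without processing anything.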

StreamInputProcessor

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java

	public boolean processInput() throws Exception {
		if (isFinished) {
			return false;
		}
		if (numRecordsIn == null) {
			try {
				numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
			} catch (Exception e) {
				LOG.warn("An exception occurred during the metrics setup.", e);
				numRecordsIn = new SimpleCounter();
			}
		}

		while (true) {
			if (currentRecordDeserializer != null) {
				DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

				if (result.isBufferConsumed()) {
					currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
					currentRecordDeserializer = null;
				}

				if (result.isFullRecord()) {
					StreamElement recordOrMark = deserializationDelegate.getInstance();

					if (recordOrMark.isWatermark()) {
						// handle watermark
						statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
						continue;
					} else if (recordOrMark.isStreamStatus()) {
						// handle stream status
						statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
						continue;
					} else if (recordOrMark.isLatencyMarker()) {
						// handle latency marker
						synchronized (lock) {
							streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
						}
						continue;
					} else {
						// now we can do the actual processing
						StreamRecord<IN> record = recordOrMark.asRecord();
						synchronized (lock) {
							numRecordsIn.inc();
							streamOperator.setKeyContextElement1(record);
							streamOperator.processElement(record);
						}
						return true;
					}
				}
			}

			final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
			if (bufferOrEvent != null) {
				if (bufferOrEvent.isBuffer()) {
					currentChannel = bufferOrEvent.getChannelIndex();
					currentRecordDeserializer = recordDeserializers[currentChannel];
					currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
				}
				else {
					// Event received
					final AbstractEvent event = bufferOrEvent.getEvent();
					if (event.getClass() != EndOfPartitionEvent.class) {
						throw new IOException("Unexpected event: " + event);
					}
				}
			}
			else {
				isFinished = true;
				if (!barrierHandler.isEmpty()) {
					throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
				}
				return false;
			}
		}
	}
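  • When a current deserializer is set, processInput calls currentRecordDeserializer.getNextRecord(deserializationDelegate) to fetch the next record and handles it only if result.isFullRecord() is true; otherwise it fetches the next buffer through barrierHandler.getNextNonBlocked()
  • Handling depends on the type of the StreamElement: watermark, stream status, latency marker, or an ordinary data record
  • For an ordinary data record it calls streamOperator.processElement(record); here the streamOperator is a StreamMap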
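
The branching on element type can be illustrated with a small, Flink-free sketch. MiniElement and MiniDispatcher are hypothetical names, the string payload is an assumption, and the sketch only records what it would have handled instead of calling a real operator or StatusWatermarkValve.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for Flink's StreamElement (not the real class). */
class MiniElement {
    enum Kind { WATERMARK, STREAM_STATUS, LATENCY_MARKER, RECORD }

    final Kind kind;
    final String payload;

    MiniElement(Kind kind, String payload) {
        this.kind = kind;
        this.payload = payload;
    }
}

/** Hypothetical dispatcher that mirrors the branching inside processInput. */
class MiniDispatcher {
    private final Object lock = new Object();
    private final List<String> handled = new ArrayList<>();

    /** Returns true only for a data record, the one branch where processInput returns true. */
    boolean dispatch(MiniElement element) {
        switch (element.kind) {
            case WATERMARK:
                handled.add("watermark:" + element.payload);
                return false;
            case STREAM_STATUS:
                handled.add("status:" + element.payload);
                return false;
            case LATENCY_MARKER:
                // the original takes the lock before touching the operator
                synchronized (lock) {
                    handled.add("latency:" + element.payload);
                }
                return false;
            default:
                synchronized (lock) {
                    handled.add("record:" + element.payload);
                }
                return true;
        }
    }

    List<String> handled() {
        return handled;
    }
}
```

In the real method the non-record branches use continue to keep reading inside the while (true) loop; the false return value here is a simplification of that behavior.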

StreamMap.processElement

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamMap.java

/**
 * A {@link StreamOperator} for executing {@link MapFunction MapFunctions}.
 */
@Internal
public class StreamMap<IN, OUT>
		extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
		implements OneInputStreamOperator<IN, OUT> {

	private static final long serialVersionUID = 1L;

	public StreamMap(MapFunction<IN, OUT> mapper) {
		super(mapper);
		chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		output.collect(element.replace(userFunction.map(element.getValue())));
	}
}
  • processElement calls userFunction.map(element.getValue()) to perform the map operation and passes the result to output.collect; here the userFunction is UpperCaseMapFunc
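
The same wrap-and-forward idea can be shown with the JDK's functional interfaces. This is a minimal sketch under assumptions: MiniStreamMap is a hypothetical class, java.util.function.Function stands in for MapFunction, and a Consumer stands in for the operator's output; StreamRecord timestamps and chaining are left out.

```java
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical, simplified analogue of StreamMap: apply the user function, then emit. */
class MiniStreamMap<IN, OUT> {
    private final Function<IN, OUT> userFunction;
    private final Consumer<OUT> output;

    MiniStreamMap(Function<IN, OUT> userFunction, Consumer<OUT> output) {
        this.userFunction = userFunction;
        this.output = output;
    }

    /** Maps one input value and hands the result to the downstream output. */
    void processElement(IN value) {
        output.accept(userFunction.apply(value));
    }
}
```

For example, new MiniStreamMap<String, String>(String::toUpperCase, list::add), where list is any List<String>, plays the role of StreamMap wrapping UpperCaseMapFunc: each call to processElement appends the upper-cased word to the list.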

Summary

  • SourceFunction is the base interface for Flink stream data sources. It defines the run and cancel methods as well as the SourceContext interface. SourceContext provides collect and collectWithTimestamp for emitting data, and emitWatermark for emitting a Watermark
  • For emitting data, the call chain is Task.run --> StreamTask.invoke --> SourceStreamTask.run --> StreamSource.run --> userFunction.run(ctx) (RandomWordSource.run). Before calling userFunction.run, StreamSource.run checks whether latencyTrackingInterval is greater than 0; if so, it creates a LatencyMarksEmitter, which registers a timer that periodically calls back ProcessingTimeCallback.onProcessingTime and thereby triggers output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex))
  • As a result, the downstream task receives both the user data sent by userFunction.run and the LatencyMarker elements sent by the timer. The downstream call chain is Task.run --> StreamTask.invoke --> OneInputStreamTask.run --> StreamInputProcessor.processInput, which then calls one of statusWatermarkValve.inputWatermark, statusWatermarkValve.inputStreamStatus, streamOperator.processLatencyMarker, or streamOperator.processElement. StreamInputProcessor.processInput chooses among them by element type; for user data it calls streamOperator.processElement, that is, StreamMap.processElement --> userFunction.map (UpperCaseMapFunc.map)

doc