A Look at Flink's PrintSinkFunction
阿新 • Published: 2018-12-01
Preface
This article examines Flink's PrintSinkFunction.
DataStream.print
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
/**
 * Writes a DataStream to the standard output stream (stdout).
 *
 * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
 *
 * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
 * worker.
 *
 * @return The closed DataStream.
 */
@PublicEvolving
public DataStreamSink<T> print() {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
    return addSink(printFunction).name("Print to Std. Out");
}

/**
 * Writes a DataStream to the standard output stream (stderr).
 *
 * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
 *
 * <p>NOTE: This will print to stderr on the machine where the code is executed, i.e. the Flink
 * worker.
 *
 * @return The closed DataStream.
 */
@PublicEvolving
public DataStreamSink<T> printToErr() {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(true);
    return addSink(printFunction).name("Print to Std. Err");
}

/**
 * Writes a DataStream to the standard output stream (stdout).
 *
 * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
 *
 * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
 * worker.
 *
 * @param sinkIdentifier The string to prefix the output with.
 * @return The closed DataStream.
 */
@PublicEvolving
public DataStreamSink<T> print(String sinkIdentifier) {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, false);
    return addSink(printFunction).name("Print to Std. Out");
}

/**
 * Writes a DataStream to the standard output stream (stderr).
 *
 * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
 *
 * <p>NOTE: This will print to stderr on the machine where the code is executed, i.e. the Flink
 * worker.
 *
 * @param sinkIdentifier The string to prefix the output with.
 * @return The closed DataStream.
 */
@PublicEvolving
public DataStreamSink<T> printToErr(String sinkIdentifier) {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, true);
    return addSink(printFunction).name("Print to Std. Err");
}

/**
 * Adds the given sink to this DataStream. Only streams with sinks added
 * will be executed once the {@link StreamExecutionEnvironment#execute()}
 * method is called.
 *
 * @param sinkFunction
 *            The object containing the sink's invoke function.
 * @return The closed DataStream.
 */
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    // configure the type if needed
    if (sinkFunction instanceof InputTypeConfigurable) {
        ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
    }

    StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

    DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

    getExecutionEnvironment().addOperator(sink.getTransformation());
    return sink;
}
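- DataStream provides several methods whose names start with print. Each one creates a PrintSinkFunction internally and registers it by calling addSink.
- The Javadoc on addSink notes that only streams with sinks added are executed when StreamExecutionEnvironment.execute() is called.
- The SinkFunction is first wrapped in a StreamSink, which is in turn wrapped in a DataStreamSink. Finally, the transformation returned by DataStreamSink.getTransformation is registered with the execution environment through addOperator.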
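The point that adding a sink only registers work, and that nothing runs until execute() is called, can be illustrated with a small stand-alone sketch. It does not use Flink: MiniEnv and MiniStream are hypothetical stand-ins for StreamExecutionEnvironment and DataStream, and a plain Runnable stands in for the sink transformation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for StreamExecutionEnvironment: it only collects work.
class MiniEnv {
    private final List<Runnable> registered = new ArrayList<>();

    // Mirrors the idea of addOperator: remember the sink, do not run it yet.
    void addOperator(Runnable sinkTask) {
        registered.add(sinkTask);
    }

    // Only here are the registered sinks actually executed.
    void execute() {
        for (Runnable task : registered) {
            task.run();
        }
    }
}

// Hypothetical stand-in for DataStream over a fixed list of elements.
class MiniStream<T> {
    private final MiniEnv env;
    private final List<T> elements;

    MiniStream(MiniEnv env, List<T> elements) {
        this.env = env;
        this.elements = elements;
    }

    // Wraps the sink function in a task and registers it with the environment.
    void addSink(Consumer<T> sinkFunction) {
        env.addOperator(() -> elements.forEach(sinkFunction));
    }
}

class LazySinkDemo {
    public static void main(String[] args) {
        MiniEnv env = new MiniEnv();
        MiniStream<String> stream = new MiniStream<>(env, Arrays.asList("a", "b"));

        List<String> seen = new ArrayList<>();
        stream.addSink(seen::add);

        System.out.println("before execute: " + seen); // prints "before execute: []"
        env.execute();
        System.out.println("after execute: " + seen);  // prints "after execute: [a, b]"
    }
}
```

The real classes do far more (type checks, closure cleaning, operator wrapping), so this only models the register-then-execute ordering.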
SinkFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
/**
 * Interface for implementing user defined sink functionality.
 *
 * @param <IN> Input type parameter.
 */
@Public
public interface SinkFunction<IN> extends Function, Serializable {

    /**
     * @deprecated Use {@link #invoke(Object, Context)}.
     */
    @Deprecated
    default void invoke(IN value) throws Exception {}

    /**
     * Writes the given value to the sink. This function is called for every record.
     *
     * <p>You have to override this method when implementing a {@code SinkFunction}, this is a
     * {@code default} method for backward compatibility with the old-style method only.
     *
     * @param value The input record.
     * @param context Additional context about the input record.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    default void invoke(IN value, Context context) throws Exception {
        invoke(value);
    }

    /**
     * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
     * an input record.
     *
     * <p>The context is only valid for the duration of a
     * {@link SinkFunction#invoke(Object, Context)} call. Do not store the context and use
     * afterwards!
     *
     * @param <T> The type of elements accepted by the sink.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface Context<T> {

        /** Returns the current processing time. */
        long currentProcessingTime();

        /** Returns the current event-time watermark. */
        long currentWatermark();

        /**
         * Returns the timestamp of the current input record or {@code null} if the element does not
         * have an assigned timestamp.
         */
        Long timestamp();
    }
}
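- The SinkFunction interface defines the invoke method, which carries the sink logic and is called for every record. The two-argument invoke receives a Context, an interface that declares three methods: currentProcessingTime, currentWatermark, and timestamp. The one-argument invoke is deprecated, and the default two-argument implementation simply delegates to it.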
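The delegation between the two invoke methods is easy to miss, so here is a minimal stand-alone sketch of the pattern. MiniSinkFunction is a hypothetical, simplified copy of the interface shape (one Context method, no throws clause, no Flink dependency); OldStyleSink and NewStyleSink are illustrative names as well.

```java
// Hypothetical, simplified copy of the SinkFunction shape; not the Flink interface itself.
// The real interface also declares "throws Exception" on both invoke methods.
interface MiniSinkFunction<IN> {

    interface Context {
        long currentProcessingTime();
    }

    // Old-style method, kept only for backward compatibility.
    @Deprecated
    default void invoke(IN value) {}

    // New-style method; by default it falls back to the old-style one.
    default void invoke(IN value, Context context) {
        invoke(value);
    }
}

// Overrides only the deprecated one-argument method.
class OldStyleSink implements MiniSinkFunction<String> {
    final StringBuilder out = new StringBuilder();

    @Override
    public void invoke(String value) {
        out.append(value).append(';');
    }
}

// Overrides the two-argument method and uses the context.
class NewStyleSink implements MiniSinkFunction<String> {
    final StringBuilder out = new StringBuilder();

    @Override
    public void invoke(String value, Context context) {
        out.append(value).append('@').append(context.currentProcessingTime()).append(';');
    }
}

class InvokeDelegationDemo {
    public static void main(String[] args) {
        // A fixed processing time keeps the output deterministic.
        MiniSinkFunction.Context ctx = () -> 42L;

        OldStyleSink oldSink = new OldStyleSink();
        oldSink.invoke("a", ctx); // reaches invoke(String) through the default method
        System.out.println(oldSink.out); // prints "a;"

        NewStyleSink newSink = new NewStyleSink();
        newSink.invoke("a", ctx);
        System.out.println(newSink.out); // prints "a@42;"
    }
}
```

A caller that always uses the two-argument form therefore works with both old-style and new-style sinks.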
RichSinkFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
/**
 * A {@link org.apache.flink.api.common.functions.RichFunction} version of {@link SinkFunction}.
 */
@Public
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {

    private static final long serialVersionUID = 1L;
}
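- The abstract class RichSinkFunction extends AbstractRichFunction and also declares that it implements the SinkFunction interface. Most built-in sink functions extend RichSinkFunction. AbstractRichFunction mainly contributes the RuntimeContext property, which gives access to the context the function runs in, along with the open and close lifecycle methods.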
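To make the role of the runtime context concrete, the following stand-alone sketch models the lifecycle a rich function goes through: the context is set first, then open is called once, then invoke is called per record, then close. All class names here (MiniRuntimeContext, MiniRichFunction, LabelingSink) are hypothetical and the sketch has no Flink dependency; it only assumes that ordering.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced runtime context holding only the two values a print-style sink needs.
class MiniRuntimeContext {
    final int indexOfThisSubtask;
    final int numberOfParallelSubtasks;

    MiniRuntimeContext(int indexOfThisSubtask, int numberOfParallelSubtasks) {
        this.indexOfThisSubtask = indexOfThisSubtask;
        this.numberOfParallelSubtasks = numberOfParallelSubtasks;
    }
}

// Hypothetical stand-in for AbstractRichFunction.
abstract class MiniRichFunction {
    private MiniRuntimeContext runtimeContext;

    void setRuntimeContext(MiniRuntimeContext ctx) {
        this.runtimeContext = ctx;
    }

    MiniRuntimeContext getRuntimeContext() {
        if (runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return runtimeContext;
    }

    void open() {}

    void close() {}
}

// A sink that reads the context once in open() and reuses the result for every record.
class LabelingSink extends MiniRichFunction {
    final List<String> out = new ArrayList<>();
    private String label;

    @Override
    void open() {
        MiniRuntimeContext ctx = getRuntimeContext();
        label = "subtask " + (ctx.indexOfThisSubtask + 1) + "/" + ctx.numberOfParallelSubtasks;
    }

    void invoke(String value) {
        out.add(label + ": " + value);
    }
}

class RichLifecycleDemo {
    public static void main(String[] args) {
        LabelingSink sink = new LabelingSink();
        sink.setRuntimeContext(new MiniRuntimeContext(1, 4)); // done by the framework in practice
        sink.open();
        sink.invoke("hello");
        sink.close();
        System.out.println(sink.out); // prints "[subtask 2/4: hello]"
    }
}
```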
PrintSinkFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
/**
 * Implementation of the SinkFunction writing every tuple to the standard
 * output or standard error stream.
 *
 * <p>
 * Four possible format options:
 *	{@code sinkIdentifier}:taskId> output  <- {@code sinkIdentifier} provided, parallelism > 1
 *	{@code sinkIdentifier}> output         <- {@code sinkIdentifier} provided, parallelism == 1
 *  taskId> output         				   <- no {@code sinkIdentifier} provided, parallelism > 1
 *  output                 				   <- no {@code sinkIdentifier} provided, parallelism == 1
 * </p>
 *
 * @param <IN> Input record type
 */
@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private final PrintSinkOutputWriter<IN> writer;

    /**
     * Instantiates a print sink function that prints to standard out.
     */
    public PrintSinkFunction() {
        writer = new PrintSinkOutputWriter<>(false);
    }

    /**
     * Instantiates a print sink function that prints to standard out.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     */
    public PrintSinkFunction(final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(stdErr);
    }

    /**
     * Instantiates a print sink function that prints to standard out and gives a sink identifier.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     * @param sinkIdentifier Message that identify sink and is prefixed to the output of the value
     */
    public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(IN record) {
        writer.write(record);
    }

    @Override
    public String toString() {
        return writer.toString();
    }
}
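- PrintSinkFunction extends RichSinkFunction and delegates the actual work to a PrintSinkOutputWriter: open initializes the writer with the subtask index and the parallelism, and invoke calls the writer's write method to produce the output.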
PrintSinkOutputWriter
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java
/**
 * Print sink output writer for DataStream and DataSet print API.
 */
@Internal
public class PrintSinkOutputWriter<IN> implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final boolean STD_OUT = false;
    private static final boolean STD_ERR = true;

    private final boolean target;
    private transient PrintStream stream;
    private final String sinkIdentifier;
    private transient String completedPrefix;

    public PrintSinkOutputWriter() {
        this("", STD_OUT);
    }

    public PrintSinkOutputWriter(final boolean stdErr) {
        this("", stdErr);
    }

    public PrintSinkOutputWriter(final String sinkIdentifier, final boolean stdErr) {
        this.target = stdErr;
        this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
    }

    public void open(int subtaskIndex, int numParallelSubtasks) {
        // get the target stream
        stream = target == STD_OUT ? System.out : System.err;

        completedPrefix = sinkIdentifier;

        if (numParallelSubtasks > 1) {
            if (!completedPrefix.isEmpty()) {
                completedPrefix += ":";
            }
            completedPrefix += (subtaskIndex + 1);
        }

        if (!completedPrefix.isEmpty()) {
            completedPrefix += "> ";
        }
    }

    public void write(IN record) {
        stream.println(completedPrefix + record.toString());
    }

    @Override
    public String toString() {
        return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
    }
}
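- The PrintSinkOutputWriter constructors take at most two arguments, sinkIdentifier and stdErr. sinkIdentifier is the prefix for the output, and stdErr indicates whether to write to System.err.
- The open method does the preparation work. It is called from PrintSinkFunction's open method, which obtains subtaskIndex and numParallelSubtasks from the RuntimeContext defined by AbstractRichFunction and passes them in. Here open builds completedPrefix from sinkIdentifier, subtaskIndex, and numParallelSubtasks: the one-based subtask number (subtaskIndex + 1) is appended only when numParallelSubtasks is greater than 1, and "> " is appended only when the prefix is non-empty.
- The write method calls println on System.out or System.err, passing completedPrefix followed by record.toString().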
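The prefix rules in open are easier to check against the four format options from the PrintSinkFunction Javadoc when they are isolated. The sketch below copies that logic into a hypothetical helper, PrefixSketch.buildPrefix, which is not part of Flink and exists only for illustration.

```java
// Hypothetical helper that reproduces the prefix logic of PrintSinkOutputWriter.open.
class PrefixSketch {

    static String buildPrefix(String sinkIdentifier, int subtaskIndex, int numParallelSubtasks) {
        // A null identifier is treated as empty, as in the writer's constructor.
        String prefix = (sinkIdentifier == null ? "" : sinkIdentifier);

        // The subtask number is only shown when there is more than one subtask.
        if (numParallelSubtasks > 1) {
            if (!prefix.isEmpty()) {
                prefix += ":";
            }
            prefix += (subtaskIndex + 1); // subtask indexes are zero-based, output is one-based
        }

        // The separator is only added when there is something to separate.
        if (!prefix.isEmpty()) {
            prefix += "> ";
        }
        return prefix;
    }

    public static void main(String[] args) {
        System.out.println(buildPrefix("demo", 0, 4) + "hello"); // prints "demo:1> hello"
        System.out.println(buildPrefix("demo", 0, 1) + "hello"); // prints "demo> hello"
        System.out.println(buildPrefix("", 2, 4) + "hello");     // prints "3> hello"
        System.out.println(buildPrefix(null, 0, 1) + "hello");   // prints "hello"
    }
}
```

The four calls in main correspond, in order, to the four format options listed in the class comment.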
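Summary
- The print methods of DataStream create a PrintSinkFunction internally and then call addSink to register it with the execution environment (the function is first wrapped in a StreamSink, then in a DataStreamSink, and finally the transformation returned by DataStreamSink.getTransformation is registered as an operator).
- SinkFunction is the base interface for sink functions. It mainly defines the invoke method, which receives a Context. Most built-in sink functions extend RichSinkFunction, which in turn extends AbstractRichFunction and therefore has access to the function's RuntimeContext at run time.
- PrintSinkFunction extends RichSinkFunction and relies on PrintSinkOutputWriter: invoke calls the writer's write method to produce the output.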