A Look at Flink's InputFormatSourceFunction
Preface
This article takes a look at Flink's InputFormatSourceFunction.
Example
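final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// WordIterator is the author's own Iterator<String>; IteratorInputFormat requires it to be Serializable
IteratorInputFormat<String> iteratorInputFormat = new IteratorInputFormat<>(new WordIterator());
env
        // alternatively, pass TypeInformation.of(new TypeHint<String>() {}) as the type info
        .createInput(iteratorInputFormat, TypeExtractor.createTypeInfo(String.class))
        .setParallelism(1)
        .print();
// nothing runs until the job is submitted
env.execute("IteratorInputFormat example");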
- Here an IteratorInputFormat is passed to env.createInput, which creates the SourceFunction.
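The source of WordIterator is not shown in the article. A minimal hypothetical version might look like the sketch below, assuming it simply walks a fixed list of words; the constraint it illustrates is that IteratorInputFormat only accepts iterators that also implement Serializable.

```java
import java.io.Serializable;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the article's WordIterator (its real source is not shown).
// It implements Serializable because IteratorInputFormat rejects non-serializable iterators,
// and it keeps only serializable state (a String array and an int cursor).
class WordIterator implements Iterator<String>, Serializable {
    private static final long serialVersionUID = 1L;

    private final String[] words;
    private int position = 0;

    WordIterator(String... words) {
        this.words = words.clone();
    }

    // Default word list, assumed for illustration only.
    WordIterator() {
        this("hello", "flink", "input", "format");
    }

    @Override
    public boolean hasNext() {
        return position < words.length;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return words[position++];
    }
}
```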
StreamExecutionEnvironment.createInput
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@PublicEvolving
public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) {
    DataStreamSource<OUT> source;
    if (inputFormat instanceof FileInputFormat) {
        @SuppressWarnings("unchecked")
        FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat;
        source = createFileInput(format, typeInfo, "Custom File source",
                FileProcessingMode.PROCESS_ONCE, -1);
    } else {
        source = createInput(inputFormat, typeInfo, "Custom Source");
    }
    return source;
}
private <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat,
                                                TypeInformation<OUT> typeInfo,
                                                String sourceName) {
    InputFormatSourceFunction<OUT> function = new InputFormatSourceFunction<>(inputFormat, typeInfo);
    return addSource(function, sourceName, typeInfo);
}
- When inputFormat is not a FileInputFormat, StreamExecutionEnvironment.createInput creates an InputFormatSourceFunction; FileInputFormat instances are routed to createFileInput instead.
InputFormatSourceFunction
/**
 * A {@link SourceFunction} that reads data using an {@link InputFormat}.
 */
@Internal
public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
    private static final long serialVersionUID = 1L;
    private TypeInformation<OUT> typeInfo;
    private transient TypeSerializer<OUT> serializer;
    private InputFormat<OUT, InputSplit> format;
    private transient InputSplitProvider provider;
    private transient Iterator<InputSplit> splitIterator;
    private volatile boolean isRunning = true;
    @SuppressWarnings("unchecked")
    public InputFormatSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
        this.format = (InputFormat<OUT, InputSplit>) format;
        this.typeInfo = typeInfo;
    }
    @Override
    @SuppressWarnings("unchecked")
    public void open(Configuration parameters) throws Exception {
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        if (format instanceof RichInputFormat) {
            ((RichInputFormat) format).setRuntimeContext(context);
        }
        format.configure(parameters);
        provider = context.getInputSplitProvider();
        serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
        splitIterator = getInputSplits();
        isRunning = splitIterator.hasNext();
    }
    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
        try {
            Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
            if (isRunning && format instanceof RichInputFormat) {
                ((RichInputFormat) format).openInputFormat();
            }
            OUT nextElement = serializer.createInstance();
            while (isRunning) {
                format.open(splitIterator.next());
                // for each element we also check if cancel
                // was called by checking the isRunning flag
                while (isRunning && !format.reachedEnd()) {
                    nextElement = format.nextRecord(nextElement);
                    if (nextElement != null) {
                        ctx.collect(nextElement);
                    } else {
                        break;
                    }
                }
                format.close();
                completedSplitsCounter.inc();
                if (isRunning) {
                    isRunning = splitIterator.hasNext();
                }
            }
        } finally {
            format.close();
            if (format instanceof RichInputFormat) {
                ((RichInputFormat) format).closeInputFormat();
            }
            isRunning = false;
        }
    }
    @Override
    public void cancel() {
        isRunning = false;
    }
    @Override
    public void close() throws Exception {
        format.close();
        if (format instanceof RichInputFormat) {
            ((RichInputFormat) format).closeInputFormat();
        }
    }
    /**
     * Returns the {@code InputFormat}. This is only needed because we need to set the input
     * split assigner on the {@code StreamGraph}.
     */
    public InputFormat<OUT, InputSplit> getFormat() {
        return format;
    }
    private Iterator<InputSplit> getInputSplits() {
        return new Iterator<InputSplit>() {
            private InputSplit nextSplit;
            private boolean exhausted;
            @Override
            public boolean hasNext() {
                if (exhausted) {
                    return false;
                }
                if (nextSplit != null) {
                    return true;
                }
                final InputSplit split;
                try {
                    split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader());
                } catch (InputSplitProviderException e) {
                    throw new RuntimeException("Could not retrieve next input split.", e);
                }
                if (split != null) {
                    this.nextSplit = split;
                    return true;
                } else {
                    exhausted = true;
                    return false;
                }
            }
            @Override
            public InputSplit next() {
                if (this.nextSplit == null && !hasNext()) {
                    throw new NoSuchElementException();
                }
                final InputSplit tmp = this.nextSplit;
                this.nextSplit = null;
                return tmp;
            }
            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
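- InputFormatSourceFunction is a SourceFunction that reads data using an InputFormat. It extends RichParallelSourceFunction and adds a two-argument constructor that takes an InputFormat and a TypeInformation.
- The getInputSplits method returns an Iterator over InputSplit (splitIterator); each next split is fetched by calling InputSplitProvider.getNextInputSplit.
- The run method calls splitIterator.next() split by split, opens each InputSplit with the InputFormat, reads the split's records one at a time via format.nextRecord, and emits each record through SourceContext.collect. After each split it closes the format and increments the numSplitsProcessed counter.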
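The control flow of run() can be mirrored without any Flink classes. In the sketch below, SimpleInputFormat, ListSplitFormat and SourceLoopSketch are hypothetical names; a split is modeled as a plain List<String>, a Consumer stands in for SourceContext.collect, and the cancellation flag and metric group are simplified away.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, Flink-free mirror of the InputFormat methods that run() calls.
interface SimpleInputFormat<T, S> {
    void open(S split);
    boolean reachedEnd();
    T nextRecord(T reuse);
    void close();
}

// Hypothetical format in which each split is simply a list of strings.
class ListSplitFormat implements SimpleInputFormat<String, List<String>> {
    private Iterator<String> current;

    @Override
    public void open(List<String> split) {
        current = split.iterator();
    }

    @Override
    public boolean reachedEnd() {
        return !current.hasNext();
    }

    @Override
    public String nextRecord(String reuse) {
        return current.next();
    }

    @Override
    public void close() {
        current = null;
    }
}

final class SourceLoopSketch {
    // Returns the number of completed splits (what numSplitsProcessed would count).
    static <T, S> int runLoop(SimpleInputFormat<T, S> format, Iterator<S> splits, Consumer<T> collector) {
        int completedSplits = 0;
        boolean isRunning = splits.hasNext();  // open() initializes isRunning the same way
        T next = null;                          // Flink starts from serializer.createInstance()
        try {
            while (isRunning) {
                format.open(splits.next());
                while (!format.reachedEnd()) {
                    next = format.nextRecord(next);
                    if (next == null) {
                        break;                 // a null record ends the current split early
                    }
                    collector.accept(next);    // stands in for ctx.collect(next)
                }
                format.close();
                completedSplits++;
                isRunning = splits.hasNext();
            }
        } finally {
            format.close();                    // run() also closes the format in its finally block
        }
        return completedSplits;
    }
}
```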
InputSplitProvider
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
/**
* An input split provider can be successively queried to provide a series of {@link InputSplit} objects a
* task is supposed to consume in the course of its execution.
*/
@Public
public interface InputSplitProvider {
/**
* Requests the next input split to be consumed by the calling task.
*
* @param userCodeClassLoader used to deserialize input splits
* @return the next input split to be consumed by the calling task or <code>null</code> if the
* task shall not consume any further input splits.
* @throws InputSplitProviderException if fetching the next input split fails
*/
InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException;
}
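- The InputSplitProvider interface defines getNextInputSplit, which returns the next InputSplit for the calling task to consume, or null when there are none left. It has two implementations: RpcInputSplitProvider and TaskInputSplitProvider.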
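getInputSplits turns this pull-style interface, where null means "no more splits", into a java.util.Iterator by fetching one split ahead inside hasNext(). A Flink-free sketch of the same pattern follows; SplitProvider, QueueSplitProvider and SplitIterators are hypothetical names, and the queue-backed provider merely stands in for a real one such as RpcInputSplitProvider.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical, Flink-free version of InputSplitProvider: returns null when no splits remain.
interface SplitProvider<S> {
    S getNextInputSplit();
}

// Hypothetical in-memory provider backed by a queue.
class QueueSplitProvider<S> implements SplitProvider<S> {
    private final Deque<S> queue;

    QueueSplitProvider(List<S> splits) {
        this.queue = new ArrayDeque<>(splits);
    }

    @Override
    public S getNextInputSplit() {
        return queue.pollFirst(); // null once the queue is empty
    }
}

final class SplitIterators {
    // Same idea as getInputSplits(): hasNext() asks the provider once and caches the answer;
    // a null answer marks the iterator as exhausted for good.
    static <S> Iterator<S> fromProvider(SplitProvider<S> provider) {
        return new Iterator<S>() {
            private S nextSplit;
            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (exhausted) {
                    return false;
                }
                if (nextSplit != null) {
                    return true;
                }
                S split = provider.getNextInputSplit();
                if (split != null) {
                    nextSplit = split;
                    return true;
                }
                exhausted = true;
                return false;
            }

            @Override
            public S next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                S tmp = nextSplit;
                nextSplit = null;
                return tmp;
            }
            // remove() is inherited from Iterator and throws UnsupportedOperationException.
        };
    }
}
```

Calling hasNext() repeatedly is safe because the cached split is only consumed by next(), so the provider is queried at most once per split.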
RpcInputSplitProvider
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
public class RpcInputSplitProvider implements InputSplitProvider {
private final JobMasterGateway jobMasterGateway;
private final JobVertexID jobVertexID;
private final ExecutionAttemptID executionAttemptID;
private final Time timeout;
public RpcInputSplitProvider(
JobMasterGateway jobMasterGateway,
JobVertexID jobVertexID,
ExecutionAttemptID executionAttemptID,
Time timeout) {
this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID);
this.timeout = Preconditions.checkNotNull(timeout);
}
@Override
public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
Preconditions.checkNotNull(userCodeClassLoader);
CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
jobVertexID,
executionAttemptID);
try {
SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit());
if (serializedInputSplit.isEmpty()) {
return null;
} else {
return InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), userCodeClassLoader);
}
} catch (Exception e) {
throw new InputSplitProviderException("Requesting the next input split failed.", e);
}
}
}
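- RpcInputSplitProvider calls jobMasterGateway.requestNextInputSplit to obtain a SerializedInputSplit, waits for it up to the configured timeout, returns null if it is empty, and otherwise deserializes it with the user-code class loader (in this example, the provider is an RpcInputSplitProvider).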
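The essential steps of RpcInputSplitProvider (wait on the RPC future with a timeout, treat an empty reply as the end of the splits, otherwise deserialize) can be reproduced with JDK classes alone. In this sketch, which is an illustration under stated assumptions rather than Flink's code, a CompletableFuture<byte[]> stands in for the CompletableFuture<SerializedInputSplit> returned by jobMasterGateway.requestNextInputSplit, and RpcSplitSketch and DemoSplit are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class RpcSplitSketch {
    // Hypothetical split type; real splits implement org.apache.flink.core.io.InputSplit.
    static final class DemoSplit implements Serializable {
        private static final long serialVersionUID = 1L;
        final int splitNumber;

        DemoSplit(int splitNumber) {
            this.splitNumber = splitNumber;
        }
    }

    // What the sending side would ship: the split serialized to bytes.
    static byte[] serialize(Serializable split) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(split);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Mirrors getNextInputSplit(): block on the future with a timeout, map an empty payload
    // to null ("no more splits"), otherwise deserialize. Plain ObjectInputStream stands in for
    // InstantiationUtil.deserializeObject and does NOT use a user-code class loader.
    static Object nextSplit(CompletableFuture<byte[]> futureSplit, long timeoutMillis) {
        try {
            byte[] payload = futureSplit.get(timeoutMillis, TimeUnit.MILLISECONDS);
            if (payload.length == 0) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
                return in.readObject();
            }
        } catch (Exception e) {
            // Flink wraps failures in InputSplitProviderException; a JDK exception is used here.
            throw new IllegalStateException("Requesting the next input split failed.", e);
        }
    }
}
```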
TaskInputSplitProvider
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
/**
* Implementation using {@link ActorGateway} to forward the messages.
*/
public class TaskInputSplitProvider implements InputSplitProvider {
private final ActorGateway jobManager;
private final JobID jobID;
private final JobVertexID vertexID;
private final ExecutionAttemptID executionID;
private final FiniteDuration timeout;
public TaskInputSplitProvider(
ActorGateway jobManager,
JobID jobID,
JobVertexID vertexID,
ExecutionAttemptID executionID,
FiniteDuration timeout) {
this.jobManager = Preconditions.checkNotNull(jobManager);
this.jobID = Preconditions.checkNotNull(jobID);
this.vertexID = Preconditions.checkNotNull(vertexID);
this.executionID = Preconditions.checkNotNull(executionID);
this.timeout = Preconditions.checkNotNull(timeout);
}
@Override
public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
Preconditions.checkNotNull(userCodeClassLoader);
final Future<Object> response = jobManager.ask(
new JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID),
timeout);
final Object result;
try {
result = Await.result(response, timeout);
} catch (Exception e) {
throw new InputSplitProviderException("Did not receive next input split from JobManager.", e);
}
if(result instanceof JobManagerMessages.NextInputSplit){
final JobManagerMessages.NextInputSplit nextInputSplit =
(JobManagerMessages.NextInputSplit) result;
byte[] serializedData = nextInputSplit.splitData();
if(serializedData == null) {
return null;
} else {
final Object deserialized;
try {
deserialized = InstantiationUtil.deserializeObject(serializedData,
userCodeClassLoader);
} catch (Exception e) {
throw new InputSplitProviderException("Could not deserialize the serialized input split.", e);
}
return (InputSplit) deserialized;
}
} else {
throw new InputSplitProviderException("RequestNextInputSplit requires a response of type " +
"NextInputSplit. Instead response is of type " + result.getClass() + '.');
}
}
}
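- TaskInputSplitProvider sends a JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID) message via jobManager.ask(..., timeout), expects a NextInputSplit reply, and deserializes its splitData; a null payload means there are no more splits, and any other reply type raises an InputSplitProviderException.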
InputSplit
flink-core-1.6.2-sources.jar!/org/apache/flink/core/io/InputSplit.java
/**
* This interface must be implemented by all kind of input splits that can be assigned to input formats.
*
* <p>Input splits are transferred in serialized form via the messages, so they need to be serializable
* as defined by {@link java.io.Serializable}.</p>
*/
@Public
public interface InputSplit extends Serializable {
/**
* Returns the number of this input split.
*
* @return the number of this input split
*/
int getSplitNumber();
}
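- InputSplit is the interface that every kind of input split must implement. It extends Serializable so that splits can be transferred in serialized form; getSplitNumber returns the number of the split.
- It has four implementations. Two of them implement the interface directly: GenericInputSplit and LocatableInputSplit.
- The other two are FileInputSplit, which extends LocatableInputSplit, and TimestampedFileInputSplit, which extends FileInputSplit.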
GenericInputSplit
flink-core-1.6.2-sources.jar!/org/apache/flink/core/io/GenericInputSplit.java
/**
* A generic input split that has only a partition number.
*/
@Public
public class GenericInputSplit implements InputSplit, java.io.Serializable {
private static final long serialVersionUID = 1L;
/** The number of this split. */
private final int partitionNumber;
/** The total number of partitions */
private final int totalNumberOfPartitions;
// --------------------------------------------------------------------------------------------
/**
* Creates a generic input split with the given split number.
*
* @param partitionNumber The number of the split's partition.
* @param totalNumberOfPartitions The total number of the splits (partitions).
*/
public GenericInputSplit(int partitionNumber, int totalNumberOfPartitions) {
this.partitionNumber = partitionNumber;
this.totalNumberOfPartitions = totalNumberOfPartitions;
}
//......
public String toString() {
return "GenericSplit (" + this.partitionNumber + '/' + this.totalNumberOfPartitions + ')';
}
}
- GenericInputSplit is simple: it has only two fields, partitionNumber and totalNumberOfPartitions (the InputSplit in this example is a GenericInputSplit).
LocatableInputSplit
flink-core-1.6.2-sources.jar!/org/apache/flink/core/io/LocatableInputSplit.java
/**
* A locatable input split is an input split referring to input data which is located on one or more hosts.
*/
@Public
public class LocatableInputSplit implements InputSplit, java.io.Serializable {
private static final long serialVersionUID = 1L;
private static final String[] EMPTY_ARR = new String[0];
/** The number of the split. */
private final int splitNumber;
/** The names of the hosts storing the data this input split refers to. */
private final String[] hostnames;
// --------------------------------------------------------------------------------------------
/**
* Creates a new locatable input split that refers to a multiple host as its data location.
*
* @param splitNumber The number of the split
* @param hostnames The names of the hosts storing the data this input split refers to.
*/
public LocatableInputSplit(int splitNumber, String[] hostnames) {
this.splitNumber = splitNumber;
this.hostnames = hostnames == null ? EMPTY_ARR : hostnames;
}
/**
* Creates a new locatable input split that refers to a single host as its data location.
*
* @param splitNumber The number of the split.
* @param hostname The names of the host storing the data this input split refers to.
*/
public LocatableInputSplit(int splitNumber, String hostname) {
this.splitNumber = splitNumber;
this.hostnames = hostname == null ? EMPTY_ARR : new String[] { hostname };
}
//......
@Override
public String toString() {
return "Locatable Split (" + splitNumber + ") at " + Arrays.toString(this.hostnames);
}
}
- LocatableInputSplit is an input split that knows where its data is located. It has two fields: splitNumber, and hostnames, the hosts that store the data this split refers to.
IteratorInputFormat
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/IteratorInputFormat.java
/**
* An input format that returns objects from an iterator.
*/
@PublicEvolving
public class IteratorInputFormat<T> extends GenericInputFormat<T> implements NonParallelInput {
private static final long serialVersionUID = 1L;
private Iterator<T> iterator; // input data as serializable iterator
public IteratorInputFormat(Iterator<T> iterator) {
if (!(iterator instanceof Serializable)) {
throw new IllegalArgumentException("The data source iterator must be serializable.");
}
this.iterator = iterator;
}
@Override
public boolean reachedEnd() {
return !this.iterator.hasNext();
}
@Override
public T nextRecord(T record) {
return this.iterator.next();
}
}
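- IteratorInputFormat is essentially a wrapper around an Iterator: it implements reachedEnd and nextRecord on top of it and requires the iterator to be Serializable. It extends GenericInputFormat and implements NonParallelInput.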
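The Serializable check is easy to trip over: iterators of standard JDK collections such as ArrayList do not implement Serializable, so passing list.iterator() to IteratorInputFormat fails right in the constructor. The hypothetical sketch below mirrors the format's logic without Flink types; IteratorFormatSketch, CountingIterator and readAll are illustrative names, and readAll only imitates how a source loop would drain the format.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical, Flink-free mirror of IteratorInputFormat's logic.
final class IteratorFormatSketch<T> {
    private final Iterator<T> iterator;

    IteratorFormatSketch(Iterator<T> iterator) {
        // Same check as IteratorInputFormat: the format, iterator included,
        // is serialized as part of the job, so the iterator must be Serializable.
        if (!(iterator instanceof Serializable)) {
            throw new IllegalArgumentException("The data source iterator must be serializable.");
        }
        this.iterator = iterator;
    }

    boolean reachedEnd() {
        return !iterator.hasNext();
    }

    T nextRecord(T reuse) {
        return iterator.next(); // the reuse object is ignored, as in IteratorInputFormat
    }

    // Drains the format the way a source loop would.
    List<T> readAll() {
        List<T> out = new ArrayList<>();
        T record = null;
        while (!reachedEnd()) {
            record = nextRecord(record);
            out.add(record);
        }
        return out;
    }
}

// Hypothetical serializable iterator yielding 0, 1, ..., limit - 1.
final class CountingIterator implements Iterator<Integer>, Serializable {
    private static final long serialVersionUID = 1L;
    private final int limit;
    private int current = 0;

    CountingIterator(int limit) {
        this.limit = limit;
    }

    @Override
    public boolean hasNext() {
        return current < limit;
    }

    @Override
    public Integer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current++;
    }
}
```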
GenericInputFormat
flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/GenericInputFormat.java
/**
* Generic base class for all Rich inputs that are not based on files.
*/
@Public
public abstract class GenericInputFormat<OT> extends RichInputFormat<OT, GenericInputSplit> {
private static final long serialVersionUID = 1L;
/**
* The partition of this split.
*/
protected int partitionNumber;
// --------------------------------------------------------------------------------------------
@Override
public void configure(Configuration parameters) {
// nothing by default
}
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
// no statistics available, by default.
return cachedStatistics;
}
@Override
public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
if (numSplits < 1) {
throw new IllegalArgumentException("Number of input splits has to be at least 1.");
}
numSplits = (this instanceof NonParallelInput) ? 1 : numSplits;
GenericInputSplit[] splits = new GenericInputSplit[numSplits];
for (int i = 0; i < splits.length; i++) {
splits[i] = new GenericInputSplit(i, numSplits);
}
return splits;
}
@Override
public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
return new DefaultInputSplitAssigner(splits);
}
// --------------------------------------------------------------------------------------------
@Override
public void open(GenericInputSplit split) throws IOException {
this.partitionNumber = split.getSplitNumber();
}
@Override
public void close() throws IOException {}
}
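- RpcInputSplitProvider calls JobMaster.requestNextInputSplit to obtain a SerializedInputSplit, and the JobMaster in turn calls splitAssigner.getNextInputSplit(host, taskId). Here the splitAssigner (obtained from vertex.getSplitAssigner()) is a DefaultInputSplitAssigner.
- The splitAssigner returned by vertex.getSplitAssigner() is created in the ExecutionJobVertex constructor from splitSource.getInputSplitAssigner(splitSource.createInputSplits(numTaskVertices)).
- The splitSource here is the IteratorInputFormat. Both its createInputSplits (which splits according to numTaskVertices) and its getInputSplitAssigner are inherited from the parent class GenericInputFormat. Since IteratorInputFormat implements NonParallelInput, createInputSplits always returns a single GenericInputSplit for it.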
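For this example the splitting logic boils down to a few lines. The sketch below is a hypothetical, Flink-free copy of GenericInputFormat.createInputSplits: the nonParallelInput flag stands in for the `this instanceof NonParallelInput` check, and the nested Split class stands in for GenericInputSplit, including its toString format.

```java
import java.util.ArrayList;
import java.util.List;

final class GenericSplitsSketch {
    // Stand-in for GenericInputSplit: just a partition number and the total count.
    static final class Split {
        final int partitionNumber;
        final int totalNumberOfPartitions;

        Split(int partitionNumber, int totalNumberOfPartitions) {
            this.partitionNumber = partitionNumber;
            this.totalNumberOfPartitions = totalNumberOfPartitions;
        }

        @Override
        public String toString() {
            return "GenericSplit (" + partitionNumber + '/' + totalNumberOfPartitions + ')';
        }
    }

    // Mirrors GenericInputFormat.createInputSplits(numSplits): one split per requested
    // partition, collapsed to a single split when the format is a NonParallelInput.
    static List<Split> createInputSplits(int numSplits, boolean nonParallelInput) {
        if (numSplits < 1) {
            throw new IllegalArgumentException("Number of input splits has to be at least 1.");
        }
        int count = nonParallelInput ? 1 : numSplits;
        List<Split> splits = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            splits.add(new Split(i, count));
        }
        return splits;
    }
}
```

Under this logic, even with numTaskVertices = 4 an IteratorInputFormat would yield only `[GenericSplit (0/1)]`, so at most one source instance receives any data.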
DefaultInputSplitAssigner
flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java
/**
* This is the default implementation of the {@link InputSplitAssigner} interface. The default input split assigner
* simply returns all input splits of an input vertex in the order they were originally computed.
*/
@Internal
public class DefaultInputSplitAssigner implements InputSplitAssigner {
/** The logging object used to report information and errors. */
private static final Logger LOG = LoggerFactory.getLogger(DefaultInputSplitAssigner.class);
/** The list of all splits */
private final List<InputSplit> splits = new ArrayList<InputSplit>();
public DefaultInputSplitAssigner(InputSplit[] splits) {
Collections.addAll(this.splits, splits);
}
public DefaultInputSplitAssigner(Collection<? extends InputSplit> splits) {
this.splits.addAll(splits);
}
@Override
public InputSplit getNextInputSplit(String host, int taskId) {
InputSplit next = null;
// keep the synchronized part short
synchronized (this.splits) {
if (this.splits.size() > 0) {
next = this.splits.remove(this.splits.size() - 1);
}
}
if (LOG.isDebugEnabled()) {
if (next == null) {
LOG.debug("No more input splits available");
} else {
LOG.debug("Assigning split " + next + " to " + host);
}
}
return next;
}
}
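- DefaultInputSplitAssigner simply hands out the InputSplits one at a time and does not take host or taskId into account (host is only used for debug logging). Note that although its Javadoc says splits are returned in the order they were computed, getNextInputSplit removes the last element of the list, so the splits are actually handed out in reverse order.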
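The hand-out order follows directly from remove(size() - 1). The hypothetical sketch below copies only that logic (no Flink types, no logging) so the order can be checked in isolation; DefaultAssignerSketch is an illustrative name.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical, Flink-free mirror of DefaultInputSplitAssigner.getNextInputSplit().
final class DefaultAssignerSketch<S> {
    private final List<S> splits = new ArrayList<>();

    DefaultAssignerSketch(Collection<? extends S> splits) {
        this.splits.addAll(splits);
    }

    // host and taskId are accepted but not used for the decision, as in the original.
    S getNextInputSplit(String host, int taskId) {
        synchronized (splits) {
            // Removing the LAST element means splits come out in reverse creation order;
            // null signals that every split has been handed out.
            return splits.isEmpty() ? null : splits.remove(splits.size() - 1);
        }
    }
}
```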
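Summary
- InputFormatSourceFunction is a SourceFunction that reads data using an InputFormat. It extends RichParallelSourceFunction and adds a two-argument constructor that takes an InputFormat and a TypeInformation.
- The IteratorInputFormat used in this example extends GenericInputFormat, which provides two key methods: createInputSplits (which splits the input according to numTaskVertices here) and getInputSplitAssigner (which creates a DefaultInputSplitAssigner that hands out the prepared InputSplits one at a time, starting from the last one).
- The run method of InputFormatSourceFunction calls splitIterator.next() split by split, opens each InputSplit with the InputFormat, reads its records one at a time via format.nextRecord, and emits them through SourceContext.collect.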
Overall, GenericInputFormat supplies the method that cuts the input into InputSplits as well as the InputSplitAssigner. InputFormatSourceFunction then walks through the InputSplits that belong to its own task (obtained through the InputSplitAssigner), reads every element of each InputSplit through the InputFormat, and emits it via SourceContext.collect.
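As a rough end-to-end illustration of that flow, the hypothetical sketch below strings the pieces together for several tasks without any Flink classes. Two simplifications are assumptions, not Flink behavior: record i is placed into split i % numTasks (GenericInputSplit itself carries only partition numbers, and the format decides what each partition reads), and tasks request splits in strict turns rather than concurrently, so the output is deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical end-to-end walk-through of the split flow, with no Flink classes.
final class EndToEndSketch {
    // Assumption for illustration: record i goes into split (i % numSplits).
    static List<List<String>> createInputSplits(List<String> input, int numSplits) {
        List<List<String>> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            splits.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            splits.get(i % numSplits).add(input.get(i));
        }
        return splits;
    }

    // Tasks take turns asking the shared pool for a split until it is empty,
    // then "emit" every record of the split they received.
    static Map<Integer, List<String>> run(List<String> input, int numTasks) {
        List<List<String>> pool = createInputSplits(input, numTasks);
        Map<Integer, List<String>> emitted = new LinkedHashMap<>();
        for (int t = 0; t < numTasks; t++) {
            emitted.put(t, new ArrayList<>());
        }
        int task = 0;
        while (!pool.isEmpty()) {
            // Like DefaultInputSplitAssigner: take the last remaining split.
            List<String> split = pool.remove(pool.size() - 1);
            emitted.get(task).addAll(split); // open + nextRecord + collect, collapsed into one step
            task = (task + 1) % numTasks;
        }
        return emitted;
    }
}
```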