
Flink DataSet API: Usage and Internals

As big data technology spreads across industries, the demand for real-time processing of massive data keeps growing, and the business logic behind that processing keeps getting more complex. Traditional batch processing and early stream processing frameworks increasingly struggle to meet ever stricter requirements on latency, throughput, fault tolerance, and ease of use.

Against this backdrop, Flink, a new stream processing framework, creatively applies modern massively parallel processing techniques to stream processing, greatly improving on the problems of earlier streaming frameworks.

 

1. Overview

Flink provides the DataSet API for processing batch data. Flink first converts the ingested data into a DataSet, distributed in parallel across the nodes of the cluster; it then applies transformations (map, filter, and so on) to the DataSet, and finally writes the result set to an external system through a DataSink operation, as sketched below.
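A minimal end-to-end sketch of that source → transform → sink pipeline (the file paths and job name are hypothetical):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DataSetPipeline {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Source: build a DataSet from external input (here, a text file).
        DataSet<String> lines = env.readTextFile("file:///tmp/input.txt");

        // Transformations: map and filter run in parallel across the cluster.
        DataSet<String> cleaned = lines
                .map(String::toUpperCase)
                .filter(line -> !line.isEmpty());

        // Sink: write the result DataSet back to an external system.
        cleaned.writeAsText("file:///tmp/output");

        // Sinks are lazy; execute() actually runs the job.
        env.execute("DataSet pipeline sketch");
    }
}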

 

2. Data Ingestion

Input: the InputFormat interface. Its Javadoc describes the contract:

/**
 * The base interface for data sources that produce records.
 * <p>
 * The input format handles the following:
 * <ul>
 *   <li>It describes how the input is split into splits that can be processed in parallel.</li>
 *   <li>It describes how to read records from the input split.</li>
 *   <li>It describes how to gather basic statistics from the input.</li> 
 * </ul>
 * <p>
 * The life cycle of an input format is the following:
 * <ol>
 *   <li>After being instantiated (parameterless), it is configured with a {@link Configuration} object. 
 *       Basic fields are read from the configuration, such as a file path, if the format describes
 *       files as input.</li>
 *   <li>Optionally: It is called by the compiler to produce basic statistics about the input.</li>
 *   <li>It is called to create the input splits.</li>
 *   <li>Each parallel input task creates an instance, configures it and opens it for a specific split.</li>
 *   <li>All records are read from the input</li>
 *   <li>The input format is closed</li>
 * </ol>
 * <p>
 * IMPORTANT NOTE: Input formats must be written such that an instance can be opened again after it was closed. That
 * is due to the fact that the input format is used for potentially multiple splits. After a split is done, the
 * format's close function is invoked and, if another split is available, the open function is invoked afterwards for
 * the next split.
 *  
 * @see InputSplit
 * @see BaseStatistics
 * 
 * @param <OT> The type of the produced records.
 * @param <T> The type of input split.
 */
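As a sketch of this lifecycle in use: a concrete InputFormat such as the built-in TextInputFormat is handed to the environment, which configures it, asks it for splits, and has each parallel task open, read and close them (the input path is hypothetical):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

public class InputFormatSource {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // createInput() wraps the format in a DataSource. At runtime Flink
        // creates the input splits, and each parallel task open()s, reads,
        // and close()s the splits assigned to it.
        TextInputFormat format = new TextInputFormat(new Path("file:///tmp/input.txt"));
        DataSet<String> lines = env.createInput(format);

        lines.print(); // print() triggers execution and collects the result
    }
}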

 

3. Data Transformation

DataSet: a collection of elements of the same type. A DataSet can be turned into other DataSets via transformations, for example:

DataSet#map(org.apache.flink.api.common.functions.MapFunction)
DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)
DataSet#join(DataSet)
DataSet#coGroup(DataSet)

Here, a Function holds the user-defined business logic; Java 8 lambda expressions are supported, as the sketch below shows.
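A minimal sketch of the two common ways to supply that logic: a Java 8 lambda, or a RichMapFunction when open()/close() or the runtime context is needed:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FunctionStyles {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> words = env.fromElements("flink", "dataset", "api");

        // 1) Lambda: the business logic inline.
        DataSet<Integer> lengths = words.map(String::length);

        // 2) RichMapFunction: same logic, plus access to open()/close()
        //    and the runtime context (e.g. broadcast variables).
        DataSet<Integer> lengthsRich = words.map(new RichMapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                return value.length();
            }
        });

        lengths.print();
        lengthsRich.print();
    }
}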

A Function is applied through an Operator. Taking map as an example:

    /**
     * Applies a Map transformation on this DataSet.
     *
     * <p>The transformation calls a {@link org.apache.flink.api.common.functions.MapFunction} for each element of the DataSet.
     * Each MapFunction call returns exactly one element.
     *
     * @param mapper The MapFunction that is called for each element of the DataSet.
     * @return A MapOperator that represents the transformed DataSet.
     *
     * @see org.apache.flink.api.common.functions.MapFunction
     * @see org.apache.flink.api.common.functions.RichMapFunction
     * @see MapOperator
     */
    public <R> MapOperator<T, R> map(MapFunction<T, R> mapper) {
        if (mapper == null) {
            throw new NullPointerException("Map function must not be null.");
        }

        String callLocation = Utils.getCallLocationName();
        TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, getType(), callLocation, true);
        return new MapOperator<>(this, resultType, clean(mapper), callLocation);
    }

The MapOperator returned here is one of Flink's Operators: operators are the nodes of the dataflow plan that a DataSet program is translated into before execution.

4. Data Output

DataSink: an operation that stores the resulting data.

Output: the OutputFormat interface.

 

For example, a DataSet can be written out as CSV:

    /**
     * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location with the specified field and line delimiters.
     *
     * <p><b>Note: Only a Tuple DataSet can be written as a CSV file.</b>
     * For each Tuple field the result of {@link Object#toString()} is written.
     *
     * @param filePath The path pointing to the location the CSV file is written to.
     * @param rowDelimiter The row delimiter to separate Tuples.
     * @param fieldDelimiter The field delimiter to separate Tuple fields.
     * @param writeMode The behavior regarding existing files. Options are NO_OVERWRITE and OVERWRITE.
     *
     * @see Tuple
     * @see CsvOutputFormat
     * @see DataSet#writeAsText(String) Output files and directories
     */
    public DataSink<T> writeAsCsv(String filePath, String rowDelimiter, String fieldDelimiter, WriteMode writeMode) {
        return internalWriteAsCsv(new Path(filePath), rowDelimiter, fieldDelimiter, writeMode);
    }

    @SuppressWarnings("unchecked")
    private <X extends Tuple> DataSink<T> internalWriteAsCsv(Path filePath, String rowDelimiter, String fieldDelimiter, WriteMode wm) {
        Preconditions.checkArgument(getType().isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples.");
        CsvOutputFormat<X> of = new CsvOutputFormat<>(filePath, rowDelimiter, fieldDelimiter);
        if (wm != null) {
            of.setWriteMode(wm);
        }
        return output((OutputFormat<T>) of);
    }
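A minimal usage sketch: writeAsCsv() accepts only tuple DataSets, and like all sinks it is lazy until execute() runs the job (the output path and sample data are hypothetical):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class CsvSink {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> counts = env.fromElements(
                Tuple2.of("flink", 1), Tuple2.of("dataset", 2));

        // Only tuple DataSets can be written as CSV; each field is
        // converted with toString(). OVERWRITE replaces existing files.
        counts.writeAsCsv("file:///tmp/counts.csv", "\n", ",", WriteMode.OVERWRITE);

        // Sinks are lazy; the job runs when execute() is called.
        env.execute("csv sink sketch");
    }
}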

5. Summary

1. Flink reads data from various sources through an InputFormat and turns it into a DataSet.

2. Flink offers a rich set of transformations: a DataSet can be transformed into other DataSets, implemented internally by Functions and Operators.

3. Flink turns a DataSet into a DataSink through an OutputFormat, finally writing the data to different storage systems.

 
