1. 程式人生 > >flink1.7自定義source實現

flink1.7自定義source實現

flink讀取source data

資料的來源是flink程式從中讀取輸入的地方。我們可以使用StreamExecutionEnvironment.addSource(sourceFunction)將源新增到程式中。
flink附帶大量預先實現好的各種讀取資料來源的函式,也可以通過為非並行源去實現SourceFunction介面或者為並行源實現ParallelSourceFunction介面或擴充套件RichParallelSourceFunction來編寫滿足自己業務需要的定製源。

flink預先實現好資料來源

下面有幾個預定義的流源可以從StreamExecutionEnvironment訪問

基於檔案

readTextFile(path): 讀取文字檔案,該檔案要符合TextInputFormat規範,逐行讀取並作為字串返回。
readFile(fileInputFormat,path): 根據指定的檔案輸入格式指定讀取檔案。
readFile(fileInputFormat,path,watchType,interval,pathFilter,typeInfo): 這是前兩個方法在內部呼叫的方法。它根據給定的fileInputFormat讀取路徑中的檔案。根據提供的watchType,該源可能會定期監視(每間隔ms)該路徑下來到的新資料(FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理當前路徑中的資料後並退出(FileProcessingMode.PROCESS_ONCE)。使用pathFilter,使用者可以進一步排除檔案的處理。

基於套接字

socketTextStream : 從套接字讀取。元素可以用分隔符分隔。

基於集合

fromCollection(Collection) : 從Java Java.util.Collection建立一個數據流。集合中的所有元素必須是相同的型別。
fromCollection(Iterator,Class) :從迭代器建立資料流。該類要指定迭代器返回的元素的資料型別。
fromElements(T ...) :根據給定的物件序列建立資料流。所有物件必須是相同的型別。
fromParallelCollection(SplittableIterator,Class) : 並行地從迭代器建立資料流。該類指定迭代器返回的元素的資料型別。
generateSequence(from,to) : 在給定的區間內並行生成數字序列 。

自定義資料原

package com.intsmaze.flink.streaming.source;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;

/**
* @Description: 自定義資料來源的模板
* @Author: intsmaze
* @Date: 2019/1/4
*/ 
public class CustomSource {

    private static final int BOUND = 100;

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<Integer, Integer>> inputStream= env.addSource(new RandomFibonacciSource());

        inputStream.map(new InputMap()).print();

        env.execute("Intsmaze Custom Source");
    }


    /**
    * @Description: 
    * @Author: intsmaze
    * @Date: 2019/1/5
    */ 
    private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        private Random rnd = new Random();

        private volatile boolean isRunning = true;
        private int counter = 0;

        /**
        * @Description: 
        * @Param: 
        * @return: 
        * @Author: intsmaze
        * @Date: 2019/1/5
        */ 
        @Override
        public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
            while (isRunning && counter < BOUND) {
                int first = rnd.nextInt(BOUND / 2 - 1) + 1;
                int second = rnd.nextInt(BOUND / 2 - 1) + 1;
                ctx.collect(new Tuple2<>(first, second));
                counter++;
                Thread.sleep(50L);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }


    /**
    * @Description: 
    * @Param: 
    * @return: 
    * @Author: intsmaze
    * @Date: 2019/1/5
    */ 
    public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer,
            Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws
                Exception {
            return new Tuple5<>(value.f0, value.f1, value.f0, value.f1, 0);
        }
    }

}