程式人生 > 其它 > Flink(2):Flink的Source源

Flink(2):Flink的Source源

相關文章連結

1、Flink的Source源之從集合中獲取資料

 1 //1.env
 2 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 3 
 4 //2.source
 5 // * 1.env.fromElements(可變引數);
 6 DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
 7 // * 2.env.fromColletion(各種集合);
 8 DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));
9 // * 3.env.generateSequence(開始,結束); 10 DataStream<Long> ds3 = env.generateSequence(1, 10); 11 12 //3.Transformation 13 14 //4.sink 15 ds1.print(); 16 ds2.print(); 17 ds3.print(); 18 19 //5.execute 20 env.execute();

2、Flink的Source源之從檔案中獲取資料

// 1. Create the Flink streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. source
// * 1. env.readTextFile(local file / HDFS file); compressed files work as well
// NOTE(review): absolute, machine-specific Windows path - adjust for your environment.
DataStream<String> ds1 = env.readTextFile("D:\\Project\\IDEA\\bigdata-study-tutorial\\flink-tutorial-java\\src\\main\\data\\input\\words.txt");
DataStream<String> ds2 = env.readTextFile("data/input/dir");
DataStream<String> ds3 = env.readTextFile("hdfs://node1:8020//wordcount/input/words.txt");
DataStream<String> ds4 = env.readTextFile("data/input/wordcount.txt.gz");

// 3. transformation (none in this example)

// 4. sink: print every line read by each source
ds1.print();
ds2.print();
ds3.print();
ds4.print();

// 5. execute
env.execute();

3、Flink之自定義Source源

/**
 * Runs the custom-source demo: one simulated sensor emitting every 1000 ms, printed to stdout.
 *
 * @param args unused
 * @throws Exception if the Flink job fails
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();
    // Runtime mode, parallelism and time characteristic are left at their defaults.

    // Attach the custom source (1 sensor, 1000 ms between rounds) and print each record it emits.
    environment.addSource(new SensorReadingSource(1, 1000L)).print();

    // Submit the job; the name identifies it in the Flink UI / logs.
    environment.execute("SourceDemo02_UDF");
}

/**
 * Bean class for a single temperature-sensor reading.
 *
 * <p>All members are generated by Lombok: {@code @Data} adds getters, setters, {@code equals},
 * {@code hashCode} and {@code toString}; {@code @AllArgsConstructor} adds a constructor taking
 * (id, timestamp, temperature) in field-declaration order; {@code @NoArgsConstructor} adds a
 * no-arg constructor. NOTE(review): the no-arg constructor plus getters/setters are presumably
 * what lets Flink treat this class as a POJO type - confirm against Flink's type extraction rules.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class SensorReading {
    // Sensor identifier; the custom source in this file names sensors "sensor_<n>".
    private String id;
    // Reading time; the custom source fills it from System.currentTimeMillis() (epoch millis).
    private Long timestamp;
    // Temperature value. NOTE(review): unit is not stated anywhere - TODO confirm.
    private Double temperature;
}

/**
 * Custom source that simulates a fixed set of temperature sensors.
 *
 * <p>{@link #run} creates {@code num} sensors named {@code sensor_1 .. sensor_<num>}, each with an
 * initial temperature of {@code 60 + N(0,1) * 10}. Then, until {@link #cancel} is called, it emits
 * one reading per sensor every {@code interval} milliseconds. Before each emission it adds a
 * standard-Gaussian step to that sensor's previous temperature (a random walk).
 *
 * <p>Every emitted {@link SensorReading} is a fresh instance. An already-collected record is never
 * mutated afterwards, so downstream operators are safe even when object reuse is enabled.
 */
public static class SensorReadingSource extends RichSourceFunction<SensorReading> {

    /** Sensor count used by the no-arg constructor. */
    private static final int DEFAULT_NUM = 5;

    /** Pause between rounds, in milliseconds, used by the no-arg constructor. */
    private static final long DEFAULT_INTERVAL_MS = 1000L;

    /**
     * Run flag. Flink calls {@link #cancel} from a different thread than {@link #run}. The field
     * must therefore be volatile, or the emitting loop is not guaranteed to see the update and may
     * never stop.
     */
    private volatile boolean running = true;

    /** Number of simulated sensors. */
    private final int num;

    /** Pause between two rounds of readings, in milliseconds (passed to Thread.sleep). */
    private final long interval;

    /** Creates a source with 5 sensors emitting a round of readings every 1000 ms. */
    public SensorReadingSource() {
        this(DEFAULT_NUM, DEFAULT_INTERVAL_MS);
    }

    /**
     * Creates a source with the given number of sensors and emission interval.
     *
     * @param num      number of sensors to simulate; must be non-null and non-negative
     * @param interval pause between two rounds of readings, in milliseconds; must be non-null
     *                 and non-negative
     * @throws NullPointerException     if {@code num} or {@code interval} is null
     * @throws IllegalArgumentException if {@code num} or {@code interval} is negative
     */
    public SensorReadingSource(Integer num, Long interval) {
        if (num == null) {
            throw new NullPointerException("num");
        }
        if (interval == null) {
            throw new NullPointerException("interval");
        }
        // Fail fast here instead of deep inside run() (a negative sleep would throw there).
        if (num < 0) {
            throw new IllegalArgumentException("num must be >= 0, got: " + num);
        }
        if (interval < 0) {
            throw new IllegalArgumentException("interval must be >= 0 ms, got: " + interval);
        }
        this.num = num;
        this.interval = interval;
    }

    @Override
    public void run(SourceContext<SensorReading> ctx) throws Exception {
        Random random = new Random();

        // Per-sensor state, indexed 0..num-1; index i is sensor "sensor_" + (i + 1).
        String[] ids = new String[num];
        double[] temperatures = new double[num];
        for (int i = 0; i < num; i++) {
            ids[i] = "sensor_" + (i + 1);
            temperatures[i] = 60 + random.nextGaussian() * 10;
        }

        while (running) {
            // All readings of one round share the same timestamp.
            long timestamp = System.currentTimeMillis();
            for (int i = 0; i < num; i++) {
                temperatures[i] += random.nextGaussian();
                // Emit a new object rather than mutating one that was already handed to Flink.
                ctx.collect(new SensorReading(ids[i], timestamp, temperatures[i]));
            }
            Thread.sleep(interval);
        }
    }

    /** Stops the emitting loop in {@link #run} after its current round. */
    @Override
    public void cancel() {
        running = false;
    }
}