1. 程式人生 > 其它 > Flink(1):Flink的基礎案例

Flink(1):Flink的基礎案例

相關文章連結

1、批處理的WordCount案例

// Create the batch (DataSet API) execution environment.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Source: a few in-memory lines of text.
DataSource<String> dataSource = env.fromElements("flink spark hadoop", "hadoop spark", "flink flink");

// Transform: split each line into words, pair every word with a count of 1,
// group by the word (tuple field 0) and sum the counts (tuple field 1).
AggregateOperator<Tuple2<String, Integer>> result = dataSource
    .flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String s, Collector<String> collector) throws Exception {
            // Split on runs of whitespace and skip empty tokens: a plain split(" ")
            // yields "" for leading or repeated spaces, which would be counted as a word.
            for (String field : s.split("\\s+")) {
                if (!field.isEmpty()) {
                    collector.collect(field);
                }
            }
        }
    })
    .map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String s) throws Exception {
            return Tuple2.of(s, 1);
        }
    })
    .groupBy(0)
    .sum(1);

// Sink: print the (word, count) pairs. In the DataSet API print() runs the job
// eagerly (per the Flink DataSet docs), which is why no env.execute() follows here.
result.print();

2、流處理的WordCount案例

// Create the DataStream execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// AUTOMATIC lets Flink pick BATCH or STREAMING execution from the boundedness of the
// sources; RuntimeExecutionMode.BATCH or RuntimeExecutionMode.STREAMING may be set
// explicitly instead.
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

// Source: lines of text read from a socket on localhost:9999.
DataStreamSource<String> lines = env.socketTextStream("localhost", 9999);

// Transform: split each line into words, pair every word with a count of 1,
// key the stream by the word (tuple field f0) and keep a running sum of field 1.
SingleOutputStreamOperator<Tuple2<String, Integer>> result = lines
    .flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String s, Collector<String> collector) throws Exception {
            // Split on runs of whitespace and skip empty tokens: socket input is free-form,
            // and split(" ") would emit "" for leading or repeated spaces.
            for (String word : s.split("\\s+")) {
                if (!word.isEmpty()) {
                    collector.collect(word);
                }
            }
        }
    })
    .map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String s) throws Exception {
            return Tuple2.of(s, 1);
        }
    })
    .keyBy(t -> t.f0)
    .sum(1);

// Sink: print each updated (word, count) pair.
result.print();

// Build the job graph from the operators above and run it.
env.execute();

3、流處理的基於Lambda表示式的WordCount案例

// Create the DataStream execution environment (default runtime mode).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Source: a single in-memory line of text.
DataStreamSource<String> dataStreamSource = env.fromElements("abc abc abc");

// Transform: the same WordCount pipeline written with lambdas. Java erases the
// generic output types of lambdas, so each operator's result type is supplied
// explicitly: returns(...) for flatMap, and a TypeInformation argument for map.
SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStreamSource
    .flatMap((String value, Collector<String> out) -> {
        // Split on runs of whitespace and drop empty tokens so that "" is never counted.
        Arrays.stream(value.split("\\s+"))
            .filter(word -> !word.isEmpty())
            .forEach(out::collect);
    }).returns(Types.STRING)
    .map((String value) -> Tuple2.of(value, 1),
        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
    .keyBy(t -> t.f0)
    .sum(1);

// Sink: print each updated (word, count) pair.
result.print();

// Run the job.
env.execute();