Flink(1):Flink的基礎案例
阿新 • • 發佈:2021-06-20
相關文章連結
1、批處理的WordCount案例
// 建立執行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 獲取資料 DataSource<String> dataSource = env.fromElements("flink spark hadoop", "hadoop spark", "flink flink"); // 轉換資料 AggregateOperator<Tuple2<String, Integer>> result = dataSource .flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String s, Collector<String> collector) throws Exception { for (String field : s.split(" ")) { collector.collect(field); } } }) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { return Tuple2.of(s, 1); } }) .groupBy(0) .sum(1); // 輸出資料 result.print();
2、流處理的WordCount案例
// 執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); //env.setRuntimeMode(RuntimeExecutionMode.BATCH); //env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // source資料來源 DataStreamSource<String> lines = env.socketTextStream("localhost", 9999); // 資料轉換 SingleOutputStreamOperator<Tuple2<String, Integer>> result = lines .flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String s, Collector<String> collector) throws Exception { for (String word : s.split(" ")) { collector.collect(word); } } }) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { return Tuple2.of(s, 1); } }) .keyBy(t -> t.f0) .sum(1); // sink result.print(); env.execute();
3、流處理的基於Lambda表示式的WordCount案例
// 執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 獲取資料 DataStreamSource<String> dataStreamSource = env.fromElements("abc abc abc"); // 資料轉換 SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStreamSource .flatMap((String value, Collector<String> out) -> { Arrays.stream(value.split(" ")).forEach(out::collect); }).returns(Types.STRING) .map((String value) -> Tuple2.of(value, 1), TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {} )) .keyBy(t -> t.f0) .sum(1); // 資料輸出 result.print(); // 執行程式 env.execute();