4. Implementing a Flink Stream Processing Example in Java
Create a package in the Flink project and add a new WordCount class to it:
package com.gong.stream;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // parse the command-line arguments (--input, --output)
        ParameterTool params = ParameterTool.fromArgs(args);

        // get the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> dataStream;
        if (params.has("input")) { // check whether an --input path was supplied
            dataStream = env.readTextFile(params.get("input"));
        } else {
            System.out.println("No input data supplied");
            return;
        }

        // split each line into words and count the occurrences of each word
        DataStream<Tuple2<String, Integer>> counts = dataStream
                .flatMap(new Tokenizer())
                .keyBy(0)
                .sum(1);

        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            counts.print();
        }

        env.execute("Streaming WordCount");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // lower-case the line and split on non-word characters
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}
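Since the arguments are read with ParameterTool, the job can be run either directly from the IDE by setting program arguments, or submitted to a cluster with the Flink CLI, for example flink run -c com.gong.stream.WordCount path/to/your-job.jar --input /path/to/words.txt --output /path/to/result.txt (the jar path here is just a placeholder for whatever your build produces). If --output is omitted, counts.print() writes the results to stdout (the TaskManager log when running on a cluster). Note that because this is a streaming rolling aggregation, an input line such as "hello flink hello" produces updated counts per record, e.g. (hello,1), (flink,1), (hello,2), rather than a single final total per word.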