1. 程式人生 > 實用技巧 > 4、Flink 流處理案例實現 - Java

4、Flink流處理案例實現-Java

在 Flink 專案裡建立一個包 `com.gong.stream`，並在其中新建一個 `WordCount` 類：

package com.gong.stream;

import java.util.Locale;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector; public class WordCount { public static void main(String[] args) throws Exception{ //解析命令列傳過來的引數args ParameterTool params=ParameterTool.fromArgs(args); //獲取一個flink的執行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataStream
<String> dataStream = null; if(params.has("input")) {//判斷引數是否帶有input dataStream=env.readTextFile(params.get("input")); }else { System.out.println("資料不存在"); } //資料統計單詞詞頻 DataStream<Tuple2<String,Integer>> counts= dataStream.flatMap(new
Tokenizer()) .keyBy(0) .sum(1); if(params.has("output")){ counts.writeAsText(params.get("output")); }else { counts.print(); } env.execute("Streaming wordcount "); } public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String,Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens =value.toLowerCase().split("\\W+"); for (String token:tokens){ out.collect(new Tuple2<>(token,1)); } } } }