
Implementing Flink WordCount with Java Lambda Expressions

This article walks through implementing Flink's WordCount with Java lambda expressions. The example code is explained in detail and should serve as a useful reference for study or work.

In this article we will implement Flink's word count in Java.

Code Development

Environment Setup

Add the Flink 1.9 dependencies to your POM:

<dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.9.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.9.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.7</version>
    </dependency>
  </dependencies>

Build the Flink streaming environment

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Custom source

The source below emits one line of text per second.

DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
      // volatile so that cancel(), called from another thread, is visible to run()
      private volatile boolean isCancel = false;
      private String[] words = {
          "important oracle jdk license update",
          "the oracle jdk license has changed for releases starting april 16 2019",
          "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
          "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
          "downloading and using this product an faq is available here ",
          "commercial license and support is available with a low cost java se subscription",
          "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
      };

      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        // emit one random line of text per second until cancelled
        while (!isCancel) {
          int randomIndex = RandomUtils.nextInt(0, words.length);
          ctx.collect(words[randomIndex]);
          Thread.sleep(1000);
        }
      }

      @Override
      public void cancel() {
        isCancel = true;
      }
    });

Word count

// 3. Word count
    // 3.1 Split each line of text into individual words
    SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
      // split the line on spaces and emit each word
      Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
      });
    }).returns(Types.STRING);

    // 3.2 Convert each word into a (word, 1) tuple
    SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

    // 3.3 Group by word
    KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

    // 3.4 Sum the counts for each word over 3-second tumbling windows
    SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
        .timeWindow(Time.seconds(3))
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

    resultDS.print();
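When the job runs (see the full program below), print() writes one record per word per window to standard output, prefixed with the index of the parallel subtask that produced it. The exact words and counts vary from run to run because the source picks lines at random, but the output looks roughly like this:

    2> (oracle,6)
    4> (license,4)
    1> (jdk,3)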

Reference code

public class WordCount {
  public static void main(String[] args) throws Exception {
    // 1. Build the Flink streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 2. Custom source - emits one random line of text per second
    DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
      private volatile boolean isCancel = false;
      private String[] words = {
          "important oracle jdk license update",
          "the oracle jdk license has changed for releases starting april 16 2019",
          "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
          "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
          "downloading and using this product an faq is available here ",
          "commercial license and support is available with a low cost java se subscription",
          "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
      };

      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        while (!isCancel) {
          int randomIndex = RandomUtils.nextInt(0, words.length);
          ctx.collect(words[randomIndex]);
          Thread.sleep(1000);
        }
      }

      @Override
      public void cancel() {
        isCancel = true;
      }
    });

    // 3. Word count
    // 3.1 Split each line of text into individual words
    SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
      Arrays.stream(line.split(" ")).forEach(ctx::collect);
    }).returns(Types.STRING);

    // 3.2 Convert each word into a (word, 1) tuple
    SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

    // 3.3 Group by word
    KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

    // 3.4 Sum the counts for each word over 3-second tumbling windows
    SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
        .timeWindow(Time.seconds(3))
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

    resultDS.print();

    env.execute("app");
  }
}

Flink's Support for Java Lambda Expressions

Flink supports lambda expressions for all operators of its Java API. However, whenever a lambda expression involves Java generics, the type information must be declared explicitly.
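By contrast, a lambda whose result type carries no generic parameters works without any extra hints. A minimal sketch reusing the wordLineDS stream defined above (the lengthDS name is ours, purely illustrative):

SingleOutputStreamOperator<Integer> lengthDS = wordLineDS
    // no returns(...) hint required here: the result type Integer is not
    // itself generic, so it survives type erasure and Flink can extract it
    .map(line -> line.length());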

Let's take another look at this piece of code from above:

SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
      // split the line into words
      Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
      });
    }).returns(Types.STRING);

The reason all the type information has to be spelled out here is that Flink cannot automatically infer the generic type carried by the Collector. Let's look at the source code of FlatMapFunction:

@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {

  /**
   * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
   * it into zero, one, or more elements.
   *
   * @param value The input value.
   * @param out The collector for returning result values.
   *
   * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
   *          to fail and may trigger recovery.
   */
  void flatMap(T value, Collector<O> out) throws Exception;
}

Notice that the second parameter of flatMap is a Collector<O>, a parameterized generic type. When the Java compiler compiles this code, it erases the generic type parameters, so the method signature effectively becomes:

void flatMap(T value, Collector out)
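Type erasure is a general property of Java generics, not something specific to Flink. As a minimal standalone illustration (a hypothetical demo class, not part of the WordCount job):

import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
  public static void main(String[] args) {
    List<String> strings = new ArrayList<>();
    List<Integer> numbers = new ArrayList<>();
    // After erasure both lists share the exact same runtime class:
    // the type parameters <String> and <Integer> are gone at runtime.
    System.out.println(strings.getClass() == numbers.getClass()); // prints true
  }
}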

In this situation, Flink cannot infer the type information automatically. If we do not provide the type information explicitly, the following error occurs:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
  In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
  An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
  Otherwise the type has to be specified explicitly using type information.

In such cases the type information must be given explicitly; otherwise the output is treated as type Object, which prevents Flink from serializing it correctly.

So we need to explicitly declare the parameter types of the lambda expression and explicitly specify the output type information through the returns method. The error message above also mentions a second workaround, sketched below: using an (anonymous) class instead of the lambda.
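Here is a minimal sketch of that alternative. Because an anonymous class preserves its generic parameters in the compiled class file, Flink can extract them and no returns call is needed:

SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public void flatMap(String line, Collector<String> out) throws Exception {
        // same splitting logic as the lambda version above
        for (String word : line.split(" ")) {
          out.collect(word);
        }
      }
    });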

Let's look at one more piece of code:

SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

Why does map also need the type specified?

Because map here returns a Tuple2, and Tuple2 carries generic parameters; that generic parameter information is likewise erased at compile time, so Flink cannot infer the type correctly and we again have to supply it via returns. An equivalent way to provide the same hint is sketched after this paragraph.
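Besides Types.TUPLE, the same information can be supplied with a TypeHint (org.apache.flink.api.common.typeinfo.TypeHint), which captures the generic type through an anonymous subclass. A minimal sketch, equivalent to the returns call above:

SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        // the anonymous TypeHint subclass preserves the full generic type
        .returns(new TypeHint<Tuple2<String, Integer>>() {});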

For more on Flink's support for Java lambda expressions, see the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html

That is all for this article. I hope it is helpful for your learning, and thank you for your support.