1. 程式人生 > 其它 >第一章 flink簡介與wordcount

第一章 flink簡介與wordcount

一、簡介

flink是德國和歐洲的一些大學聯合發起的專案,14年進入了Apache基金會,不到一年的時間,flink就成為了Apache的頂級專案,flink的理念是“Apache Flink是為分散式、高效能、隨時可用以及準確的流處理應用打造的開源流處理框架,用於對無界和有界資料流進行有狀態計算。”。

flink有幾個重要的特點:

  1. 事件驅動型

    應用是有狀態的,從事件流提取資料,並根據到來的事件觸發計算、狀態更新或者其他動作。以kafka為代表的訊息佇列幾乎都是事件驅動型應用。

與之不同的是SparkStreaming微批次,它將一批批資料看作連續的流,進行處理。

  1. 流與批的世界觀

    批處理的特點是有界、持久、大量,非常適合全套記錄才能完成的計算,一般用於離線計算。流處理的特點是無界、實時,無需針對整個資料集操作,而是對通過系統傳輸的每個資料項執行操作,一般用於實時統計。在 spark 的世界觀中,一切都是由批次組成的,離線資料是一個大批次,而實時資料是由一個一個無限的小批次組成的。 而在 flink 的世界觀中,一切都是由流組成的,離線資料是有界限的流,實時資料是一個沒有界限的流,這就是所謂的有界流和無界流。

  2. 分層API

最底層的抽象僅僅提供了有狀態流,它將通過過程函式被嵌入到DataStream API中。實際上使用者只需要使用核心API程式設計計科,比如DataStream API(有界或者無界流資料)以及DataSet API(有界資料集)。Table API是以表為中心的宣告式程式設計,其中表有可能會動態變化(在表達流資料時)。

目前Flink作為批處理還不是主流,不如Spark成熟,所以DataSet使用的並不是很多。所以主要是學習DataStream API 的使用。Flink Table API和Flink SQL也並不完善,很多大廠都在自己定製。

Flink的幾大模組主要如下:

  • Flink Table&Flink SQL

  • Flink Gelly圖計算

  • Flink CEP複雜事件處理

二、Wordcount

Wordcount在大資料中有點像Hello World,當我們輸出Hello World的時候,就說名成功了,同樣在大資料專案中如果成功的統計出了文字或者socket流中的單詞數量,也相當於成功運行了第一個專案。flink是一個流批一體的計算引擎,所以wordcount分為兩種,從文字或者其它儲存中讀取的批處理和從socket讀取的流處理wordcount。

專案使用maven進行專案管理,首先編寫pom檔案,新增下面兩項依賴。

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
  </dependency>
</dependencies>

新增依賴後發現在第二個依賴中有2.12的字尾,flink使用Java編寫,但是他依賴的底層使用的是akka進行通訊,akka使用Scala進行編寫,這裡的2.12是指Scala的版本。

Spark原先使用akka進行通訊,後來從Spark1.3.1版本開始,為了解決大塊資料(如Shuffle)的傳輸問題,Spark引入了Netty通訊框架,到了1.6.0版本,Netty完全取代了Akka,承擔Spark內部所有的RPC通訊以及資料流傳輸。

批處理WordCount

package flink;
​
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
​
/**
 * @Classname WordCount
 * @Date 2021/10/4 17:20
 * @Created by github.com/myxiaoxin
 */
public class WordCount {
  public static void main(String[] args) throws Exception {
    //建立執行環境,每一個ExecutionEnvironment都是一個flink任務
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    //從檔案
    String inputPath = "README.md";
    DataSet<String> inputDataset = env.readTextFile(inputPath);
​
    DataSet<Tuple2<String,Integer>> wordCountDataSet = inputDataset
         .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] words = s.split(" ");
            for(String word : words){
              collector.collect(new Tuple2<String,Integer>(word,1));
             }
           }
         })
         .groupBy(0)
         .sum(1);
    wordCountDataSet.print();
   }
}

流處理WordCount

package flink;
​
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
​
/**
 * @Classname StreamWordCount
 * @Date 2021/10/4 19:46
 * @Created by github.com/myxiaoxin
 */
public class StreamWordCount {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
​
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    String host = parameterTool.get("host");
    int port = parameterTool.getInt("port");
​
    DataStream<String> inputDStream = env.socketTextStream(host,port);
    DataStream<Tuple2<String,Integer>> wordCountDataStream = inputDStream
         .flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
          @Override
          public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] arr = s.split(" ");
            for (int i = 0; i < arr.length; i++) {
              collector.collect(new Tuple2<String,Integer>(arr[i],1));
             }
           }
         })
         .keyBy(0)
         .sum(1);
    wordCountDataStream.print().setParallelism(1);
    env.execute();
   }
}

測試——在Linux中通過netcat命令進行傳送測試。

nc -lk 7777