1. 程式人生 > >Flink DataStream API指南【翻譯】

Flink DataStream API指南【翻譯】

1.什麼是DataStream?

DataStream是Flink中可以在資料流的基礎上實現各種transformation操作的程式,(比如filtering,updating state,defining windows,aggregating)。這些資料流最初的來源可以有很多種,比如訊息佇列,socket流,檔案等,計算的結果通過sinks途徑返回,你也可以寫這些資料到一個檔案或者標準的輸出系統中(比如命令列控制檯)。Flink程式可以執行在一個檔案(本地測試),standalone模式或者嵌入其他程式等多種方式,Flink程式的執行發生在一個local JVM或者一個叢集的許多機器上。

在介紹DataStream API之前,我們先了解一下Flink程式的基本流程。

2.Flink Program

Flink程式像常規的程式一樣對資料集合進行轉換操作,每個程式由下面幾部分組成:

  1. 獲取一個執行環境
  2. 載入/建立初始化資料
  3. 指定對於資料的transformations操作
  4. 指定計算的輸出結果(列印或者輸出到檔案)
  5. 觸發程式執行

StreamExecutionEnvironment 是所有Flink程式的基礎,你可以使用StreamExecutionEnvironment類的靜態方法來獲取,有以下三個靜態方法:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)

典型地,當你依賴於文字實現一些計算時,你需要使用getExecutionEnvironment()來獲取;如果執行程式在IDE或者java程式中時可以建立一個本地環境變數來執行觸發程式;如果需要打包程式jar包執行,可以使用命令列提交,如下命令所示:

./bin/flink run ./examples/batch/WordCount.jar

可以通過執行環境來指定資料來源,有多種資料來源可以指定,比如命令列,CSV檔案讀取,使用訊息佇列消費等格式,比如讀取一個文字檔案的資料:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");

接著你可以使用DataStream的transformation方法來對資料進行操作,比如下面的例項,將源資料轉換為Integer格式資料:

DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

接著你可以將計算的結果列印在控制檯或者寫入某個檔案,比如下面的寫法:

writeAsText(String path)

print()

最後,我們需要使用執行環境變數Env呼叫execute方法來觸發Flink程式的執行。

下面通過一個簡單的WordCount程式來看看Flink程式的基本寫法與執行結果,如果你使用的IDE來編寫程式,需要匯入下面maven依賴,Flink版本使用的是最新版本1.6.1。

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.11</artifactId>
      <version>${flink.version}</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>${flink.version}</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>${flink.version}</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>${flink.version}</version>
      <scope>compile</scope>
    </dependency>

    
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>

程式使用Scala編寫(會scala的話java照著翻譯就行,很簡單),WordCount程式如下所示:

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

/**
  * 可以直接本地執行
  */
object WordCount {

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.createLocalEnvironment(1)

    //從本地讀取檔案
    val text = env.readTextFile("F:\\FlinkProject\\src\\main\\scala\\cn\\just\\shinelon\\data\\word.txt")

    //單詞統計
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)   //按照第一個欄位分組
      .sum(1)       //第二個欄位求和

    //輸出結果
    counts.print()

    //儲存結果到txt檔案
    counts.writeAsText("F:\\FlinkProject\\src\\main\\scala\\cn\\just\\shinelon\\print\\wordcount.txt", WriteMode.OVERWRITE)
    env.execute("Scala WordCount Example")

  }
}

程式執行結果如下所示:
在這裡插入圖片描述

3.資料來源

3.1文字檔案

你可以繫結一個數據源到你的程式使用StreamExecutionEnvironment.addSource(sourceFunction)。
下面有幾種資料來源的輸入格式API可以使用StreamExecutionEnvironment來呼叫:

readTextFile(path):讀取一個文字檔案,輸入檔案可以指定輸入格式
readFile(fileInputFormat, path):從指定目錄讀取檔案
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo):這個方法在指定檔案格式的基礎上讀取檔案,依賴於watchType,它可以通過執行interval(ms為單位)愛週期性的監控目錄下產生的新的資料。主要有兩種檔案處理模式:(FileProcessingMode.PROCESS_CONTINUOUSLY)模式和(FileProcessingMode.PROCESS_ONCE)

注意:

  1. 如果watchType設定為FileProcessingMode.PROCESS_CONTINUOUSLY,當一個檔案被修改,它的整個文字內容將會全部被重新執行,這樣會破壞"exactly-once"語義(不懂的可以查一下,也可以參考我的手記【Spark Streaming容錯性分析】)。當在檔案末尾增加新的資料時整個檔案都將會被重新處理一次。
  2. 如果watchType設定為FileProcessingMode.PROCESS_ONCE,它將會掃描已經存在在路徑中的檔案,不會等等待reader區完成讀取檔案內容,不過reader會一直讀取資料直到讀完整個檔案。
3.2 socket流資料
socketTextStream :從socket讀取資料,元素之間按照指定分隔符分割
3.3 集合資料

主要有以下API:

fromCollection(Collection):建立一個數據流從Java.util.Collection,集合中的所有元素必須指定相同的型別
fromCollection(Iterator, Class):使用iterator建立一個數據流,通過class指定iterator元素的資料型別
fromElements(T ...):使用物件佇列來建立資料流,所有的物件必須有相同的型別。
fromParallelCollection(SplittableIterator, Class) :使用並行方式的iterator
generateSequence(from, to) :生成一個指定範圍的數字佇列
3.4 Consumer
addSource :繫結一個kafka Consumer,比如addSource(new FlinkKafkaConsumer08<>(...))

4. DataStream Transformations

詳情可以檢視文件transformation operations

5. Data Sink

主要有以下API:

writeAsText() / TextOutputFormat:寫入一個文字檔案
writeAsCsv(...) / CsvOutputFormat:寫入一個CSV格式檔案
print() / printToErr():輸出列印到控制檯
writeUsingOutputFormat() / FileOutputFormat :支援物件到位元組的轉換
writeToSocket:寫一個元素到socket根據SerializationSchema
addSink:呼叫一個custom sink方法,也可以繫結其他系統,比如kafka

6. Iteratoins

迭代流程式實現了一個步進函式並且嵌入它到一個IterativeStream中,一個DataStream程式也許永遠也不會執行完畢,它沒有迭代的最大次數,不過,你必須使用一個split transformation或者filter來反饋到iteration函式,可以通過下面方式建立使用:

IterativeStream<Integer> iteration = input.iterate();

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

下面通過一個簡單的案例來說明:

import org.apache.flink.streaming.api.scala._

/**
  * 測試迭代流程式
  * 0-1000的每一個整數迭代減1
  */
object SubtractIterator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val someIntegers = env.generateSequence(0,1000)

    val iteratedStream = someIntegers.iterate(iteration =>{
      //map transformation
      val minusOne = iteration.map(x=>x-1)
      //filter
      val stillGreaterThanZero = minusOne.filter (_ > 0)
      val lessThanZero = minusOne.filter(_ <= 0)
      (stillGreaterThanZero, lessThanZero)
    })

    iteratedStream.print()

    env.execute("SubtractIterator")

  }

}

7.執行引數

StreamExecutionEnvironment 的方法ExecutionConfig 允許設定程式執行時引數。比如:

setAutoWatermarkInterval(long milliseconds):設定發射流自動標記間隔,也可以獲取當前值使用方法long getAutoWatermarkInterval()

8.容錯機制

Flink可以使用state&Checkpoint機制進行容錯,可以參考文件State&CheckPoint
不過還有一些隱患需要注意,預設的,元素不能在網路之間一個一個的進行傳輸,但是它可以通過buffer的形式,buffer的大小可以通過Flink的配置檔案設定,我們可以設定最佳的資料吞吐量,但是它也有可能會造成一個隱患是輸入流不是足夠快,為了協調吞吐量和這個隱患,我們可以使用env.setBufferTimeout(timeoutMillis) 來設定一個最大等待時間來等待buffer填滿資料,如果等待超時,buffer會自動傳送事件即使buffer還沒有填滿,預設的超時時間是100ms。例如:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

為了設定最大的吞吐量,你可以設定setBufferTimeout(-1)來取消超時等待只有當buffer填滿了才傳送,當然你也可以設定為0來避免上述隱患,加快輸入流的速度,不過這樣會導致效能嚴重下降。

以上是參考官方文件的翻譯文章,本人水平有限,如有不足之處還請指教。
官方文件 Flink DataStream API Programming Guide