Flink DataStream API指南【翻譯】
1.什麼是DataStream?
DataStream是Flink中可以在資料流的基礎上實現各種transformation操作的程式,(比如filtering,updating state,defining windows,aggregating)。這些資料流最初的來源可以有很多種,比如訊息佇列,socket流,檔案等,計算的結果通過sinks途徑返回,你也可以寫這些資料到一個檔案或者標準的輸出系統中(比如命令列控制檯)。Flink程式可以執行在一個檔案(本地測試),standalone模式或者嵌入其他程式等多種方式,Flink程式的執行發生在一個local JVM或者一個叢集的許多機器上。
在介紹DataStream API之前,我們先了解一下Flink程式的基本流程。
2.Flink Program
Flink程式像常規的程式一樣對資料集合進行轉換操作,每個程式由下面幾部分組成:
- 獲取一個執行環境
- 載入/建立初始化資料
- 指定對於資料的transformations操作
- 指定計算的輸出結果(列印或者輸出到檔案)
- 觸發程式執行
StreamExecutionEnvironment 是所有Flink程式的基礎,你可以使用StreamExecutionEnvironment類的靜態方法來獲取,有以下三個靜態方法:
getExecutionEnvironment() createLocalEnvironment() createRemoteEnvironment(String host, int port, String... jarFiles)
典型地,當你依賴於文字實現一些計算時,你需要使用getExecutionEnvironment()來獲取;如果執行程式在IDE或者java程式中時可以建立一個本地環境變數來執行觸發程式;如果需要打包程式jar包執行,可以使用命令列提交,如下命令所示:
./bin/flink run ./examples/batch/WordCount.jar
可以通過執行環境來指定資料來源,有多種資料來源可以指定,比如命令列,CSV檔案讀取,使用訊息佇列消費等格式,比如讀取一個文字檔案的資料:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.readTextFile("file:///path/to/file");
接著你可以使用DataStream的transformation方法來對資料進行操作,比如下面的例項,將源資料轉換為Integer格式資料:
DataStream<String> input = ...;
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return Integer.parseInt(value);
}
});
接著你可以將計算的結果列印在控制檯或者寫入某個檔案,比如下面的寫法:
writeAsText(String path)
print()
最後,我們需要使用執行環境變數Env呼叫execute方法來觸發Flink程式的執行。
下面通過一個簡單的WordCount程式來看看Flink程式的基本寫法與執行結果,如果你使用的IDE來編寫程式,需要匯入下面maven依賴,Flink版本使用的是最新版本1.6.1。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
程式使用Scala編寫(會scala的話java照著翻譯就行,很簡單),WordCount程式如下所示:
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
/**
* 可以直接本地執行
*/
object WordCount {
def main(args: Array[String]) {
val env = ExecutionEnvironment.createLocalEnvironment(1)
//從本地讀取檔案
val text = env.readTextFile("F:\\FlinkProject\\src\\main\\scala\\cn\\just\\shinelon\\data\\word.txt")
//單詞統計
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0) //按照第一個欄位分組
.sum(1) //第二個欄位求和
//輸出結果
counts.print()
//儲存結果到txt檔案
counts.writeAsText("F:\\FlinkProject\\src\\main\\scala\\cn\\just\\shinelon\\print\\wordcount.txt", WriteMode.OVERWRITE)
env.execute("Scala WordCount Example")
}
}
程式執行結果如下所示:
3.資料來源
3.1文字檔案
你可以繫結一個數據源到你的程式使用StreamExecutionEnvironment.addSource(sourceFunction)。
下面有幾種資料來源的輸入格式API可以使用StreamExecutionEnvironment來呼叫:
readTextFile(path):讀取一個文字檔案,輸入檔案可以指定輸入格式
readFile(fileInputFormat, path):從指定目錄讀取檔案
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo):這個方法在指定檔案格式的基礎上讀取檔案,依賴於watchType,它可以通過執行interval(ms為單位)愛週期性的監控目錄下產生的新的資料。主要有兩種檔案處理模式:(FileProcessingMode.PROCESS_CONTINUOUSLY)模式和(FileProcessingMode.PROCESS_ONCE)
注意:
- 如果watchType設定為FileProcessingMode.PROCESS_CONTINUOUSLY,當一個檔案被修改,它的整個文字內容將會全部被重新執行,這樣會破壞"exactly-once"語義(不懂的可以查一下,也可以參考我的手記【Spark Streaming容錯性分析】)。當在檔案末尾增加新的資料時整個檔案都將會被重新處理一次。
- 如果watchType設定為FileProcessingMode.PROCESS_ONCE,它將會掃描已經存在在路徑中的檔案,不會等等待reader區完成讀取檔案內容,不過reader會一直讀取資料直到讀完整個檔案。
3.2 socket流資料
socketTextStream :從socket讀取資料,元素之間按照指定分隔符分割
3.3 集合資料
主要有以下API:
fromCollection(Collection):建立一個數據流從Java.util.Collection,集合中的所有元素必須指定相同的型別
fromCollection(Iterator, Class):使用iterator建立一個數據流,通過class指定iterator元素的資料型別
fromElements(T ...):使用物件佇列來建立資料流,所有的物件必須有相同的型別。
fromParallelCollection(SplittableIterator, Class) :使用並行方式的iterator
generateSequence(from, to) :生成一個指定範圍的數字佇列
3.4 Consumer
addSource :繫結一個kafka Consumer,比如addSource(new FlinkKafkaConsumer08<>(...))
4. DataStream Transformations
詳情可以檢視文件transformation operations。
5. Data Sink
主要有以下API:
writeAsText() / TextOutputFormat:寫入一個文字檔案
writeAsCsv(...) / CsvOutputFormat:寫入一個CSV格式檔案
print() / printToErr():輸出列印到控制檯
writeUsingOutputFormat() / FileOutputFormat :支援物件到位元組的轉換
writeToSocket:寫一個元素到socket根據SerializationSchema
addSink:呼叫一個custom sink方法,也可以繫結其他系統,比如kafka
6. Iteratoins
迭代流程式實現了一個步進函式並且嵌入它到一個IterativeStream中,一個DataStream程式也許永遠也不會執行完畢,它沒有迭代的最大次數,不過,你必須使用一個split transformation或者filter來反饋到iteration函式,可以通過下面方式建立使用:
IterativeStream<Integer> iteration = input.iterate();
DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);
下面通過一個簡單的案例來說明:
import org.apache.flink.streaming.api.scala._
/**
* 測試迭代流程式
* 0-1000的每一個整數迭代減1
*/
object SubtractIterator {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val someIntegers = env.generateSequence(0,1000)
val iteratedStream = someIntegers.iterate(iteration =>{
//map transformation
val minusOne = iteration.map(x=>x-1)
//filter
val stillGreaterThanZero = minusOne.filter (_ > 0)
val lessThanZero = minusOne.filter(_ <= 0)
(stillGreaterThanZero, lessThanZero)
})
iteratedStream.print()
env.execute("SubtractIterator")
}
}
7.執行引數
StreamExecutionEnvironment 的方法ExecutionConfig 允許設定程式執行時引數。比如:
setAutoWatermarkInterval(long milliseconds):設定發射流自動標記間隔,也可以獲取當前值使用方法long getAutoWatermarkInterval()
8.容錯機制
Flink可以使用state&Checkpoint機制進行容錯,可以參考文件State&CheckPoint
不過還有一些隱患需要注意,預設的,元素不能在網路之間一個一個的進行傳輸,但是它可以通過buffer的形式,buffer的大小可以通過Flink的配置檔案設定,我們可以設定最佳的資料吞吐量,但是它也有可能會造成一個隱患是輸入流不是足夠快,為了協調吞吐量和這個隱患,我們可以使用env.setBufferTimeout(timeoutMillis) 來設定一個最大等待時間來等待buffer填滿資料,如果等待超時,buffer會自動傳送事件即使buffer還沒有填滿,預設的超時時間是100ms。例如:
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
為了設定最大的吞吐量,你可以設定setBufferTimeout(-1)來取消超時等待只有當buffer填滿了才傳送,當然你也可以設定為0來避免上述隱患,加快輸入流的速度,不過這樣會導致效能嚴重下降。
以上是參考官方文件的翻譯文章,本人水平有限,如有不足之處還請指教。
官方文件 Flink DataStream API Programming Guide