1. 程式人生 > >Flink Basic API Concepts 學習筆記&譯文

Flink Basic API Concepts 學習筆記&譯文

地址:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/api_concepts.html#

Flink也是常規的分散式程式,在分散式資料集上實現了transformation運算元,例如: filtering, mapping, updating state, joining, grouping, defining windows, aggregating。資料集初始化一般是讀取檔案、Kafka、或者記憶體資料等,結果一般可以寫到檔案或者終端列印。Flink支援Standalone或者執行在其他框架之上,不同的執行模式下Flink執行在不同的Context中,Flink程式可以執行在本地或者是叢集中!

根據資料來源的型別,Flink可以開發批處理程式或者Streaming程式,批處理程式使用DataSet API,Streaming程式使用

Note:正如前面的example,Streaming程式使用的是StreamingExecutionEnvironment 和DataStream API,批處理使用的是ExecutionEnvironment 和DataSet API。

DataSet and DataStream

Flink使用DataSetDataStream在程式中代表資料,你可以認為他們是可以包含重複資料的不可變的資料集合。DataSet是有限的資料,DataStream是無邊界的資料!

資料集和Java collection區別:首先,資料集一旦建立了就無法再add或者remove元素了,同時也無法訪問裡面的元素(應該指的是不可以想Java集合一樣get某一個元素)!

Flink程式一旦建立了資料集,可以根據現有的資料集使用運算元生成新的資料集!

Anatomy of a Flink Program
Flink程式也是一個比較有規律的程式,就是在資料集上執行運算元。每一個程式都含有相同的基礎概念: 1:獲取一個執行環境(execution environment) 2:載入或者建立資料集 3:在資料集上使用運算元 4:指定計算結果的輸出方式和位置 5:觸發程式執行(Spark中是由Action觸發,Flink需要呼叫execute觸發執行
) 第一步,獲取執行環境: StreamExecutionEnvironment是所有Flink Streaming程式的基礎(此處官方文件說的是StreamExecutionEnvironment是所有Flink程式的基礎,這塊錯誤了,對於批處理應該使用ExecutionEnvironment,你可以使用靜態方法獲取一個StreamExecutionEnvironment成員:
getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(host: String, port: Int, jarFiles: String*)

getExecutionEnvironment方法或根據你的執行環境自動建立一個ExecutionEnvironment,如果你是在IDE中執行你的程式,getExecutionEnvironment方法方法會返回一個 Local environment,如果你對程式打成Jar包之後使用命令列提交到叢集執行的話,getExecutionEnvironment方法將會返回一個Cluster environment。

第二步,載入資料集:

可以載入Text或者指定Format載入指定格式資料。

al env = StreamExecutionEnvironment.getExecutionEnvironment()

val text: DataStream[String] = env.readTextFile("file:///path/to/file")
第三步,在資料集上使用運算元:
val input: DataSet[String] = ...

val mapped = input.map { x => x.toInt }


第四步,輸出:

writeAsText(path: String)

print()


第五步,觸發作業執行: 呼叫execute()觸發作業執行

 Lazy Evaluation

所有的Flink程式都是延遲執行的,當程式的main方法被執行時候,所有載入資料和transformations 運算元都沒有開始執行,而是每一個operation 將會建立並且新增到程式Plan中(類似於Spark的Stage)。只有當你明確的呼叫execute()方法時候,程式才會真正執行!

延遲載入好處:你可以開發複雜的程式,但是Flink可以可以將複雜的程式轉成一個Plan,將Plan作為一個整體單元執行!

Specifying Keys

有些transformations (join, coGroup, keyBy, groupBy) 需要資料集有key,其他的一些transformations (Reduce, GroupReduce, Aggregate, Windows) 可以根據key進行分組:

A DataSet is grouped as

DataSet<...> input = // [...]
DataSet<...> reduced = input
  .groupBy(/*define key here*/)
  .reduceGroup(/*do something*/);
while a key can be specified on a DataStream using

DataStream<...> input = // [...]
DataStream<...> windowed = input
  .keyBy(/*define key here*/)
  .window(/*window specification*/);


Flink的資料模型不是基於Key/Value的,因此你不必特意將資料集組成Key/Value形式,Flink中的Key是根據實際資料由函式定義的!

如下討論我們將基於DataStream API 和keyBy,對於DataSet API使用的是DataSet 和 groupBy.

為元組定義Key:

1:單個屬性為Key

val input: DataStream[(Int, String, Long)] = // [...]
val keyed = input.keyBy(0)//元組第一個屬性被定義為Key

這樣的話,元組的分組可以根據第一個屬性進行分組!

2:多個屬性定義組合Key

val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)

這樣的話,元組的分組可以根據第一個、第二個屬性進行分組!(比Spark運算元強大)

使用欄位表示式定義Key:

可以使用String型別的欄位為 grouping, sorting, joining, or coGrouping運算元定義Key ,例如:

我們有一個wc的POJO,它有兩個欄位:word、count。為了根據word欄位進行分許,我們可以傳遞名字給keyBy定義Key:

// some ordinary POJO (Plain old Java Object)
class WC(var word: String, var count: Int) {
  def this() { this("", 0L) }
}
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)

// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
欄位表示式語法:

Field Expression Syntax:

  • Select POJO fields by their field name. For example "user" refers to the “user” field of a POJO type.

  • Select Tuple fields by their 1-offset field name or 0-offset field index. For example "_1" and "5" refer to the first and sixth field of a Scala Tuple type, respectively.

  • You can select nested fields in POJOs and Tuples. For example "user.zip" refers to the “zip” field of a POJO which is stored in the “user” field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported such as "_2.user.zip" or "user._4.1.zip".

  • You can select the full type using the "_" wildcard expressions. This does also work for types which are not Tuple or POJO types.


Field Expression Example:

class WC(var complex: ComplexNestedClass, var count: Int) {
  def this() { this(null, 0) }
}

class ComplexNestedClass(
    var someNumber: Int,
    someFloat: Float,
    word: (Long, Long, String),
    hadoopCitizen: IntWritable) {
  def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}

These are valid field expressions for the example code above:

  • "count": The count field in the WC class.

  • "complex": Recursively selects all fields of the field complex of POJO type ComplexNestedClass.

  • "complex.word._3": Selects the last field of the nested Tuple3.

  • "complex.hadoopCitizen": Selects the Hadoop IntWritable type.


Define keys using Key Selector Functions(使用Key Selector函式)

// some ordinary case class
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )


Specifying Transformation Functions(為Transformation指定函式)


Lambda Functions


val data: DataSet[String] = // [...]
data.filter { _.startsWith("http://") }
val data: DataSet[Int] = // [...]
data.reduce { (i1,i2) => i1 + i2 }
// or
data.reduce { _ + _ }

Rich functions


data.map { x => x.toInt }

you can write

class MyMapFunction extends RichMapFunction[String, Int] {
  def map(in: String):Int = { in.toInt }
})

and pass the function to a map transformation:

data.map(new MyMapFunction())

Rich functions can also be defined as an anonymous class:

data.map (new RichMapFunction[String, Int] {
  def map(in: String):Int = { in.toInt }
})

Supported Data Types

參官方文件

Accumulators & Counters(加速器&計數器)

Accumulators are simple constructs with an add operation and a final accumulated result, which is available after the job ended.

The most straightforward accumulator is a counter: You can increment it using the Accumulator.add(V value) method. At the end of the job Flink will sum up (merge) all partial results and send the result to the client. Accumulators are useful during debugging or if you quickly want to find out more about your data.


這個和Spark中的加速器類似,Accumulator是父介面,Counter是其一個簡單實現!

Flink目前內建的加速器有如下這些,這些加速器都實現了ccumulator介面。

  • Histogram: A histogram(直方圖) implementation for a discrete(離散值) number of bins. Internally it is just a map from Integer to Integer. You can use this to compute distributions of values, e.g. the distribution of words-per-line for a word count program.(可以用來分散式統計每一行單詞數)
如何使用加速器:
1:建立加速器
private IntCounter numLines = new IntCounter();
2:註冊加速器
getRuntimeContext().addAccumulator("num-lines", this.numLines);
3:執行add操作
this.numLines.add(1);
4:獲取結果,最後使用由execute方法返回的JobExecutionResult 物件獲取結果:  val myJobExecutionResult= env.execute()
myJobExecutionResult.getAccumulatorResult("num-lines")


最後附上一張Flink的結構圖:


Flink程式執行流程: