Flink Basic API Concepts: Study Notes & Translation
Source: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/api_concepts.html#
Flink programs are regular distributed programs that apply transformations to distributed collections of data, e.g. filtering, mapping, updating state, joining, grouping, defining windows, and aggregating. Collections are initially created from sources such as files, Kafka topics, or in-memory data, and results are typically written to files or printed to the terminal. Flink can run standalone or on top of other frameworks; depending on the execution mode it runs in a different context, and Flink programs can execute locally or on a cluster.
Depending on the type of data source, you write either batch programs or streaming programs with Flink: batch programs use the DataSet API, streaming programs use the DataStream API.
Note: as in the examples above, streaming programs use StreamExecutionEnvironment and the DataStream API, while batch programs use ExecutionEnvironment and the DataSet API.
Flink represents data in a program with DataSet and DataStream. You can think of them as immutable collections of data that may contain duplicates; a DataSet is finite, while a DataStream is unbounded.
How they differ from Java collections: once created, you can no longer add or remove elements, nor can you access individual elements (i.e. you cannot get a specific element the way you would with a Java collection).
Once a Flink program has created an initial collection, it derives new collections from existing ones by applying transformations.
Flink programs follow a regular structure: they apply transformations to collections of data. Each program consists of the same basic parts:
1: Obtain an execution environment
2: Load or create the initial data
3: Apply transformations to the data
4: Specify where and how to write the results
5: Trigger program execution (in Spark this is done by an action; in Flink you must call execute())
The first step is to obtain an execution environment, using one of these methods:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
The getExecutionEnvironment method automatically creates the right ExecutionEnvironment for your context: if you run your program inside an IDE, it returns a local environment; if you package your program into a JAR and submit it to a cluster from the command line, it returns a cluster environment.
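A minimal sketch of the three options above (the host, port, and JAR path passed to createRemoteEnvironment are placeholders):
import org.apache.flink.api.scala._

// In an IDE this returns a local environment; when submitted to a cluster it returns the cluster environment.
val env = ExecutionEnvironment.getExecutionEnvironment
// Explicitly create a local environment, e.g. for tests.
val localEnv = ExecutionEnvironment.createLocalEnvironment()
// Connect to a running remote cluster; host, port and JAR path below are placeholders.
val remoteEnv = ExecutionEnvironment.createRemoteEnvironment("jobmanager-host", 6123, "/path/to/your-job.jar")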
The second step is to load the data:
you can read plain text files or use an input format to load data in a specific format.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text: DataStream[String] = env.readTextFile("file:///path/to/file")
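Besides readTextFile, a couple of other common sources, as a brief sketch (the host and port for the socket source are placeholders):
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// In-memory test data.
val numbers: DataStream[Int] = env.fromElements(1, 2, 3)
// Text lines read from a TCP socket.
val lines: DataStream[String] = env.socketTextStream("localhost", 9999)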
The third step is to apply transformations to the data:
val input: DataSet[String] = ...
val mapped = input.map { x => x.toInt }
The fourth step is to specify the output:
writeAsText(path: String)
print()
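For example, applied to the mapped DataSet from step 3 (the output path is a placeholder):
// Print the elements to standard output.
mapped.print()
// Write one element per line as text to the given path.
mapped.writeAsText("file:///path/to/output")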
The fifth step is to trigger job execution:
call execute() to run the job.
All Flink programs are executed lazily: when the program's main method runs, the data loading and the transformations are not executed right away; instead, each operation is created and added to the program's plan (similar to a stage in Spark). The operations are only executed once execute() is explicitly called.
The benefit of lazy evaluation: you can build complex programs, and Flink turns them into a single plan that it executes as one holistically planned unit.
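Putting the five steps together, a complete minimal sketch of a batch word count (file paths and the job name are placeholders):
import org.apache.flink.api.scala._

object WordCountSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment        // 1: execution environment
    val text = env.readTextFile("file:///path/to/file")           // 2: load data
    val counts = text                                             // 3: transformations
      .flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
      .map { word => (word, 1) }
      .groupBy(0)
      .sum(1)
    counts.writeAsText("file:///path/to/output")                  // 4: output
    env.execute("word count sketch")                              // 5: trigger execution
  }
}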
Specifying Keys
Some transformations (join, coGroup, keyBy, groupBy) require a key to be defined on their input, while other transformations (Reduce, GroupReduce, Aggregate, Windows) can be applied on data grouped by a key:
A DataSet is grouped as
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
while a key can be specified on a DataStream using
DataStream<...> input = // [...]
DataStream<...> windowed = input
.keyBy(/*define key here*/)
.window(/*window specification*/);
Flink's data model is not based on key/value pairs, so you do not need to physically pack your data into a key/value form; keys are "virtual" and are defined as functions over the actual data.
The following discussion uses the DataStream API and keyBy; for the DataSet API, simply use DataSet and groupBy instead.
Defining keys for tuples:
1: A single field as the key
val input: DataStream[(Int, String, Long)] = // [...]
val keyed = input.keyBy(0) // the first field of the tuple is the key
The stream is now keyed (partitioned) by the first field of the tuple.
2: Multiple fields as a composite key
val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)
Now the tuples are grouped on the composite key formed by the first and second fields (more flexible than Spark's operators in this respect).
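A brief sketch of what you can do once a stream is keyed, assuming a stream of (userId, page, durationMs) tuples (the field meanings are illustrative):
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val visits: DataStream[(Int, String, Long)] = env.fromElements((1, "home", 30L), (1, "cart", 12L), (2, "home", 5L))
// Key by the first field (userId) and keep a running sum of the third field (index 2).
val totalTimePerUser = visits
  .keyBy(0)
  .sum(2)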
Defining keys using field expressions:
You can use String-based field expressions to define keys for grouping, sorting, joining, or coGrouping. For example:
Suppose we have a WC POJO with two fields, word and count. To group by the word field, we can simply pass its name to keyBy:
// some ordinary POJO (Plain old Java Object)
class WC(var word: String, var count: Int) {
def this() { this("", 0) }
}
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
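As a brief follow-up sketch, assuming the case class WC just above, field expressions can also be used for the aggregation itself:
val wordTotals = words
  .keyBy("word")        // key by the "word" field
  .sum("count")         // rolling sum of the "count" field per word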
Field Expression Syntax:
- Select POJO fields by their field name. For example "user" refers to the "user" field of a POJO type.
- Select Tuple fields by their 1-offset field name or 0-offset field index. For example "_1" and "5" refer to the first and sixth field of a Scala Tuple type, respectively.
- You can select nested fields in POJOs and Tuples. For example "user.zip" refers to the "zip" field of a POJO which is stored in the "user" field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported, such as "_2.user.zip" or "user._4.1.zip".
- You can select the full type using the "_" wildcard expression. This also works for types which are not Tuple or POJO types.
Field Expression Example:
class WC(var complex: ComplexNestedClass, var count: Int) {
  def this() { this(null, 0) }
}
class ComplexNestedClass(
    var someNumber: Int,
    someFloat: Float,
    word: (Long, Long, String),
    hadoopCitizen: IntWritable) {
  def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}
These are valid field expressions for the example code above:
- "count": The count field in the WC class.
- "complex": Recursively selects all fields of the field complex of POJO type ComplexNestedClass.
- "complex.word._3": Selects the last field of the nested Tuple3.
- "complex.hadoopCitizen": Selects the Hadoop IntWritable type.
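A brief sketch of how these expressions are used as keys, assuming a DataStream[WC] of the WC class defined just above:
val stream: DataStream[WC] = // [...]
val byNestedWord = stream.keyBy("complex.word._3")   // key by the String in the nested tuple
val byCount = stream.keyBy("count")                  // key by the top-level count field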
Define keys using Key Selector Functions
A key selector function takes a single element as input and returns the key for that element:
// some ordinary case class
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )
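A key selector can also compute a composite or derived key; a brief sketch, reusing the case class above:
// Key by a (word, parity of count) pair computed from each element.
val keyedByPair = words.keyBy( wc => (wc.word, wc.count % 2) )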
Specifying Transformation Functions
Lambda Functions
val data: DataSet[String] = // [...]
data.filter { _.startsWith("http://") }
val data: DataSet[Int] = // [...]
data.reduce { (i1,i2) => i1 + i2 }
// or
data.reduce { _ + _ }
Rich functions
Instead of a lambda such as
data.map { x => x.toInt }
you can write a rich function:
class MyMapFunction extends RichMapFunction[String, Int] {
  def map(in: String): Int = { in.toInt }
}
and pass the function to a map transformation:
data.map(new MyMapFunction())
Rich functions can also be defined as an anonymous class:
data.map(new RichMapFunction[String, Int] {
  def map(in: String): Int = { in.toInt }
})
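Beyond the user function itself, rich functions provide open()/close() lifecycle methods and access to the runtime context; a brief sketch (the class name is illustrative):
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class ParseWithContext extends RichMapFunction[String, Int] {
  override def open(parameters: Configuration): Unit = {
    // Called once per parallel instance before any map() call;
    // getRuntimeContext gives access to e.g. the subtask index and accumulators.
    println(s"opening subtask ${getRuntimeContext.getIndexOfThisSubtask}")
  }

  def map(in: String): Int = in.toInt

  override def close(): Unit = {
    // Called once after the last element has been processed.
  }
}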
Supported Data Types
See the official documentation for details.
Accumulators & Counters
Accumulators are simple constructs with an add operation and a final accumulated result, which is available after the job ended.
The most straightforward accumulator is a counter: You can increment it using the Accumulator.add(V value) method.
At the end of the job Flink will sum up (merge) all partial results and send the result to the client. Accumulators are useful during debugging or if you quickly want to find out more about your data.
This is similar to Spark's accumulators: Accumulator is the parent interface, and Counter is a simple implementation of it.
Flink currently ships with several built-in accumulators, all of which implement the Accumulator interface. For example:
- Histogram: A histogram implementation for a discrete number of bins. Internally it is just a map from Integer to Integer. You can use this to compute distributions of values, e.g. the distribution of words-per-line for a word count program.
1: Create the accumulator
private IntCounter numLines = new IntCounter();
2: Register the accumulator
getRuntimeContext().addAccumulator("num-lines", this.numLines);
3: Add to it wherever needed in the operator function
this.numLines.add(1);
4: Retrieve the result, using the JobExecutionResult object returned by the execute() method:
val myJobExecutionResult = env.execute()
myJobExecutionResult.getAccumulatorResult("num-lines")
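The four steps combined into one runnable sketch, using a RichFlatMapFunction (the class and job names are illustrative):
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class LineCountingTokenizer extends RichFlatMapFunction[String, String] {
  private val numLines = new IntCounter()                      // 1: create the accumulator

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext.addAccumulator("num-lines", numLines)    // 2: register it
  }

  def flatMap(line: String, out: Collector[String]): Unit = {
    numLines.add(1)                                            // 3: add to it
    line.split(" ").foreach(out.collect)
  }
}

// 4: after the job finishes, read the merged result from the JobExecutionResult:
// val result = env.execute("accumulator sketch")
// val totalLines: Int = result.getAccumulatorResult("num-lines")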
Finally, a diagram of Flink's architecture:
Flink program execution flow: