Flink Learning Notes (1)
Mostly a translation of the official documentation.
When Flink starts, it launches one JobManager and one or more TaskManagers. The Client submits a job to the JobManager, which schedules tasks to the TaskManagers for execution; the TaskManagers report heartbeats and statistics back to the JobManager. Data is transferred between TaskManagers in the form of streams.
Depending on whether the data source is static (bounded) or not, you use the DataSet API for batch processing or the DataStream API for stream processing.
Every program consists of the same basic parts: 1. obtain an execution environment 2. load/create the initial data 3. specify transformations on the data 4. specify where to put the results 5. trigger the program execution
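A minimal sketch of those five parts, assuming a hypothetical socket source on localhost:9999 (any DataStream source would do):
import org.apache.flink.streaming.api.scala._
object WordCountSketch {
  def main(args: Array[String]): Unit = {
    // 1. obtain an execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. load/create the initial data
    val text: DataStream[String] = env.socketTextStream("localhost", 9999)
    // 3. specify transformations on the data
    val counts = text
      .flatMap(_.toLowerCase.split("\\W+"))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
    // 4. specify where to put the results
    counts.print()
    // 5. trigger the program execution
    env.execute("word count sketch")
  }
}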
The DataSet API lives in the package org.apache.flink.api.scala
The DataStream API lives in org.apache.flink.streaming.api.scala
Create an execution environment with one of the following methods:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
getExecutionEnvironment() automatically creates the right context for how the program is being run: when debugging in an IDE it creates a local environment; when the program is submitted as a jar from the command line, the Flink cluster manager executes the jar's main method and getExecutionEnvironment() returns the cluster's execution environment.
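A hedged sketch of the difference (the host, port, and jar path below are placeholders):
// picks a local environment in the IDE, the cluster environment when submitted as a jar
val env = StreamExecutionEnvironment.getExecutionEnvironment
// explicitly target a remote cluster instead
val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("jobmanager-host", 6123, "/path/to/program.jar")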
The execution environment provides several methods for reading from files, for example:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val text: DataStream[String] = env.readTextFile("file:///path/to/file")
The execute() method returns a JobExecutionResult, which contains the execution time and accumulator results.
Flink programs are evaluated lazily; a program only actually runs when execute() is called explicitly.
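A small sketch of using the returned JobExecutionResult (the job name and accumulator name are hypothetical):
val result = env.execute("my job")
println(result.getNetRuntime)                          // net runtime in milliseconds
println(result.getAccumulatorResult[Int]("num-lines")) // value of an accumulator registered under "num-lines"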
Specifying Keys
Some transformations (join, coGroup, keyBy, groupBy) require a key to be defined on the collection; other transformations (Reduce, GroupReduce, Aggregate, Windows) allow the data to be grouped by a key before they are applied.
Defining keys with field expressions (POJO and Tuple types)
case class WC(word: String, count: Int)
val words: DataStream[WC] = // […]
val wordCounts = words.keyBy("word").window(/*window specification*/)
Nested fields of POJOs and Tuples can also be selected, for example:
class WC(var complex: ComplexNestedClass, var count: Int) {
def this() { this(null, 0) }
}
class ComplexNestedClass(
var someNumber: Int,
someFloat: Float,
word: (Long, Long, String),
hadoopCitizen: IntWritable) {
def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}
To select the first field of word as the key, use complex.word._1
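A one-line usage sketch, assuming a DataStream named words holding the nested WC class above:
val wordCounts = words.keyBy("complex.word._1")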
Defining keys with a key selector function
case class WC(word: String, count: Int)
val words: DataStream[WC] = // […]
val keyed = words.keyBy( _.word )
Specifying Transformation Functions
Lambda functions
val data: DataSet[String] = // […]
data.filter { _.startsWith("http://") }
val data: DataSet[Int] = // […]
data.reduce { (i1,i2) => i1 + i2 }
// or
data.reduce { _ + _ }
Rich functions
data.map { x => x.toInt }
can be rewritten as
class MyMapFunction extends RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
}
data.map(new MyMapFunction())
Rich functions provide, in addition to the user-defined function (map, reduce, etc.), four methods: open, close, getRuntimeContext, and setRuntimeContext. They are useful for parameterizing the function, creating and finalizing local state, accessing broadcast variables, and accessing runtime information such as accumulators and counters, as well as information on iterations.
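A short sketch of a rich function that uses open() and getRuntimeContext(); the prefix logic is made up for illustration:
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class PrefixMapper extends RichMapFunction[String, String] {
  private var prefix: String = _

  override def open(parameters: Configuration): Unit = {
    // runtime information becomes available here, e.g. the index of this parallel subtask
    prefix = s"subtask-${getRuntimeContext.getIndexOfThisSubtask}: "
  }

  def map(in: String): String = prefix + in
}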
Supported Data Types
Seven different categories of data types:
- Java Tuples and Scala Case Classes
- Java POJOs
- Primitive Types
- Regular Classes
- Values
- Hadoop Writables
- Special Types
Tuples and Case Classes
case class WordCount(word: String, count: Int)
val input = env.fromElements(
WordCount("hello", 1),
WordCount("world", 2)) // Case Class Data Set
input.keyBy("word") // key by field expression "word"
val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
input2.keyBy(0, 1) // key by field positions 0 and 1
POJOs
class WordWithCount(var word: String, var count: Int) {
def this() {
this(null, -1)
}
}
val input = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2)) // POJO Data Set
input.keyBy("word") // key by field expression "word"
Primitive Types
For example Integer, String, and Double.
General Class Types
All classes that are not identified as POJO types are treated as general class types (they must be serializable). Flink treats them as black boxes and cannot access their content; they are serialized with Kryo.
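A hedged sketch: the Rectangle class below is a made-up example of a non-POJO class that falls back to the generic (Kryo) serializer, and registering it up front lets Kryo write a short id instead of the full class name:
import org.apache.flink.api.scala._

class Rectangle(val width: Int, val height: Int) // no default constructor, no setters => generic type

val env = ExecutionEnvironment.getExecutionEnvironment
env.getConfig.registerKryoType(classOf[Rectangle])
val rects = env.fromElements(new Rectangle(1, 2), new Rectangle(3, 4)) // serialized with Kryo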
Values
Value types describe their serialization themselves: they implement org.apache.flink.types.Value and provide read() and write() methods.
Hadoop Writables
Types that implement the org.apache.hadoop.io.Writable interface can be used; the serialization logic defined in their write() and readFields() methods is applied.
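A tiny sketch, assuming env is an ExecutionEnvironment with org.apache.flink.api.scala._ imported, showing a Writable used directly as an element type:
import org.apache.hadoop.io.IntWritable

val writables = env.fromElements(new IntWritable(1), new IntWritable(2))
// these elements are serialized through their write()/readFields() methods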
Special Types
Scala's Either, Option, and Try can be used. Either is useful for error handling or for operations that need to output two different record types.
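A short sketch of using Either to carry both parse failures and successes in one stream (the source and names are hypothetical):
val raw: DataStream[String] = env.socketTextStream("localhost", 9999)
val parsed: DataStream[Either[String, Int]] = raw.map { s =>
  try Right(s.trim.toInt)
  catch { case _: NumberFormatException => Left(s"not a number: $s") }
}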
Accumulators and Counters
private IntCounter numLines = new IntCounter();
getRuntimeContext().addAccumulator("num-lines", this.numLines);
this.numLines.add(1); // can be used in any operator function, including open() and close()
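A slightly fuller sketch in Scala of the same idea: register the counter in open(), bump it in the operator, and read it from the JobExecutionResult after execute() (the class and accumulator names are made up):
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class LineCounter extends RichFlatMapFunction[String, String] {
  private val numLines = new IntCounter()

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext.addAccumulator("num-lines", numLines)
  }

  def flatMap(value: String, out: Collector[String]): Unit = {
    numLines.add(1)
    out.collect(value)
  }
}

// after the job has finished:
// val result = env.execute("count lines")
// val lines: Int = result.getAccumulatorResult[Int]("num-lines")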
Broadcast Variables
Make a data set available to all parallel instances of an operation.
Register it with withBroadcastSet(DataSet, String)
Access it with getRuntimeContext().getBroadcastVariable(String)
// 1. The DataSet to be broadcasted
val toBroadcast = env.fromElements(1, 2, 3)
val data = env.fromElements("a", "b")
data.map(new RichMapFunction[String, String]() {
var broadcastSet: Traversable[String] = null
override def open(config: Configuration): Unit = {
// 3. Access the broadcasted DataSet as a Collection
broadcastSet = getRuntimeContext().getBroadcastVariable[String]("broadcastSetName").asScala
}
def map(in: String): String = {
...
}}).withBroadcastSet(toBroadcast, "broadcastSetName") // 2. Broadcast the DataSet
Distributed Cache
At runtime, Flink automatically copies the registered files to the local file system of all worker nodes.
val env = ExecutionEnvironment.getExecutionEnvironment
// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// register a local executable file (script, executable, …)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// define your program and execute
…
val input: DataSet[String] = …
val result: DataSet[Integer] = input.map(new MyMapper())
…
env.execute()
// extend a RichFunction to have access to the RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {
override def open(config: Configuration): Unit = {
// access cached file via RuntimeContext and DistributedCache
val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
// read the file (or navigate the directory)
…
}
override def map(value: String): Int = {
// use content of cached file
…
}
}