Flink Distributed Cache 分散式快取
阿新 • • 發佈:2019-02-10
Flink提供了一個分散式快取,類似於hadoop,可以使使用者在並行函式中很方便的讀取本地檔案。此功能可用於共享檔案,包含靜態的外部資料,例如字典或者machine-learned迴歸模型。
此快取的工作機制如下:程式註冊一個檔案或者目錄(本地或者遠端檔案系統,例如hdfs或者s3),通過ExecutionEnvironment註冊快取檔案併為它起一個名稱。當程式執行,Flink自動將檔案或者目錄複製到所有worker節點的本地檔案系統。使用者函式可以查詢檔案或者目錄通過這個指定的名稱,然後從worker節點的本地檔案系統訪問它。
使用分散式快取 如下示例:
java程式碼:
在ExecutionEnvironment中註冊檔案或者目錄
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 從hdfs註冊一個檔案 env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile") // 註冊一個本地可執行的指令碼檔案 env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true) // 定義程式程式碼 並且執行 ... DataSet<String> input = ... DataSet<Integer> result = input.map(new MyMapper()); ... env.execute();
在使用者函式中訪問快取檔案或者目錄(這裡是一個map函式)。這個函式必須繼承RichFunction,因為它需要使用RuntimeContext讀取資料
// 繼承RichFunction 為了獲取RuntimeContext public final class MyMapper extends RichMapFunction<String, Integer> { @Override public void open(Configuration config) { // 通過RuntimeContext 和 DistributedCache訪問快取檔案 File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile"); // 讀取檔案(或者本地目錄) ... } @Override public Integer map(String value) throws Exception { // 使用快取檔案的內容做一些處理 ... } }
scala程式碼:
在ExecutionEnvironment中註冊檔案或者目錄
val env = ExecutionEnvironment.getExecutionEnvironment
// 從hdfs註冊一個檔案
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// 註冊一個本地可執行的指令碼檔案
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// 定義程式程式碼 並且執行
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()
在使用者函式中訪問快取檔案或者目錄(這裡是一個map函式)。這個函式必須繼承RichFunction,因為它需要使用RuntimeContext讀取資料
// 繼承RichFunction 為了獲取RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {
override def open(config: Configuration): Unit = {
// 通過RuntimeContext 和 DistributedCache訪問快取檔案
val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
// 讀取檔案(或者本地目錄)
...
}
override def map(value: String): Int = {
// 使用快取檔案的內容做一些處理
...
}
}
獲取更多大資料資料,視訊以及技術交流請加群:
QQ群號1:295505811(已滿)
QQ群號2:54902210
QQ群號3:555684318