1. 程式人生 > >Flink Distributed Cache 分散式快取

Flink Distributed Cache 分散式快取

Flink提供了一個分散式快取,類似於hadoop,可以使使用者在並行函式中很方便的讀取本地檔案。此功能可用於共享檔案,包含靜態的外部資料,例如字典或者machine-learned迴歸模型。

此快取的工作機制如下:程式註冊一個檔案或者目錄(本地或者遠端檔案系統,例如hdfs或者s3),通過ExecutionEnvironment註冊快取檔案併為它起一個名稱。當程式執行,Flink自動將檔案或者目錄複製到所有worker節點的本地檔案系統。使用者函式可以查詢檔案或者目錄通過這個指定的名稱,然後從worker節點的本地檔案系統訪問它。

使用分散式快取 如下示例:

java程式碼:

在ExecutionEnvironment中註冊檔案或者目錄

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 從hdfs註冊一個檔案
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// 註冊一個本地可執行的指令碼檔案
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// 定義程式程式碼 並且執行
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();

在使用者函式中訪問快取檔案或者目錄(這裡是一個map函式)。這個函式必須繼承RichFunction,因為它需要使用RuntimeContext讀取資料

// 繼承RichFunction 為了獲取RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {

    @Override
    public void open(Configuration config) {

      // 通過RuntimeContext 和 DistributedCache訪問快取檔案
      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
      // 讀取檔案(或者本地目錄)
      ...
    }

    @Override
    public Integer map(String value) throws Exception {
      // 使用快取檔案的內容做一些處理
      ...
    }
}

scala程式碼:

在ExecutionEnvironment中註冊檔案或者目錄

val env = ExecutionEnvironment.getExecutionEnvironment

// 從hdfs註冊一個檔案
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// 註冊一個本地可執行的指令碼檔案
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// 定義程式程式碼 並且執行
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()

在使用者函式中訪問快取檔案或者目錄(這裡是一個map函式)。這個函式必須繼承RichFunction,因為它需要使用RuntimeContext讀取資料

// 繼承RichFunction 為了獲取RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {

  override def open(config: Configuration): Unit = {

    // 通過RuntimeContext 和 DistributedCache訪問快取檔案
    val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
    // 讀取檔案(或者本地目錄)
    ...
  }

  override def map(value: String): Int = {
    // 使用快取檔案的內容做一些處理
    ...
  }
}

獲取更多大資料資料,視訊以及技術交流請加群:

QQ群號1:295505811(已滿)

QQ群號2:54902210

QQ群號3:555684318