1. 程式人生 > 實用技巧 >Flink例項(十五):Flink的分散式快取

Flink例項(十五):Flink的分散式快取

分散式快取

  Flink提供了一個分散式快取,類似於hadoop,可以使使用者在並行函式中很方便的讀取本地檔案,並把它放在taskmanager節點中,防止task重複拉取。
  此快取的工作機制如下:程式註冊一個檔案或者目錄(本地或者遠端檔案系統,例如hdfs或者s3),通過ExecutionEnvironment註冊快取檔案併為它起一個名稱。
  當程式執行,Flink自動將檔案或者目錄複製到所有taskmanager節點的本地檔案系統,僅會執行一次。使用者可以通過這個指定的名稱查詢檔案或者目錄,然後從taskmanager節點的本地檔案系統訪問它。

示例

在ExecutionEnvironment中註冊一個檔案:

//獲取執行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //1:註冊一個檔案,可以使用hdfs上的檔案 也可以是本地檔案進行測試 env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt"); //在使用者函式中訪問快取檔案或者目錄(這裡是一個map函式)。這個函式必須繼承RichFunction,因為它需要使用RuntimeContext讀取資料: DataSet<String> result = data.map(new
RichMapFunction<String, String>() { private ArrayList<String> dataList = new ArrayList<String>(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //2:使用檔案 File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt"); List
<String> lines = FileUtils.readLines(myFile); for (String line : lines) { this.dataList.add(line); System.err.println("分散式快取為:" + line); } } @Override public String map(String value) throws Exception { //在這裡就可以使用dataList System.err.println("使用datalist:" + dataList + "------------" +value); //業務邏輯 return dataList +":" + value; } }); result.printToErr(); }

完整程式碼如下,仔細看註釋:

public class DisCacheTest {

    public static void main(String[] args) throws Exception{

        //獲取執行環境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //1:註冊一個檔案,可以使用hdfs上的檔案 也可以是本地檔案進行測試
      //text 中有4個單詞:hello flink hello FLINK env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt");

        DataSource<String> data = env.fromElements("a", "b", "c", "d");

        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            private ArrayList<String> dataList = new ArrayList<String>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //2:使用檔案
                File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
                List<String> lines = FileUtils.readLines(myFile);
                for (String line : lines) {
                    this.dataList.add(line);
                    System.err.println("分散式快取為:" + line);
                }
            }

            @Override
            public String map(String value) throws Exception {
                //在這裡就可以使用dataList
                System.err.println("使用datalist:" + dataList + "------------" +value);
                //業務邏輯
                return dataList +":" +  value;
            }
        });

        result.printToErr();
    }
}//

輸出結果如下:

[hello, flink, hello, FLINK]:a
[hello, flink, hello, FLINK]:b
[hello, flink, hello, FLINK]:c
[hello, flink, hello, FLINK]:d