1. 程式人生 > >Flink分散式快取Distributed Cache應用案例實戰-Flink牛刀小試

Flink分散式快取Distributed Cache應用案例實戰-Flink牛刀小試

版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何問題,可隨時聯絡。

1 分散式快取

  • Flink提供了一個分散式快取,類似於hadoop,可以使使用者在並行函式中很方便的讀取本地檔案,並把它放在taskmanager節點中,防止task重複拉取。
  • 此快取的工作機制如下:程式註冊一個檔案或者目錄(本地或者遠端檔案系統,例如hdfs或者s3),通過ExecutionEnvironment註冊快取檔案併為它起一個名稱。當程式執行,Flink自動將檔案或者目錄複製到所有taskmanager節點的本地檔案系統,僅會執行一次。使用者可以通過這個指定的名稱查詢檔案或者目錄,然後從taskmanager節點的本地檔案系統訪問它

2 使用技巧

  • 1:註冊一個檔案

      env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")  
    複製程式碼
  • 2:訪問資料

      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
    複製程式碼

3 應用案例實戰

3.1 在D盤建立一個檔案discache.txt,並進行registerCachedFile

3.2 每一個TaskManager都會存在一份,防止MapTask重複拉取檔案。

public class BatchDemoDisCache {

    public static void main(String[] args) throws Exception{

        //獲取執行環境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //1:註冊一個檔案,可以使用hdfs或者s3上的檔案
        env.registerCachedFile("d:\\discache.txt","a.txt");

        DataSource<String> data = env.fromElements("a", "b", "c", "d");

        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            private ArrayList<String> dataList = new ArrayList<String>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //2:使用檔案
                File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
                List<String> lines = FileUtils.readLines(myFile);
                for (String line : lines) {
                    this.dataList.add(line);
                    System.out.println("discache:" + line);
                }
            }
            
            @Override
            public String map(String value) throws Exception {
                //在這裡就可以使用dataList
                return value;
            }
        });
        result.print();
    }
}
複製程式碼

3.3 結果展示

discache:flink
discache:spark
discache:hadoop
discache:kylin
a
b
c
d
複製程式碼

4 總結收尾

短文奉上,主題明確。辛苦成文,各自珍惜,謝謝!

秦凱新 於深圳 201811251732