1. 程式人生 > >Flink基本的API(續)

Flink基本的API(續)

上一篇介紹了編寫 Flink 程式的基本步驟,以及一些常見 API,如:map、filter、keyBy 等,重點介紹了 keyBy 方法。本篇將繼續介紹 Flink 中常用的 API,主要內容為

  • 指定 transform 函式
  • Flink 支援的資料型別
  • 累加器

指定 transform 函式

許多 transform 操作需要使用者自定義函式來實現,Flink 支援多種自定義 transform 函式,接下來一一介紹。

實現介面

/**
 * 實現 MapFunction 介面
 * 其中泛型的第一 String 代表輸入型別,第二個 Integer 代表輸出型別
 */
class MyMapFunction implements MapFunction<String, Integer> {
    @Override
    public Integer map(String value) { return Integer.parseInt(value); }
}

//使用 transform 函式
data.map(new MyMapFunction());

匿名類

data.map(new MapFunction<String, Integer> () {
  public Integer map(String value) { return Integer.parseInt(value); }
});

匿名類是 Java 語言定義的語法,與 “實現介面” 的方式一樣,只不過不需要顯示定義子類。這種方式比 “實現介面” 更常見一些。

Java 8 Lambda 表示式

data.map(s -> Integer.parseInt(s));
//或者
data.map(Integer::parseInt);

Java 8 支援 Lambda 表示式,用法與 Scala 語法很像, 寫起來簡潔,並且容易維護,推薦使用這種方式。

rich function

顧名思義,比普通的 transform 函式要更豐富,額外提供了 4 個方法:open、close、getRuntimeContext 和 setRuntimeContext。它們可以用來建立/初始化本地狀態、訪問廣播變數、訪問累加器和計數器等。感覺有點像 Hadoop 中的 Mapper 或者 Reducer 類。實現上,可以使用自定義類繼承 RichMapFunction 類的方式

/**
 * 與實現 MapFunction 介面類似,這裡是繼承了 RichMapFunction 類
 * 同時可以實現父類更多的方法
 */
class MyRichMapFunction extends RichMapFunction<String, Integer> {
    @Override
    public void open(Configuration parameters) throws Exception { super.open(parameters); }

    @Override
    public RuntimeContext getRuntimeContext() { return super.getRuntimeContext(); }

    @Override
    public void setRuntimeContext(RuntimeContext t) { super.setRuntimeContext(t); }

    @Override
    public Integer map(String value) throws Exception { return Integer.parseInt(value); }

    @Override
    public void close() throws Exception { super.close(); }
}
data.map(new MyRichMapFunction());

也可以使用匿名類的方式

data.map (new RichMapFunction<String, Integer>() {
    @Override
    public void open(Configuration parameters) throws Exception { super.open(parameters); }
    @Override
    public RuntimeContext getRuntimeContext() { return super.getRuntimeContext(); }
    @Override
    public void setRuntimeContext(RuntimeContext t) { super.setRuntimeContext(t); }
    @Override
    public Integer map(String value) { return Integer.parseInt(value); }
    @Override
    public void close() throws Exception { super.close(); }
});

如果在 rich function 中需要寫較多的業務,那麼用匿名類的方式並不簡潔,並且可讀性差。

Flink支援的資料型別

目前 Flink 支援 6 種資料型別

  • Java Tuple 和 Scala Case Class
  • Java POJO
  • 原子型別
  • 普通類
  • Values
  • Hadoop Writable 型別
  • 特殊類

Java Tuple 和 Scala Case Class

Tuple (元組)是一個混合型別,包含固定數量的屬性,並且每個屬性型別可以不同。例如:二元組有 2 個屬性,類名為 Tuple2;三元組有 3 個屬性,類名為 Tuple3,以此類推。Java 支援的元組為 Tuple1 - Tuple25。訪問屬性可以通過屬性名直接訪問,如:tuple.f4 代表 tuple 的第 5 個屬性。或者使用 tuple.getField(int position) 方法,引數 position 從 0 開始。

/**
 * Tuple2 二元組作為 DataStream 的輸入型別
 */
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
        new Tuple2<String, Integer>("hello", 1),
        new Tuple2<String, Integer>("world", 2));
wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
    @Override
    public Integer map(Tuple2<String, Integer> value) throws Exception {
        return value.f1;
    }
});

Java POJO

POJO(Plain Ordinary Java Object) 叫做簡單的 Java 物件。滿足以下條件的 Java 或 Scala 類會被 Flink 看做 POJO 型別

  • 類必須是 public
  • 必須有一個 public 修飾的無參構造方法(預設構造器)
  • 所有屬性必須是 public 修飾或者通過 getter 和 setter 方法可以訪問到
  • 屬性型別必須也是 Flink 支援的,Flink 使用 avro 對其序列化

POJO 型別更易使用,且 Flink 更高效地處理 POJO 型別的資料。

public class WordWithCount {

    public String word;
    public int count;

    public WordWithCount() {}

    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

DataStream<WordWithCount> wordCounts = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2));

原子型別

Flink 支援 Java 和 Scala 中所有的原子型別,例如: Integer、String 和 Double 等。

普通類 

不是 POJO 型別的類都會被 Flink 看做是普通的類型別。Flink 將它們視為黑盒且不會訪問它們的內容,普通類型別使用 Kryo 進行序列化與反序列化。這裡是第二次提到序列化與反序列化,簡單解釋下這個概念。因為在分散式計算的系統中,不可避免要在不同機器之間傳輸資料,因此為了高效傳輸資料且在不同語言之間互相轉化,需要通過某種協議(protobuf、kryo、avro、json)將物件轉化成另外一種形式(序列化),其他機器接到序列化的資料後再轉化成之前的物件(反序列化)就可以正常使用了。

Values

不同於一般的序列化框架,Values 型別通過實現 org.apache.flinktypes.Value 接口裡的 write 和 read 方法,實現自己的序列化和反序列化邏輯。當一般的序列化框架不夠高效的時候,可以使用 Values 型別。例如:對於一個用陣列儲存的稀疏向量。由於陣列大多數元素為 0 ,可以僅對非 0 元素進行特殊編碼,而一般的序列化框架會對所有元素進行序列化操作。

Flink 已經預定義了幾種 Value 型別與基本資料型別相對應。如:ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue。這些 Value 型別可以看做是基本資料型別的變體,他們的值是可變的,允許程式重複利用物件,減輕 GC 的壓力。例如:Java 基本資料型別 String 是不可變的,但是 Flink 的 StringValue 型別是可變的。

Flink 定義的 Value 型別與 Hadoop Writable 型別相似,本質都是通過改進基本資料型別的缺點,提供系統整體效能。

Hadoop Writable

Hadoop Writable 型別也是手動實現了比較高效的序列化與反序列化的邏輯。Value 型別實現了 org.apache.finktypes.Value 介面,而 Hadoop Writable 型別實現了 org.apache.hadoop.Writable 介面,該介面定義了 write 和 readFields 方法用來手動實現序列化與反序列化邏輯。

特殊型別

特殊型別包括 Scala 中的 Either, Option, and Try 型別,以及 Java API 中的 Either 型別。

累加器 

累加器可以通過 add 操作,對程式中的某些狀態或者操作進行計數,job 結束後會返回計數的結果。累加器可以用來除錯或者記錄資訊。

可以自定義累加器,需要實現 Accumulator 介面,當然 Flink 提供了兩種內建的累加器

  • IntCounter, LongCounter 和 DoubleCounter
  • Histogram:統計分佈

使用累加器的步驟如下:

在 transform 函式中定義累加器物件

private IntCounter numLines = new IntCounter();

註冊累加器物件,可以在 rich function 的 open 方法進行

getRuntimeContext().addAccumulator("num-lines", this.numLines);

在任何需要統計的地方使用累加器

this.numLines.add(1);

獲取累加器結果

myJobExecutionResult.getAccumulatorResult("num-lines")

Job 結束後,累加器的最終值儲存在 JobExecutionResult 物件中,可以通過 execute 方法返回值來獲取 JobExecutionResult 物件。但是對於批處理無法使用呼叫這個方法(官網沒有提到),可以通過 env.getLastJobExecutionResult 方法獲取。下面是使用累加器的完整示例

public static void main(String[] args) throws Exception {
        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> data =  env.readTextFile("你的輸入路徑");
        //使用 rich function transform 函式
        DataSet<Integer> dataSet = data.map(new MyRichMapFunction());

        // 執行程式
        dataSet.collect();
        // 獲得 job 的結果
        JobExecutionResult jobExecutionResult = env.getLastJobExecutionResult();
        int res = jobExecutionResult.getAccumulatorResult("num-lines");
        // 輸出累機器的值
        System.out.println(res);
}

// 自定義 rich function
/**
 * 與實現 MapFunction 介面類似,這裡是繼承了 RichMapFunction 類
 * 同時可以實現父類更多的方法
 */
class MyRichMapFunction extends RichMapFunction<String, Integer> {
    /**
     * 定義累加器
     */
    private IntCounter numLines = new IntCounter();

    @Override
    public void open(Configuration parameters) throws Exception {
        // 註冊累加器
        getRuntimeContext().addAccumulator("num-lines", this.numLines);
    }

    @Override
    public Integer map(String value) throws Exception {
        // 累加器自增,記錄處理的行數
        this.numLines.add(1);
        return Integer.parseInt(value);
    }
}

 

總結

Flink 基本 API 的使用介紹完了,本篇主要介紹了自定義的 transform 函式、Flink 支援的資料型別和累加器。後續會詳細介紹 Flink 的原理、機制以及程式設計模型。

&n