1. 程式人生 > 實用技巧 >Flink例項(六十六): Flink的任務鏈實操(二)

Flink例項(六十六): Flink的任務鏈實操(二)

Operator Chains(操作鏈)

  • Flink出於分散式執行的目的,將operator的subtask連結在一起形成task(類似spark中的管道)。

  • 每個task在一個執行緒中執行。

  • 將operators連結成task是非常有效的優化:它可以減少執行緒與執行緒間的切換和資料緩衝的開銷,並在降低延遲的同時提高整體吞吐量。

  • 連結的行為可以在程式設計API中進行指定,詳情請見程式碼OperatorChainTest。

  • 開啟操作鏈 和 禁用操作鏈的對比圖(預設開啟):

  • Flink預設會將多個operator進行串聯,形成任務鏈(task chain)

  • 注意: task chain 可以理解為就是 operator chain 只是不同場景下,稱呼不同。

  • 我們也可以禁用任務鏈,讓每個operator形成一個task。

  • StreamExecutionEnvironment.disableOperatorChaining() 這個方法會禁用整條工作鏈

  • 操作鏈其實就是類似spark的pipeline管道模式,一個task可以執行同一個窄依賴中的運算元操作。

  • 我們也可以細粒度的控制工作鏈的形成,比如呼叫dataStreamSource.map(...).startNewChain(),但不能使用dataStreamSource.startNewChain()

  • dataStreamSource.filter(...).map(...).startNewChain().map(...),需要注意的是,當這樣寫時相當於source和filter組成一條鏈,兩個map組成一條鏈。

  • 即在filter和map之間斷開,各自形成單獨的鏈。

  • 程式碼:

package com.ronnie.flink.stream.test;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * 開啟與禁用工作鏈時,輸出的結果不一樣。 * 當開啟工作鏈時(預設啟動),operator map1與map2 組成一個task. * 此時task執行時,對於hello,flink 這兩條資料是: * 先列印 hello ---- 1 , hello->1 ---- 2 * 後列印 flink ---- 1 , flink->1 ---- 2 * 當禁用工作鏈時,operator map1與map2 分別在兩個task中執行 * 此時task執行時,對於hello,flink 這兩條資料是: * 先列印 hello ---- 1 , flink ---- 1 * 後列印 hello->1 ---- 2 , flink->1 ---- 2 * * 注:操作鏈類似spark的管道,一個task執行多個的運算元. */ public class OperatorChainTest { public static final String[] WORDS = new String[] { "hello", "flink", "spark", "hbase" }; public static void main(String[] args) { // 設定執行環境, 類似spark中初始化sparkContext一樣 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 關閉操作鏈.. env.disableOperatorChaining(); DataStreamSource<String> dataStreamSource = env.fromElements(WORDS); SingleOutputStreamOperator<String> pairStream = dataStreamSource.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { System.err.println(value + " ---- 1"); return value + "->1"; } }).map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { System.err.println(value + " ---- 2"); return value + "->2"; } }); // 還可以控制更細粒度的任務鏈,比如指明從哪個operator開始形成一條新的鏈 // someStream.map(...).startNewChain(),但不能使用someStream.startNewChain()。 try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } }

http://www.manongjc.com/detail/13-plgrtuapvoblful.html