Flink DataSet API 之 Broadcast(廣播變數)
基本介紹
1、廣播變數允許程式設計人員在每臺機器上保持1個只讀的快取變數,而不是傳送變數的副本給tasks
2、廣播變數建立後,它可以執行在叢集中的任何function上,而不需要多次傳遞給叢集節點。另外需要記住,不應該修改廣播變數,這樣才能確保每個節點獲取到的值都是一致的。可以理解為是一個公共的共享變數,我們可以把一個dataset 資料集廣播出去,然後不同的task在節點上都能夠獲取到,這個資料在每個節點上只會存在一份。如果不使用broadcast,則在每個節點中的每個task中都需要拷貝一份dataset資料集,比較浪費記憶體(也就是一個節點中可能會存在多份dataset資料)。
3、與DataStreaming 中的Broadcast區別開來,DataStreaming 中的Broadcast是把元素廣播給所有的分割槽,資料會被重複處理,類似於storm中的allGrouping(呼叫方式 dataStream.broadcast())
用法
1:初始化資料 DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)
2:廣播資料 withBroadcastSet(toBroadcast, "broadcastSetName");
3:獲取資料 Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
注意
1:廣播出去的變數存在於每個節點的記憶體中,所以這個資料集不能太大。因為廣播出去的資料,會常駐記憶體,除非程式執行結束
2:廣播變數在初始化廣播出去以後不支援修改,這樣才能保證每個節點的資料都是一致的。
使用Demo
import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.configuration.Configuration import scala.collection.mutable.ListBuffer object BatchDemoBroadcast { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ // 1、初始化資料 val broadData = new ListBuffer[Tuple2[String, Int]]() broadData.append(("zs", 18)) broadData.append(("ls", 20)) broadData.append(("ww", 17)) val tupleData = env.fromCollection(broadData) val toBroadcastData = tupleData.map(data => Map(data._1 -> data._2)) val text = env.fromElements("zs", "ls", "ww") text.map(new RichMapFunction[String, String] { var allMap = Map[String,Int]() override def open(parameters: Configuration): Unit = { super.open(parameters) // 3、獲取資料 val listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastData") val it = listData.iterator() println("-------------------") while (it.hasNext) { allMap = allMap.++(it.next()) } } override def map(in: String): String = { in + "_" + allMap(in) } }).withBroadcastSet(toBroadcastData, "broadcastData")// 2、廣播資料 .print() } }