1. 程式人生 > >Flink DataSet API 之 Broadcast(廣播變數)

Flink DataSet API 之 Broadcast(廣播變數)

基本介紹

1、廣播變數允許程式設計人員在每臺機器上保持1個只讀的快取變數,而不是傳送變數的副本給tasks

2、廣播變數建立後,它可以執行在叢集中的任何function上,而不需要多次傳遞給叢集節點。另外需要記住,不應該修改廣播變數,這樣才能確保每個節點獲取到的值都是一致的。可以理解為是一個公共的共享變數,我們可以把一個dataset 資料集廣播出去,然後不同的task在節點上都能夠獲取到,這個資料在每個節點上只會存在一份。如果不使用broadcast,則在每個節點中的每個task中都需要拷貝一份dataset資料集,比較浪費記憶體(也就是一個節點中可能會存在多份dataset資料)。

3、與DataStreaming 中的Broadcast區別開來,DataStreaming 中的Broadcast是把元素廣播給所有的分割槽,資料會被重複處理,類似於storm中的allGrouping(呼叫方式 dataStream.broadcast())

用法

1:初始化資料       DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)

2:廣播資料           withBroadcastSet(toBroadcast, "broadcastSetName");

3:獲取資料           Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");

注意

1:廣播出去的變數存在於每個節點的記憶體中,所以這個資料集不能太大。因為廣播出去的資料,會常駐記憶體,除非程式執行結束

2:廣播變數在初始化廣播出去以後不支援修改,這樣才能保證每個節點的資料都是一致的。

使用Demo

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

import scala.collection.mutable.ListBuffer

object BatchDemoBroadcast {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // 1、初始化資料
    val broadData = new ListBuffer[Tuple2[String, Int]]()
    broadData.append(("zs", 18))
    broadData.append(("ls", 20))
    broadData.append(("ww", 17))
    val tupleData = env.fromCollection(broadData)
    val toBroadcastData = tupleData.map(data => Map(data._1 -> data._2))

    val text = env.fromElements("zs", "ls", "ww")
    text.map(new RichMapFunction[String, String] {
      var allMap  = Map[String,Int]()
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        // 3、獲取資料
        val listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastData")
        val it = listData.iterator()
        println("-------------------")
        while (it.hasNext) {
          allMap = allMap.++(it.next())
        }
      }

      override def map(in: String): String = {
        in + "_" + allMap(in)
      }
    }).withBroadcastSet(toBroadcastData, "broadcastData")// 2、廣播資料
      .print()
  }
}