在使用Flink廣播變數broadcast時遇到的坑
阿新 • • 發佈:2018-11-25
在使用Flink廣播變數遇到的坑
如下程式碼中需要特別注意:
(1)需要手動匯入org.apache.flink.api.scala._
(2)需要手動匯入scala.collection.JavaConverters._
【如果不手動匯入該包,導致asScala使用隱式轉換失敗】
package testbrocast
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache. flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._ //asScala需要使用隱式轉換
/**
* @description: ${description}
* @author: fangchangtan
* @create: 2018-11-23 19:31
**/
object BroadCastTest {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataset1 = env.fromElements("11", "22", "33")
val dataset2 = env.fromElements("aa", "bb", "cc")
dataset1.map(new RichMapFunction[String, (String, String)] {
private var dataset2: Traversable[String] = null
override def open( parameters: Configuration) {
//import scala.collection.JavaConverters._ //asScala需要使用隱式轉換,切記!!
dataset2 = getRuntimeContext.getBroadcastVariable[String]("broadCast").asScala
}
def map(t: String): (String, String) = {
var result = ""
for (broadVariable <- dataset2) {
result = result + broadVariable + " "
}
(t, result)
}
}).withBroadcastSet(dataset2, "broadCast").print()
}
}
最終輸出結果:
Broadcast 廣播變數:可以理解為是一個公共的共享變數,我們可以把一個dataset 或者不變的快取物件(例如map list集合物件等)資料集廣播出去,然後不同的任務在節點上都能夠獲取到,並在每個節點上只會存在一份,而不是在每個併發執行緒中存在。如果不使用broadcast,則在每個節點中的每個任務中都需要拷貝一份dataset資料集,比較浪費記憶體(也就是一個節點中可能會存在多份dataset資料)。
因此在廣播小資料量的dataset 和或者不大的不可變快取物件的時候,特別適合使用Broadcast 廣播變數