1. 程式人生 > >Flink中的Broadcast廣播變數

Flink中的Broadcast廣播變數

在Flink中,同一個運算元可能存在若干個不同的並行例項,計算過程可能不在同一個Slot中進行,不同運算元之間更是如此,因此不同運算元的計算資料之間不能像Java陣列之間一樣互相訪問,而廣播變數Broadcast便是解決這種情況的。如下程式碼所示:

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = env.fromElements("1", "2", "3", "4", "5")
    val ds2 = env.fromElements("a", "b", "c", "d", "e")

    ds1.map
{ t => (t, ds2) }.print()

執行上述程式碼會報InvalidProgramException的錯,因為在ds1map運算元中無法再去呼叫ds2,此時可以使用廣播變數將ds2這個變數進行廣播,使得ds2這一被廣播的資料集在ds1map運算元的所有並行例項中都可用,具體處理方式如下:

object BroadcastTest {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = env.fromElements("1"
, "2", "3", "4", "5") val ds2 = env.fromElements("a", "b", "c", "d", "e") ds1.map(new RichMapFunction[String, (String, String)] { private var ds2: Traversable[String] = null override def open(parameters: Configuration) { ds2 = getRuntimeContext.getBroadcastVariable[String
]("broadCast").asScala } def map(t: String): (String, String) = { var result = "" for (broadVariable <- ds2) { result = result + broadVariable + " " } (t, result) } }).withBroadcastSet(ds2, "broadCast").print() } }

上述程式碼能夠成功執行出如下結果:

(1,a b c d e )
(2,a b c d e )
(3,a b c d e )
(4,a b c d e )
(5,a b c d e )

可以看到,ds1map運算元通過訪問廣播變數成功訪問到ds2中的資料。該過程分為兩步:設定廣播變數和獲取廣播變數。

  1. 設定廣播變數
    在某個需要用到該廣播變數的運算元後呼叫withBroadcastSet(var1, var2)進行設定,var1為需要廣播變數的變數名,var2是自定義變數名,為String型別。注意,被廣播的變數只能為DataSet型別,不能為ListIntString等型別。
  2. 獲取廣播變數
    建立該運算元對應的富函式類,例如map函式的富函式類是RichMapFunction,該類有兩個構造引數,第一個引數為運算元輸入資料型別,第二個引數為運算元輸出資料型別。首先建立一個Traversable[_]介面用於接收廣播變數並初始化為空,接收型別與運算元輸入資料型別相對應;然後重寫open函式,通過getRuntimeContext.getBroadcastVariable[_](var)獲取到廣播變數,var即為設定廣播變數時的自定義變數名,型別為Stringopen函式在運算元生命週期的初始化階段便會呼叫;最後在map方法中對獲取到的廣播變數進行訪問及其它操作。

注意:只有在某個Operator中使用到不屬於該OperatorDataSet時才需要廣播變數,在iterate內部可以將某個DataSet直接作為起始節點,不需要使用廣播變數。

相關推薦

FlinkBroadcast廣播變數

在Flink中,同一個運算元可能存在若干個不同的並行例項,計算過程可能不在同一個Slot中進行,不同運算元之間更是如此,因此不同運算元的計算資料之間不能像Java陣列之間一樣互相訪問,而廣播變數Broadcast便是解決這種情況的。如下程式碼所示:

Flink Broadcast 廣播變數應用案例實戰-Flink牛刀小試

版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何問題,可隨時聯絡。 1.1 Broa

Flink DataSet API 之 Broadcast(廣播變數)

基本介紹 1、廣播變數允許程式設計人員在每臺機器上保持1個只讀的快取變數,而不是傳送變數的副本給tasks 2、廣播變數建立後,它可以執行在叢集中的任何function上,而不需要多次傳遞給叢集節點。另外需要記住,不應該修改廣播變數,這樣才能確保每個節點獲取到的值都是一致的。可

spark廣播變數broadcast

Spark中的Broadcast處理 首先先來看一看broadcast的使用程式碼: val values = List[Int](1,2,3) val broadcastValues = sparkContext.broadcast(values) rdd.mapPart

[Flink基礎]--Apache Flink廣播狀態實用指南

感謝英文原文作者:https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink Apache Flink中的廣播狀態實用指南 從版本1.5.0開始,Apache FlinkⓇ具

spark動態廣播變數的使用

今天來說一下spark,動態廣播變數的用法,如果對廣播變數用法不清楚的可以檢視這個部落格,在實際專案中,有時候我們的廣播變數是動態的,比如需要一分鐘更新一次,這個也是可以實現的,我們知道廣播變數是在driver端初始化,在excetors端獲取這個變數,但是不能修改,所以,我們可以在driver

Apache Flink廣播狀態實用指南

感謝英文原文作者:https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink 不過,原文最近好像不能訪問了。應該是https://www.da-platform.com/網站移除了blog板塊了。

spark streaming廣播變數應用

1. 廣播變數 我們知道spark 的廣播變數允許快取一個只讀的變數在每臺機器上面,而不是每個任務儲存一份拷貝。常見於spark在一些全域性統計的場景中應用。通過廣播變數,能夠以一種更有效率的方式將一個大資料量輸入集合的副本分配給每個節點。Spark也嘗試著利用有效的廣播演

在使用Flink廣播變數broadcast時遇到的坑

在使用Flink廣播變數遇到的坑 如下程式碼中需要特別注意: (1)需要手動匯入org.apache.flink.api.scala._ (2)需要手動匯入scala.collection.JavaConverters._ 【如果不手動匯入該包,導致asScala使用隱式轉換失敗】 pa

【雲星資料---Apache Flink實戰系列(精品版)】:Apache Flink高階特性與高階應用015-Flink廣播變數和分散式快取001

1.flink中的廣播變數 flink支援將變數廣播到worker上,以供程式運算使用。 執行程式 package code.book.batch.sinksource.scala i

spark廣播變數的使用

import java.sql.{Connection, DriverManager, PreparedStatement} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org

Android廣播Broadcast詳解

                今天來看一下Android中的廣播機制,我們知道廣播Broadcast是Android中的四大元件之一,可見他的重要性了,當然它的用途也很大的,比如一些系統的廣播:電量低、開機、鎖屏等一些操作都會發送一個廣播,具體的Android系統中的廣播可以參見我的另外一篇部落格:http

spark的累加器和廣播變數

     在spark程式中,當一個傳遞給Spark操作(例如map和reduce)的函式在遠端節點上面執行時,Spark操作實際上操作的是這個函式所用變數的一個獨立副本。這些變數會被複制到每臺機器上,並且這些變數在遠端機器上的所有更新都不會傳遞迴驅動程式。通常跨任

Spark之廣播變數Broadcast Variables與計數器Accumulators

一、廣播變數Broadcast Variables   根據官方文件,廣播變數Broadcast Variables可以使開發者在每個節點–即Executor上快取一個只讀的變數,它相對於在每個task上覆制一份這個變數具有更好的優勢。因為它能減少網路和記憶體的開銷。例如,有一個Map資料,

Flink的流廣播(Broadcast State)

  上一篇Flink的狀態管理中,我們提到了Operator state,本文介紹的廣播狀態(Broadcast State)是 Apache Flink 中支援的第三種類型的operator state。Broadcast State使得 Flink 使用者能夠以容錯、一致、可擴縮容地

Spark大師之路:廣播變數Broadcast)原始碼分析

概述 最近工作上忙死了……廣播變數這一塊其實早就看過了,一直沒有貼出來。 本文基於Spark 1.0原始碼分析,主要探討廣播變數的初始化、建立、讀取以及清除。 類關係 BroadcastManager類中包含一個BroadcastFactory物件的引用。大部分操作通過呼

numpy廣播機制

AD 並且 block .cn image RR ray and tin 廣播的引出 numpy兩個數組的相加、相減以及相乘都是對應元素之間的操作。 import numpy as np x = np.array([[2,2,3],[1,2,3]]) y

Angular$broadcast和$emit的使用方法

parent ots 子元素 fyi NPU 發生 作用 services 運用 要在控制器之間傳遞變量變化需要使用angular中的$broadcast和$emit方法來傳遞,同時使用$on來接收事件並作出響應。 broadcast譯為廣播,即上級傳遞下級。 示例代碼:

spark streaming 廣播變數的測試

最近寫的一個流式的程式需要從redis 中獲取變數資訊,並廣播,其中redis裡面的資訊是變動的,要求廣播變數也要跟著改變,下面是測試程式碼: val dStream = KafkaUtils.createDirectStream[String, String]( ssc,

Python什麼是變數

在Python中,變數的概念基本上和初中代數的方程變數是一致的。 例如,對於方程式 y=x*x ,x就是變數。當x=2時,計算結果是4,當x=5時,計算結果是25。 只是在計算機程式中,變數不僅可以是數字,還可以是任意資料型別。 在Python程式中,變數是用一個變數名錶示,變數名必須是大小寫英