Spark分散式程式設計之全域性變數專題【共享變數】
轉載自:http://www.aboutyun.com/thread-19652-1-1.html
問題導讀
1.spark共享變數的作用是什麼?
2.什麼情況下使用共享變數?
3.如何在程式中使用共享變數?
4.廣播變數原始碼包含哪些內容?
spark程式設計中,我們經常會遇到使用全域性變數,來累加或則使用全域性變數。然而對於分散式程式設計這個卻與傳統程式設計有著很大的區別。不可能在程式中宣告一個全域性變數,在分散式程式設計中就可以直接使用。因為程式碼會分發到多臺機器,導致我們認為的全域性變數失效。那麼spark,spark Streaming該如何實現全域性變數。
一般情況下,當一個傳遞給Spark操作(例如map和reduce)的函式在遠端節點上面執行時,Spark操作實際上操作的是這個函式所用變數的一個獨立副本。這些變數被複制到每臺機器上,並且這些變數在遠端機器上 的所有更新都不會傳遞迴驅動程式。通常跨任務的讀寫變數是低效的,但是,Spark還是為兩種常見的使用模式提供了兩種有限的共享變數:廣播變數(broadcast variable)和累加器(accumulator)+
1.概念
1.1 廣播變數:
廣播可以將變數傳送到閉包中,被閉包使用。但是,廣播還有一個作用是同步較大資料。比如你有一個IP庫,可能有幾G,在map操作中,依賴這個ip庫。那麼,可以通過廣播將這個ip庫傳到閉包中,被並行的任務應用。廣播通過兩個方面提高資料共享效率:
1,叢集中每個節點(物理機器)只有一個副本,預設的閉包是每個任務一個副本;
2,廣播傳輸是通過BT下載模式實現的,也就是P2P下載,在叢集多的情況下,可以極大的提高資料傳輸速率。廣播變數修改後,不會反饋到其他節點。
1.2 累加器:
累加器是僅僅被相關操作累加的變數,因此可以在並行中被有效地支援。它可以被用來實現計數器和總和。Spark原生地只支援數字型別的累加器,程式設計者可以新增新型別的支援。如果建立累加器時指定了名字,可以在Spark的UI介面看到。這有利於理解每個執行階段的程序。(對於Python還不支援)
累加器通過對一個初始化了的變數v呼叫SparkContext.accumulator(v)來建立。在叢集上執行的任務可以通過add或者”+=”方法在累加器上進行累加操作。但是,它們不能讀取它的值。只有驅動程式能夠讀取它的值,通過累加器的value方法。
2.如何使用全域性變數
2.1 Java版本:
- package com.Streaming;
- import org.apache.spark.Accumulator;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.broadcast.Broadcast;
- import org.apache.spark.streaming.Durations;
- import org.apache
- import org.apache.spark.streaming.api.java.JavaStreamingContext;
- import org.apache.spark.api.java.function.FlatMapFunction;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.api.java.function.PairFunction;
- import org.apache.spark.streaming.api.java.JavaDStream;
- import org.apache.spark.streaming.api.java.JavaPairDStream;
- import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
- import scala.Tuple2;
- import java.util.*;
- /**
- * 利用廣播進行黑名單過濾!
- *
- * 無論是計數器還是廣播!都不是想象的那麼簡單!
- * 聯合使用非常強大!!!絕對是高階應用!
- *
- * 如果 聯合使用擴充套件的話,該怎麼做!!!
- *
- * ?
- */
- publicclassBroadcastAccumulator{
- /**
- * 肯定要建立一個廣播List
- *
- * 在上下文中例項化!
- */
- privatestaticvolatileBroadcast<List<String>> broadcastList =null;
- /**
- * 計數器!
- * 在上下文中例項化!
- */
- privatestaticvolatileAccumulator<Integer> accumulator =null;
- publicstaticvoid main(String[] args){
- SparkConf conf =newSparkConf().setMaster("local[2]").
- setAppName("WordCountOnlieBroadcast");
- JavaStreamingContext jsc =newJavaStreamingContext(conf,Durations.seconds(5));
- /**
- * 沒有action的話,廣播並不會發出去!
- *
- * 使用broadcast廣播黑名單到每個Executor中!
- */
- broadcastList = jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive"));
- /**
- * 全域性計數器!用於統計線上過濾了多少個黑名單!
- */
- accumulator = jsc.sparkContext().accumulator(0,"OnlineBlackListCounter");
- JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master",9999);
- /**
- * 這裡省去flatmap因為名單是一個個的!
- */
- JavaPairDStream<String,Integer> pairs = lines.mapToPair(newPairFunction<String,String,Integer>(){
- @Override
- publicTuple2<String,Integer> call(String word){
- returnnewTuple2<String,Integer>(word,1);
- }
- });
- JavaPairDStream<String,Integer> wordsCount = pairs.reduceByKey(newFunction2<Integer,Integer,Integer>(){
- @Override
- publicInteger call(Integer v1,Integer v2){
- return v1 + v2;
- }
- });
- /**
- * Funtion裡面 前幾個引數是 入參。
- * 後面的出參。
- * 體現在call方法裡面!
- *
- * 這裡直接基於RDD進行操作了!
- */
- wordsCount.foreach(newFunction2<JavaPairRDD<String,Integer>,Time,Void>(){
- @Override
- publicVoid call(JavaPairRDD<String,Integer> rdd,Time time)throwsException{
- rdd.filter(newFunction<Tuple2<String,Integer>,Boolean>(){
- @Override
- publicBoolean call(Tuple2<String,Integer> wordPair)throwsException{
- if(broadcastList.value().contains(wordPair._1)){
- /**
- * accumulator不應該僅僅用來計數。
- * 可以同時寫進資料庫或者redis中!
- */
- accumulator.add(wordPair._2);
- returnfalse;
- }else{
- returntrue;
- }
- };
- /**
- * 這裡真的希望 廣播和計數器執行的話。要進行一個action操作!
- */
- }).collect();
- System.out.println("廣播器裡面的值"+broadcastList.value());
- System.out.println("計時器裡面的值"+accumulator.value());
- returnnull;
- }
- });
- jsc.start();
- jsc.awaitTermination();
- jsc.close();
- }
- }
2.2 Scala版本
- package com.Streaming
- import java.util
- import org.apache.spark.streaming.{Duration,StreamingContext}
- import org.apache.spark.{Accumulable,Accumulator,SparkContext,
相關推薦
Spark分散式程式設計之全域性變數專題【共享變數】
轉載自:http://www.aboutyun.com/thread-19652-1-1.html問題導讀1.spark共享變數的作用是什麼?2.什麼情況下使用共享變數?3.如何在程式中使用共享變數?4.廣播變數原始碼包含哪些內容?spark程式設計中,我們經常會遇到使用全域性變數,來累加或則使用全域性變數
大資料之Spark(四)--- Dependency依賴,啟動模式,shuffle,RDD持久化,變數傳遞,共享變數,分散式計算PI的值
一、Dependency:依賴:RDD分割槽之間的依存關係 --------------------------------------------------------- 1.NarrowDependency: 子RDD的每個分割槽依賴於父RDD的少量分割槽。 |
Python學習【第21篇】:程序池以及回撥函式 python併發程式設計之多程序2-------------資料共享及程序池和回撥函式
python併發程式設計之多程序2-------------資料共享及程序池和回撥函式 一、資料共享 1.程序間的通訊應該儘量避免共享資料的方式 2.程序
MFC程式設計之全域性唯一識別符號(GUID,Globally Unique Identifier)
GUID簡介 全域性唯一識別符號(GUID,Globally Unique Identifier)也稱作 UUID(Universally Unique IDentifier)。GUID是一種由演算法生成的二進位制長度為128位的數字識別符號。GUID主要用於在擁有多個節點
Spark核心程式設計之RDD持久化詳解
RDD持久化原理 Spark非常重要的一個功能特性就是可以將RDD持久化在記憶體中。當對RDD執行持久化操作時,每個節點都會將自己操作的RDD的partition持久化到記憶體中,並且在之後對該RDD的反覆使用中,直接使用記憶體快取的partition。這樣的
程式設計之法第二章【快速排序的兩種方法】
花了兩個多小時重新複習了快速排序,之前以為懂,但是真正實踐的時候才發現自己錯了。 快速排序有兩種實現方式。都是兩個指標,不過之前學的一種是一個從頭開始掃,一個從尾開始掃。另外一種是兩個都是從頭開始掃,不過一個比另外一個前一個位置。 如果真正的理解快排的原理,對於一些排序的問
Cloudera Spark 及 Hadoop 開發員培訓學習【北京上海】
auto 行處理 分布式 大數據應用 使用 考試 ado 生態系統 flume Spark 及 Hadoop 開發員培訓 學習如何將數據導入到 Apache Hadoop 機群並使用 Spark、Hive、Flume、Sqoop、Impala 及其他 Hadoop 生態
設計模式之工廠類模式【非原創】
分支 難度 抽象工廠模式 並且 客戶端使用 對比 類圖 分享圖片 In 工廠模式屬於創建型設計模式,需要生成的對象叫做產品,生成對象的地 方叫做工廠。 在任何需要生成復雜對象的地方,都可以使用工廠方法模式。 --- 一、簡單工廠 1.簡介 ??簡單工廠模式是指專門定義一個
Tensorflow程式設計遇到的問題彙總【持續更新】
1、在搭建GAN模型時,使用了和Generator相同結構和引數的simpler,用於在固定epoch後生成樣本,便於觀察訓練效果。但是在執行的過程中出現了 ValueError: Variable generator/g_1e1_conv/w already exists, disallowe
【Mac系統 + Python + Django】之開發一個釋出會系統【Django模型(二)】 【Mac系統 + Mysql】之安裝Mysql資料庫 【Python + Mysql】之用pymysql庫連線Mysql資料庫並進行增刪改查操作
上一部分給大家介紹Django的檢視。 接下來繼續來了解Django框架,來看第二部分,此部分是對資料庫的操作。 目錄: 一、設計系統表 二、admin後臺管理 三、基本資料訪問(SQLite資料庫) 四、Django配置MySQL &
npm缺少css-loader,/style-compiler,stylus-loader問題,npm沒有許可權無法全域性更新問題【已解決】
ERROR in ./node_modules/css-loader!./node_modules/vue-loader/lib/style-compiler?{"vue":true,"id":"data-v-a5e4f82a","scoped":false,"hasInlineConfig":false}!
排序演算法之希爾排序【java實現】
前面介紹的冒泡、選擇、插入排序演算法雖然簡單直觀,但是在排序上的效率一般。對於大量的資料排序就需要更加高效的演算法,那麼下面來介紹一下高效的排序演算法----希爾排序,又稱Shell排序,縮小增量排序。 實際上,希爾排序是基於插入排序的思想。 實現步驟: (1)將有n個元素的陣列分成n/
排序演算法之插入排序演算法【java實現】
插入排序演算法通過對未排序的資料執行逐個插入至合適的位置而完成排序工作。思路簡單,應用較多。但是此演算法在資料無規律的情況下,需要移動大量的資料,效率不高。 步驟: (1)首先對陣列的前兩個資料進行從小到大排序。 (2)接著將第3個數據與排好序的兩個資料進行比較,將第3個數據插入合適的位
排序演算法之選擇排序演算法【java實現】
簡介:遍歷陣列,每次選出最小的數與索引第一個進行交換,直到全部完成。 package zhgyu.sort; /** /*選擇排序演算法 * @author zhgyu * */ public class SelectionSort { static final int SIZE =
js實現之--防抖節流【理解+程式碼】
防抖: 理解:在車站上車,人員上滿了車才發走重點是人員上滿觸發一次。 場景:實時搜尋,拖拽。 實現: //每一次都要清空定時器,重新設定上計時
《機器學習》程式設計作業的Python實現【ex1_multi.py】
Python程式碼 import numpy as np import matplotlib.pyplot as plt def featureNormalize(X): X_norm = X mu = np.zeros((1, X.shape[1])) sigma
《機器學習》程式設計作業的Python實現【ex1.py】
Python程式碼 from mpl_toolkits.mplot3d import Axes3D import numpy as np import matplotlib.pyplot as plt # ============= warmUpExercise ================
Spring自動裝配之byName和byType【Spring入門】
在使用Spring框架時,有些bean中有個成員變數是另一個bean。舉個實際的例子,比如說CDPlayer類(CD播放器類)中有一個屬性是CD(介面),代表著CDPlayer可以放各種CD,可以例項
"尚學堂杯"哈爾濱理工大學第七屆程式設計競賽 C.Collection Game【遞推】
Collection Game Time Limit: 1000 MS Memory Limit: 128000 K Total Submit: 41(21 users) Total Accept
windown 解壓縮版Mysql安裝和環境變數配置【絕對成功】
1.下載mysql 地址:http://pan.baidu.com/s/1hsgqY6S 2.解壓縮 解壓縮到 D:\MyProgram 環境變數配置:我的電腦 --> 屬性 --> 環境變數 --> PATH 加入:D:\MyProgram\mysql-