1. 程式人生 > >spark視窗函式簡單實現

spark視窗函式簡單實現

Window函式,可以統計最近一段時間的資料,使用Window函式載入成DStream:DStream.window("視窗長度","滑動間隔")reduceByKeyAndWindow視窗長度:必須是BathInterval的整數倍滑動間隔:必須是BatchInterval的整數倍/** * 1、local的模擬執行緒數必須大於等於2 因為一條執行緒被receiver(接受資料的執行緒)佔用,另外一個執行緒是job執行 * 2、Durations時間的設定,就是我們能接受的延遲度,這個我們需要根據叢集的資源情況以及監控每一個job的執行時間來調節出最佳時間。 * 3、 建立JavaStreamingContext有兩種方式 (sparkconf、sparkcontext) * 4、業務邏輯完成後,需要有一個output operator * 5、JavaStreamingContext.start()straming框架啟動之後是不能在次新增業務邏輯 * 6、JavaStreamingContext.stop()無參的stop方法會將sparkContext一同關閉,stop(false) ,預設為true,會一同關閉 * 7、JavaStreamingContext.stop()停止之後是不能在呼叫start */public class WindowOperator { @SuppressWarnings("resource") public static void main(String[] args) { SparkConf conf = new SparkConf(); //local[2]模擬執行緒數 conf.setMaster("local[2]").setAppName("ww"); //在此處設定最小間隔時間durations JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5)); jsc.sparkContext().setLogLevel("WARN"); jsc.checkpoint("./check"); //設定監聽的節點和ip埠 JavaReceiverInputDStream<String> sts = jsc.socketTextStream("CentOS16",9999);//設定檔案保留時間,之後得而RDD使用Window時,只會取到最近60秒的資料 JavaDStream<String> window = sts.window(Durations.seconds(60),Durations.seconds(5)); JavaDStream<String> flatMap = sts.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String arg0) throws Exception { return Arrays.asList(arg0.split(" ")); } }); JavaPairDStream<String,Integer> mapToPair = flatMap.mapToPair(new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String arg0) throws Exception { return new Tuple2<String, Integer>(arg0,1); } }); /** * 每隔5秒,計算最近60秒內的資料,那麼這個視窗大小就是60秒,裡面有12個rdd,在沒有計算之前,這些rdd是不會進行計算的。 * 那麼在計算的時候會將這12個rdd聚合起來,然後一起執行reduceByKeyAndWindow操作 , * reduceByKeyAndWindow是針對視窗操作的而不是針對DStream操作的。 */// JavaPairDStream<String, Integer> reduceByKeyAndWindow = // mapToPair.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {// private static final long serialVersionUID = 1L;// @Override// public Integer call(Integer arg0, Integer arg1) throws Exception {// return arg0+arg1;// }// }, Durations.seconds(30),Durations.seconds(5)); /** * window視窗操作優化,設定加上後面10s的資料,然後減去統計在內的最前面10s的資料 * 可以看成只產生了兩個rdd,在計算之後,這些rdd才開始計算 */ JavaPairDStream<String, Integer> reduceByKeyAndWindow = mapToPair.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer arg0, Integer arg1) throws Exception { return arg0+arg1; } }, new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer arg0, Integer arg1) throws Exception { return arg0-arg1; } //下面的兩個時間必須是前面設定的durations屬性的整數倍 },Durations.seconds(60),Durations.seconds(10)); reduceByKeyAndWindow.print(); jsc.start(); jsc.awaitTermination(); jsc.stop(false); jsc.close(); }}

相關推薦

spark視窗函式簡單實現

Window函式,可以統計最近一段時間的資料,使用Window函式載入成DStream:DStream.window("視窗長度","滑動間隔")reduceByKeyAndWindow視窗長度:必須是BathInterval的整數倍滑動間隔:必須是BatchInterval

[無心插柳]簡單實現常用的表單校驗函式

有意取而不得,失落而歸。無意間有所獲,未有喜悅,但珍惜依舊 1.前言 表單校驗,相信絕大部分的開發者會遇到過,網上也有很多外掛可使用。但當時想著就是簡單的校驗,沒必要引外掛,就自己寫一個簡單的函式。隨著校驗的需求多樣化,函式越來越大。有點輪子的雛形,算是無心插柳吧。現在也該分享出來了,和大家交流

Spark非常實用的視窗函式

spark 累加歷史主要用到了視窗函式,而進行全部統計,則需要用到rollup函式 1 應用場景: 1、我們需要統計使用者的總使用時長(累加歷史) 2、前臺展現頁面需要對多個維度進行查詢,如:產品、地區等等 3、需要展現的表格頭如: 產品、2015-04、2015-05、20

釘釘機器人呼叫函式計算實現serverless web服務:傳統門禁的簡單改造,懶惰癌的福音

本文通過釘釘機器人呼叫函式計算實現的serverless web服務,打通物聯網平臺,和樹莓派實時通訊。實現了將原有傳統的磁吸門禁,改造成可以由釘釘來控制開門的簡單應用。 場景 由於本部門擁有獨立封閉的空間,在大門口配置了磁吸玻璃門,因此規定在工作期間出入需要隨手關門,以保證工作環境的私密性和安全性

Spark2.3(三十四):Spark Structured Streaming之withWaterMark和windows視窗是否可以實現最近一小時統計

WaterMark除了可以限定來遲資料範圍,是否可以實現最近一小時統計? WaterMark目的用來限定引數計算資料的範圍:比如當前計算資料內max timestamp是12::00,waterMark限定資料分為是60 minutes,那麼如果此時輸入11:00之前的資料就會被捨棄不參與統計,視為來遲範圍

Hive函式分類、CLI命令、簡單函式、聚合函式、集合函式、特殊函式(分析函式視窗函式、混合函式,UDTF),常用函式Demo

1.1 Hive函式分類 1.2  Hive  CLI命令 顯示當前會話有多少函式可用 show  functions; 顯示函式的描述資訊: DESC  FUNCTION  concat; 顯示函式的擴充套

spark運算元:滑動視窗函式reduceByKeyAndWindow的使用

1.reduceByKeyAndWindow這個運算元也是lazy的,它用來計算一個區間裡面的資料,如下圖: 截圖自官網,例如每個方塊代表5秒鐘,上面的虛線框住的是3個視窗就是15秒鐘,這裡的15秒鐘就是視窗的長度,其中虛線到實線移動了2個方塊表示10秒鐘,這裡的10秒鐘就表示每隔10秒計算一

【MFC】簡單實現視窗始終置底

環境:win10,vs2017 注意這裡是始終置底,不是始終置頂   關於視窗持續置底,嘗試了不少網上方法,但都不怎麼適合自己的程式。於是自己想了個比較笨的方法,總算是基本實現自己想要的效果了。 下面這句程式碼能將視窗進行一次置底,但視窗一啟用又顯示了。 SetWi

.NET 簡單實現MD5加密函式

一、自定義Md5加密函式 public static string Md5(string str) {   MD5 md5 = MD5.Create();   byte[] bufstr = Encoding.GetEncoding("GBK").GetBytes(str);   byte[] has

Scala +Spark+Hadoop+Zookeeper+IDEA實現WordCount單詞計數(簡單例項)

                 IDEA+Scala +Spark實現wordCount單詞計數 一、新建一個Scala的object單例物件,修改pom檔案 (1)下面文章可以幫助參考安裝 IDEA 和 新建一個Scala程式。 (2)pom檔案 <?xml

第71課:Spark SQL視窗函式解密與實戰

內容:     1.SparkSQL視窗函式解析     2.SparkSQL視窗函式實戰 一、SparkSQL視窗函式解析     1.spark支援兩種方式使用視窗函式:  &nb

Hive SQL視窗函式實現頁面統計(以騰雲天下頁面訪問為例)

埋點資料欄位為: userid,at,sid,pid分別表示使用者id,訪問時間,sessionId(區分一次啟動),頁面id 表名為beacon 所有資料均為模擬資料 2018-07-04 11:46:37 2856 efda26adec1c3eb8 h_01 20

使用巨集定義,簡單實現jni函式命名

在android中呼叫C語言介面時,要為native函式名稱,命名規則是Java_包名(點用下劃線替換)_類名_函式自定義名稱,舉個例子:在java類JninameActivity(包名為com.ckl.jniname)中宣告native介面: private native

inet_pton函式和inet_ntop函式的用法及簡單實現

        這兩個函式是隨IPv6出現的新函式,對於IPv4地址和IPv6地址都適用。函式名中的p和n非別代表表達(presentation)和數值(numeric)。地址的表達格式通常是ASCII字串,數值格式則是存放到套接字地址結構中的二進位制值。函式如下: #i

[2.4]以row_number為例解讀spark sql的視窗函式

參考 場景 將本地檔案toNGroup.txt中的內容: hadoop@master:~/resource$ cat toNGroup.txt hadoop 29 hadoop 87 hadoop 39 hadoop 27 hadoop 88

sqlserver使用視窗函式實現分頁

ALTER PROC [dbo].[usp_GetStuPage] @pageIndex INT =1,--當前頁碼 @pageSize INT =10,--頁容量 @pageCount int OUTPUT--輸出總頁數 AS BEGIN SELECT @pageCoun

spark sql視窗函式

視窗函式是spark sql模組從1.4之後開始支援的,主要用於解決對一組資料進行操作,同時為每條資料返回單個結果,比如計算指定訪問資料的均值、計算累進和或訪問當前行之前行資料等,這些場景使用普通函式實現是比較困難的。 視窗函式計算的一組行,被稱為Frame。每

Qt5 圓角加陰影視窗簡單實現

實現圓角加陰影的視窗,基本思路是利用QFrame,把QFrame通過改變QSS樣式變成圓角,然後利用QGraphicsDropShadowEffect給QFrame設定陰影首先向ui檔案裡的Widget託人個QFrame,然後把QFrame放到中間,與Widget上下左右都隔

利用Spark sql操作Hdfs資料與Mysql資料,sql視窗函式的使用

需求說明:                                                                  對熱門商品進行統計        根據商品的點選資料,統計出各個區域的銷量排行TOPK 產品        輸入:開始時間與結束時間 

hive,spark獲取TOPN視窗函式

TOPNrow number說明: row_number() over ([partition col1] [order by col2]) rank() over ([partition col1] [order by col2]) dense_rank() over ([