1. 程式人生 > >在 Spark DataFrame 中使用Time Window

在 Spark DataFrame 中使用Time Window

從Spark 2.0.0開始,Spark Sql包內建和Spark Streaming類似的Time Window,方便我們通過時間來理解資料。

Spark Sql包中的Window API

Tumbling Window
window(timeColumn: Column, windowDuration: String): Column
Slide Window
window(timeColumn: Column, windowDuration: String, slideDuration: String): Column
window(timeColumn: Column,windowDuration:
String,slideDuration: String,startTime: String): Column
注意
  1. timeColumn 時間列的schema必須是timestamp型別。

  2. 視窗間隔(windowDurationslideDuration)是字串型別。如 0 years0 months1 week0 days0 hours0 minute0 seconds1 milliseconds0 microseconds

  3. startTime 開始的位置。如從每小時第15分鐘開始,startTime15 minutes

測試資料

data/cpu_memory_disk_monitor.
csv,每行是每隔5分鐘對CPU、記憶體、磁碟的監控。如下: eventTime,cpu,memory,disk 2017-12-31 23:21:01,2.87,28.23,58 2017-12-31 23:26:01,4.32,28.47,58 2017-12-31 23:31:02,3.15,28.72,58 2017-12-31 23:36:02,3.62,28.65,58 2017-12-31 23:41:02,3.25,28.70,59 2017-12-31 23:46:02,3.63,28.85,59 2017-12-31 23:51:03,2.76,28.96,59 2017-12-31 23:56:03,3.44
,29.07,59 2018-01-01 00:01:03,6.14,41.54,60 2018-01-01 00:06:03,14.84,35.44,59 2018-01-01 00:11:04,20.68,39.99,59 2018-01-01 00:16:04,7.53,33.55,61 2018-01-01 00:21:05,9.27,36.83,59 2018-01-01 00:26:05,4.78,35.79,59 2018-01-01 00:31:05,12.02,36.55,59 2018-01-01 00:36:06,2.23,34.89,59 2018-01-01 00:41:06,4.44,35.29,59 2018-01-01 00:46:06,3.76,62.45,59

在Spark DataFrame 中使用Time Window

package com.bigData.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SparkSession, functions}

/**
  * Author: Wang Pei
  * License: Copyright(c) Pei.Wang
  * Summary:
  *
  * spark 2.2.2
  *
  */
object SparkDataFrameTimeWindow {
  def main(args: Array[String]): Unit = {

    //設定日誌等級
    Logger.getLogger("org").setLevel(Level.WARN)

    //spark環境
    val spark = SparkSession.builder().master("local[3]").appName(this.getClass.getSimpleName.replace("$","")).getOrCreate()
    import spark.implicits._

    //讀取時序資料
    val data = spark.read.option("header","true").option("inferSchema","true").csv("data/cpu_memory_disk_monitor.csv")
    //data.printSchema()

    /** 1) Tumbling window */
    /** 計算: 每10分鐘的平均CPU、平均記憶體、平均磁碟,並保留兩位小數 */
    data
      .filter(functions.year($"eventTime").between(2017,2018))
      .groupBy(functions.window($"eventTime","10 minute")) //Time Window
      .agg(functions.round(functions.avg($"cpu"),2).as("avgCpu"),functions.round(functions.avg($"memory"),2).as("avgMemory"),functions.round(functions.avg($"disk"),2).as("avgDisk"))
      .sort($"window.start").select($"window.start",$"window.end",$"avgCpu",$"avgMemory",$"avgDisk")
      .limit(5)
      .show(false)


    /** 2) Slide window */
    /** 計算:從第3分鐘開始,每5分鐘計算最近10分鐘內的平均CPU、平均記憶體、平均磁碟,並保留兩位小數 */
    data
      .filter(functions.year($"eventTime").between(2017,2018))
      .groupBy(functions.window($"eventTime","10 minute","5 minute","3 minute")) //Time Window
      .agg(functions.round(functions.avg($"cpu"),2).as("avgCpu"),functions.round(functions.avg($"memory"),2).as("avgMemory"),functions.round(functions.avg($"disk"),2).as("avgDisk"))
      .sort($"window.start").select($"window.start",$"window.end",$"avgCpu",$"avgMemory",$"avgDisk")
      .limit(5)
      .show(false)
  }
}

sparkDataFrame_useWindow.png

相關推薦

Spark DataFrame 使用Time Window

從Spark 2.0.0開始,Spark Sql包內建和Spark Streaming類似的Time Window,方便我們通過時間來理解資料。 Spark Sql包中的Window API Tumbl

spark dataFrame 使用 pandas dataframe

文章目錄 背景 xgboost 預測 toPandas 效果 xgboost 預測 spark dataframe 轉 pandas dataframe 背景

Spark DataFrame的join使用說明

spark sql 中join的型別 Spark DataFrame中join與SQL很像,都有inner join, left join, right join, full join; 型別 說明 inner join 內連線

Spark DataFrame的join型別

Spark DataFrame中join與SQL很像,都有inner join, left join, right join, full join; 那麼join方法如何實現不同的join型別呢? 看其原型 def join(right : DataFra

[Spark][Python]DataFrame取出有限個記錄的例子

dep ins pytho rem json.js art hadoop fileinput taskset [Spark][Python]DataFrame中取出有限個記錄的例子: sqlContext = HiveContext(sc) peopleDF = sql

Spark SQL RDD 轉換到 DataFrame

pre ase replace 推斷 expr context 利用反射 轉換 port 1.people.txtsoyo8, 35小周, 30小華, 19soyo,882./** * Created by soyo on 17-10-10. * 利用反射機制推斷RDD

Spark SQLDataframe join操作含null值的列

dataframe util pre table log n-n dram blog between 當在Spark SQL中對兩個Dataframe使用join時,當作為連接的字段的值含有null值。由於null表示的含義是未知,既不知道有沒有,在SQL中null值與任何

Spark SQLRDDs轉化為DataFrame(詳細全面)

除了呼叫SparkSesion.read().json/csv/orc/parqutjdbc 方法從各種外部結構化資料來源建立DataFrame物件外,Spark SQL還支援 將已有的RDD轉化為DataFrame物件,但是需要注意的是,並不是由任意型別物件組成的RDD均

jQuery的$(window)與$(document)幾個用法區別

圖片 document window function 準備就緒 [window對象] 它是一個頂層對象,而不是另一個對象的屬性,即表示瀏覽器中打開的窗口。 1、屬性  defaultStatus 缺省的狀態條消息  document 當前顯示的文檔(該屬性本身也是一個對象)  f

android View, Window, Activity, WindowManager,ViewRoot幾者之間的關系

line 消息傳遞 post att 顯示 增加 調用 eas window對象 (1)View:最主要的UI組件,表示屏幕上的一個矩形區域。 (2)Window: 表示一個窗體,不一定有屏幕那麽大,能夠非常大也能夠非常小;

spark dataframe函數編程

一行 columns per type 部分 left lena 結構體 filter DataFrame 的函數 Action 操作 1、 collect() ,返回值是一個數組,返回dataframe集合所有的行 2、 collectAsList() 返回值是一個Jav

spark dataframe操作集錦(提取前幾行,合並,入庫等)

tex hang count() time cache height 以及 入庫 blank Spark dataframe派生於RDD類,但是提供了非常強大的數據操作功能。當然主要對類SQL的支持。 在實際工作中會遇到這樣的情況,主要是會進行兩個數據集的篩選、合並,重

Pythontime模塊和datetime模塊的常用操作以及幾種常用時間格式間的轉換

pyrhon time datatime 幾種常用時間格式的轉換 最常見以及常用的幾種時間格式 1、時間戳(timestamp),時間戳表示的是從1970年1月1日00:00:00開始按秒計算的偏移量。 2、時間元組(struct_time),共有九個元素組。 3、格式化時間(fo

如何處理DataFrame缺失項

sta false ace res 處理 結果 大小 pandas http 查看所有單元格是否為NaN DataFrame.isnull() 這個函數會返回一個和原來表格大小相同的表格,原表格值為NaN,此表中為True,否則為False pandas.notnull()

Go_20: Golang time 包的使用

舉例 處理程序 計算表達式 時間格式化 停止 out str div ati time包中包括兩類時間:時間點(某一時刻)和時常(某一段時間) 1. 時間常量(時間格式化) const ( ANSIC = "Mon Jan _2 15:04:05 20

Pythontime模塊詳解(轉)

才有 border 格式化時間 sta 程序 格式化字符串 夏令時 oca import 在平常的代碼中,我們常常需要與時間打交道。在Python中,與時間處理有關的模塊就包括:time,datetime以及calendar。這篇文章,主要講解time模塊。 在開始之前,首

Spark Streaming的操作函數講解

csdn 後綴 rep 包含著 所有 並行計算 技術分享 ref filter Spark Streaming中的操作函數講解 根據根據Spark官方文檔中的描述,在Spark Streaming應用中,一個DStream對象可以調用多種操作,主要分為以下幾類 Tra

pythontime模塊獲取時間的使用

轉化 毫秒 3.0 精確 ftime 時間戳 import 格式 local import time() ·獲取本地時間: time.time() #本地時間的時間戳格式。 1514273633.0474658   eg: int(time.time()) , int

Spark DataFrame ETL教程

string red 封裝 快速 creat 入參 show 很難 .sh 前言 ETL是 Extract-Transform-Load的縮寫,也就是抽取-轉換-加載,在數據工作中是非常重要的部分。實際上,ETL就是一個對數據進行批處理的過程,一個ETL程序就是一個批處理腳

GC調優在Spark應用的實踐(轉載)

avg fix 時也 net aso 會有 介紹 完整 頻繁 Spark是時下非常熱門的大數據計算框架,以其卓越的性能優勢、獨特的架構、易用的用戶接口和豐富的分析計算庫,正在工業界獲得越來越廣泛的應用。與Hadoop、HBase生態圈的眾多項目一樣,Spark的運行離不開J