在Spark中使用Pivot重塑資料
本文來自Andrew Ray博士在Silicon Valley Data Science網站上發表的部落格,Andrew Ray博士對大資料有著濃厚的興趣並且有著豐富的Spark使用經驗。Andrew同樣也是一名活躍的Apache Spark原始碼貢獻者,其原始碼貢獻主要集中在Spark SQL和GraphX元件上。
透視(pivot)資料功能是Spark 1.6的眾多新增特性之一,它通過使用DataFrame(目前支援Scala、Java和Python語言)建立透視表(pivot table)。透視可以視為一個聚合操作,通過該操作可以將一個(實際當中也可能是多個)具有不同值的分組列轉置為各個獨立的列。透視表在資料分析和報告中佔有十分重要的地位,許多流行的資料操縱工具(如pandas、reshape2和Excel)和資料庫(如MS SQL和Oracle 11g)都具有透視資料的能力。在
語法
在為透視操作進行pull請求的過程中,我進行了許多相關研究,其中一項便是對其它優秀工具的語法進行比較,目前透視語法格式多種多樣,Spark 透視功能最主要的兩個競爭對手是pandas(Python語言)和reshape2(R語言)。
例如,我們想對A列和B列進行分組,然後在C列上進行透視操作並對D列資料進行求和,pandas的語法格式為 pivot_table(df, values=’D’, index=[‘A’, ‘B’], columns=[‘C’], aggfunc=np.sum),這看起來有點冗長但表達還算清晰,如果使用reshape2的話,其語法格式為 dcast(df, A + B ~ C, sum),藉助於R語言公式的表達能力,這種語法十分緊湊,需要注意的是reshape2不需要指定求值列,因為它自身具備將剩餘DataFrame列作為最終求值列的能力(當然也可能通過其它引數進行顯式指定)。
我們提出Spark透視操作自有的語法格式,它能夠與DataFrame上現有其它聚合操作完美結合,同樣是進行group/pivot/sum操作,在Spark中其語法為:df.groupBy(“A”, “B”).pivot(“C”).sum(“D”),顯然這種語法格式非常直觀,但這其中也有個值得注意的地方:為取得更好的效能,需要明確指定透視列對應的不同值,例如如果C列有兩個不同的值(small 和 large),則效能更優的版本語法為: df.groupBy(“A”, “B”).pivot(“C”, Seq(“small”, “large”)).sum(“D”)。當然,這裡給出的是Scala語言實現,使用Java語言和Python語言實現的話方法也是類似的。
報告
讓我們來看一些實際應用案例,假設你是一個大型零售商(例如我前任東家),銷售資料具有標準交易格式並且你想製作一些彙總資料透視表。當然,你可以選擇將資料聚合到可管理的大小,然後使用其它工具去製作最終的資料透視表(儘管初始聚合操作的粒度受限)。但是現在你可以在Spark中進行所有操作(在進行這些操作之前需要進行若干IF判斷),不過不幸的是沒有大的零售商願意將它們原始的銷售資料共享給我們,因此我們將使用合成的資料進行演示,這裡推薦使用TPC-DS 資料集,該資料集是我用過的資料集中比較好的一個,它的元資料(Schema)與實際零售資料非常相似。
因為TPC-DS是為進行不同大小的“大資料”資料庫基準測試而合成的資料集,所以我們可以使用尺度因子(scale factors)決定最終想要生成的資料集大小。為簡單起見,這裡的尺度因子為1,對應資料集大小為1GB。由於需求有點複雜,我使用了docker映象以便大家可以跟著學習。假設我們想根據種類(category)和季度(quarter)對資料進行彙總,各季度資料最終在資料透視表中以列的形式展示,此時我們可以通過下列程式碼完成上述需求(更真實的查詢可能會有更多條件如時間範圍等):
(sql("""select *, concat('Q', d_qoy) as qoy
from store_sales
join date_dim on ss_sold_date_sk = d_date_sk
join item on ss_item_sk = i_item_sk""")
.groupBy("i_category")
.pivot("qoy")
.agg(round(sum("ss_sales_price")/1000000,2))
.show)
+-----------+----+----+----+----+
| i_category| Q1| Q2| Q3| Q4|
+-----------+----+----+----+----+
| Books|1.58|1.50|2.84|4.66|
| Women|1.41|1.36|2.54|4.16|
| Music|1.50|1.44|2.66|4.36|
| Children|1.54|1.46|2.74|4.51|
| Sports|1.47|1.40|2.62|4.30|
| Shoes|1.51|1.48|2.68|4.46|
| Jewelry|1.45|1.39|2.59|4.25|
| null|0.04|0.04|0.07|0.13|
|Electronics|1.56|1.49|2.77|4.57|
| Home|1.57|1.51|2.79|4.60|
| Men|1.60|1.54|2.86|4.71|
+-----------+----+----+----+----+
注意,我們將銷售額以百萬元為單位並精確到小數點後兩位以便於更清晰地比較,上面的資料結果有兩個值得注意的地方:首先,四季度的資料明顯要更多,這對任何熟悉零售業的人來說都很好理解;其次,同一季度中種類為null的異常結果值比較接近。遺憾的是,即使是如此優秀的合成數據集也與真實情況有出入,如果你有比該合成數據集更好且對公眾開放的資料,請告訴我。
特徵生成
第二個例子,讓我們來看預測模型中的特徵生成,在實際應用中,資料集中的目標觀測值常常以每條一行(稱為長格式或窄資料)的格式進行組織。為構建模型,我們首先需要將資料重塑,每個目標值重塑為一行,根據上下文該任務可以有多種方法來完成,其中一種方法便是通過Spark中的透視操作來完成。這也許是其它工具如pandas、reshape2和Excel完成不了的,因為結果集可能有成百萬甚至數十億行。
為使實驗能夠容易地再現,我將使用相對較小的MovieLens 1M資料集,該資料集中包含了由6040個使用者針對3952個電影生成的大約一百萬個電影評級資料。我們嘗試根據100個最流行的電影評級去預測使用者的性別。在下面的例子當中,評級表有三列:user、 movie和rating。
+----+-----+------+
|user|movie|rating|
+----+-----+------+
| 11| 1753| 4|
| 11| 1682| 1|
| 11| 216| 4|
| 11| 2997| 4|
| 11| 1259| 3|
...
為得到每使用者一行格式的資料,我們進行如下透視操作:
val ratings_pivot = ratings.groupBy("user").pivot("movie", popular.toSeq).agg(expr("coalesce(first(rating),3)").cast("double"))
上面程式碼中的popular變數為最流行的電影列表(通過評級數得到),同時我們將預設評級設為3,對於使用者11,其影評資料結果如下:
+----+----+---+----+----+---+----+---+----+----+---+...
|user|2858|260|1196|1210|480|2028|589|2571|1270|593|...
+----+----+---+----+----+---+----+---+----+----+---+...
| 11| 5.0|3.0| 3.0| 3.0|4.0| 3.0|3.0| 3.0| 3.0|5.0|...
+----+----+---+----+----+---+----+---+----+----+---+...
上面的資料為建模時所需要的寬格式資料,完整例子程式碼在這。需要注意的是:我只使用了100個最流行的電影,因為當前的透視操作需要作用於成千上萬個不同值,在當前的實現中其速度不是特別快。我們未來將解決這一問題。
提示和技巧
為獲取最好的效能,透視操作時需要指定透視列對應的不同值(如果你知道的話),不然的話Spark會立即啟動一個job來確定這些值。除此之外,它們將按照排好的順序放置,對大部分應用而言,這種做法是合理的,但是對部分應用而言,如每週各天的順序,這種做法是不合理的(如Friday, Monday, Saturday等) 。
透視同其它正常的聚合操作一樣,支援多個聚合表示式,只要將多個引數傳遞給agg方法即可,例如df.groupBy(“A”, “B”).pivot(“C”).agg(sum(“D”), avg(“D”))
雖然語法上只允許對某一列進行透視,但你可以將多個列組合起來,其得到的結果與透視多個列得到的結果一樣,例如:
+----+----+---+----+----+---+----+---+----+----+---+...
|user|2858|260|1196|1210|480|2028|589|2571|1270|593|...
+----+----+---+----+----+---+----+---+----+----+---+...
| 11| 5.0|3.0| 3.0| 3.0|4.0| 3.0|3.0| 3.0| 3.0|5.0|...
+----+----+---+----+----+---+----+---+----+----+---+...
最後,你可能會對在未明確指定時,對應透視列所允許的值最大數感興趣,這也是捕獲錯誤及避免記憶體溢位(OOM)場景的主要關注點。其配置鍵(config key)為spark.sql.pivotMaxValues,預設值為10000,你可能並不需要對其進行修改。
實現
透視函式的實現通過新增新的邏輯運算元(o.a.s.sql.catalyst.plans.logical.Pivot)進行,該邏輯運算元被新的分析器規則(o.a.s.sql.catalyst.analysis.Analyzer.ResolvePivot)翻譯,該新分析器規則會將其翻譯成帶有許多帶有if語句的聚合操作,每個透視值對應一個表示式。
例如, df.groupBy(“A”, “B”).pivot(“C”, Seq(“small”, “large”)).sum(“D”)將被翻譯成df.groupBy(“A”, “B”).agg(expr(“sum(if(C = ‘small’, D, null))”), expr(“sum(if(C = ‘large’, D, null))”))。你也可能直接這麼用但這會使程式碼比較冗長且容易出錯。
未來的工作
Spark中的透視功能仍然有待於提升,目前大量的工作集中在以下幾個方面:
• 在R API和SQL語法(類似Oracle 11g和MS SQL)中新增透視功能,為使用者提供更大的語言選擇範圍,使透視功能使用更簡便。
• 新增逆透視的支援,其功能與透視操作相反
• 當透視列中的不同值較多時需要提升透視的速度,我目前正在想辦法解決這一問題。
譯者簡介:牛亞真,中科院計算機資訊處理專業碩士研究生,關注大資料技術和資料探勘方向。責編:仲浩
相關推薦
在Spark中使用Pivot重塑資料
本文來自Andrew Ray博士在Silicon Valley Data Science網站上發表的部落格,Andrew Ray博士對大資料有著濃厚的興趣並且有著豐富的Spark使用經驗。Andrew同樣也是一名活躍的Apache Spark原始碼貢獻者,其原始
如何在Spark中使用動態資料轉置
Dynamic Transpose是Spark中的一個關鍵轉換,因為它需要大量的迭代。本文將為您提供有關如何使用記憶體中運算子處理此複雜方案的清晰概念。 首先,讓我們看看我們擁有的源資料: idoc_number,訂單ID,idoc_qualifier_org,idoc_org 7738
解決spark中遇到的資料傾斜問題
一. 資料傾斜的現象 多數task執行速度較快,少數task執行時間非常長,或者等待很長時間後提示你記憶體不足,執行失敗。 二. 資料傾斜的原因 常見於各種shuffle操作,例如reduceByKey,groupByKey,join等操作。 資
Spark中ip對映資料應用庫,二分查詢省份,將結果寫入mysql
def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") .setAppName(IpLocation3.getClass.getName) val sc =
如何呼叫Spark中的資料標準化庫
在大資料的學習過程中,總有很多小夥伴遇到不知如何呼叫Spark中的資料標準庫,本文的核心這不是在於介紹「資料標準化」,也不是在於實現「Spark呼叫」,畢竟這些概念大家應該耳濡目染了,至於呼叫方法一搜一大堆。今天這個問題也是科多大資料的一名學員提出來的,估計有很多人也遇到類似的問題,一併分享在此,希
零基礎入門大資料之spark中rdd部分運算元詳解
先前文章介紹過一些spark相關知識,本文繼續補充一些細節。 我們知道,spark中一個重要的資料結構是rdd,這是一種並行集合的資料格式,大多數操作都是圍繞著rdd來的,rdd裡面擁有眾多的方法可以呼叫從而實現各種各樣的功能,那麼通常情況下我們讀入的資料來源並非rdd格式的,如何轉
零基礎入門大資料之spark中的幾種key-value操作
今天記錄一下spark裡面的一些key-value對的相關運算元。 key-value對可以簡單理解為是一種認為構造的資料結構方式,比如一個字串"hello",單看"hello"的話,它是一個字串型別,現在假設我想把它在一個文字中出現的次數n作為一個值和"hello"一起操作,那麼可
零基礎入門大資料探勘之spark中的幾種map
今天再來說一下spark裡面的幾種map方法。前面的文章介紹過單純的map,但是spark還有幾種map值得對比一下,主要是下面幾種: map:普通的map flatMap:在普通map的基礎上多了一個操作,扁平化操作; mapPartitions:相對於分割槽P
Spark中的資料本地性
分散式資料並行環境下,保持資料的本地性是非常重要的內容,事關分散式系統性能高下。 概念: block : HDFS的物理空間概念,固定大小,最小是64M,可以是128,256 。。也就是說單個檔案大於block的大小,肯定會被切分,被切分的數目大概是:比如檔案是250
大資料----Spark中的 決策樹 及 SVM 建模
#一、演算法解釋 ~~~~~~~使用決策樹二元分類分析StumbleUpon資料集,預測網頁是暫時性(Ephemeral)或是長青的(Evergreen), ~~~~~~~並且調校引數找出最佳引數組合,提高預測準確度。決策樹的優點:條例清晰、方法簡單、易於理解、
Spark中元件Mllib的學習11之使用ALS對movieLens中一百萬條(1M)資料集進行訓練,並對輸入的新使用者資料進行電影推薦
1解釋 spark-1.5.2 資料集:http://grouplens.org/datasets/movielens/ 一百萬條(1M) 資料劃分: 將樣本評分表以key值切分成3個部分,分別用於訓練 (60%,並加入使用者評分), 校驗 (20
Spark中元件Mllib的學習25之線性迴歸2-較大資料集(多元)
對多組資料進行model的training,然後再利用model來predict具體的值 。過程中有輸出model的權重 公式:f(x)=a1X1+a2X2+a3X3+…… 2.程式碼:
Spark中元件Mllib的學習27之邏輯迴歸-多元邏輯迴歸,較大資料集,帶預測準確度計算
2.程式碼: /** * @author xubo * ref:Spark MlLib機器學習實戰 * more code:https://github.com/xubo245/SparkLearning
Spark中sortByKey和sortBy對(key,value)資料分別 根據key和value排序
最近在用Spark分析Nginx日誌,日誌解析和處理完後需要根據URL的訪問次數等進行排序,取得Top(10)等。 根據對Spark的學習,知道Spark中有一個sortByKey()的函式能夠完成對(key,value)格式的資料進行排序,但是,很明顯,它
spark三種清理資料的方式:UDF,自定義函式,spark.sql;Python中的zip()與*zip()函式詳解//及python中的*args和**kwargs
(1)UDF的方式清理資料 import sys reload(sys) sys.setdefaultencoding('utf8') import re import json from pyspark.sql import SparkSession
Spark中元件Mllib的學習9之ALS訓練的model來預測資料的準確率研究
1解釋 研究ALS的準確率 2.程式碼: package org.apache.spark.mllib.learning.recommend import java.text.SimpleDateFormat import java.util.D
Spark中載入本地(或者hdfs)檔案以及 spark使用SparkContext例項的textFile讀取多個資料夾(巢狀)下的多個數據檔案
Spark中載入本地(或者hdfs)檔案以及 spark使用SparkContext例項的textFile讀取多個資料夾(巢狀)下的多個數據檔案 在正常呼叫過程中,難免需要對多個資料夾下的多個檔案進行讀取,然而之前只是明確了spark具備讀取多個檔案的能力。針對多個資料夾下
資料基礎---spark中的資料型別
mllib中的資料型別 本文是對官方文件的翻譯整理 1、資料型別 Local vector(本地向量) Labeled point(帶標籤資料點) Local matrix(本地矩陣) Distrubuted matrix(分散式矩陣):RowM
Spark求資料集中同一主鍵記錄中的最新資料
/** * 測試資料:* 1,001,10,2015-10-29 * 1,001,8.9,2015-10-28 * 2,002,5,2015-10-27 * 2,002,3,2015-10-28 * 3,003,5.9,2015-11-03 * */object Remov
如何解決spark中的資料傾斜問題
發現數據傾斜的時候,不要急於提高executor的資源,修改引數或是修改程式,首先要檢查資料本身,是否存在異常資料。 1、資料問題造成的資料傾斜 找出異常的key 如果任務長時間卡在最後最後1個(幾個)任務,首先要對key進行抽樣分析,判斷是哪些