1. 程式人生 > >dataframe 資料統計視覺化---spark scala 應用

dataframe 資料統計視覺化---spark scala 應用

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, _}
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.types.StringType import scala.collection.mutable.ArrayBuffer /** * 功能:對hive表的列資訊進行統計。 * 統計結果包括: * 1.包含每列的平均值,中位數,最小值最大值,方差,唯一值,缺失值,列型別。 * 2.列的直方圖分佈(字串top10,數值列10個區間),四分點陣圖分佈(數值列)。 * * 實現邏輯: * 1.利用spark的describe函式獲取到最大值,最小值,均值,方差等。 * 2。利用sql:獲取唯一值及缺失值,sql樣例如下: * select count(distinct(id)) as unique_id , count(distinct(name)) as unique_name, sum(case when id is null then 1 else 0 end) as missing_id, sum(case when name is null then 1 else 0 end) as missing_name, sum(1) as totalrows from zpcrcf * 結果: * +---------+-----------+----------+------------+---------+ * |unique_id|unique_name|missing_id|missing_name|totalrows| * +---------+-----------+----------+------------+---------+ * | 14| 12| 0| 0| 14| * +---------+-----------+----------+------------+---------+ * * 3.利用sql:獲取四分點陣圖,sql樣例如下; * select 'Quartile_id' as colName, ntil, max(id) as num from (select id, ntile(4) OVER (order by id)as ntil from zpcrcf) tt group by ntil * 結果: * +------------+----+---+ * | colName|ntil|num| * +------------+----+---+ * | Quartile_id| 1| 3| * | Quartile_id| 2| 7| * | Quartile_id| 3| 14| * | Quartile_id| 4|100| * * 4.數值型直方圖10階段分割槽間,通過最大值減最小值,獲取各個區間內的分佈。 * sql樣例如下: * select 'MathHistogram_age' as colName, partNum, count(1) as num from ( select age, (case when (age >= 29.0 and age <= 36.1) then 1 when (age > 36.1 and age <= 43.2) then 2 when (age > 43.2 and age <= 50.3) then 3 when (age > 50.3 and age <= 57.4) then 4 when (age > 57.4 and age <= 64.5) then 5 when (age > 64.5 and age <= 71.6) then 6 when (age > 71.6 and age <= 78.69999999999999) then 7 when (age > 78.69999999999999 and age <= 85.8) then 8 when (age > 85.8 and age <= 92.9) then 9 when (age > 92.9 and age <= 100.0) then 10 else 0 end ) as partNum from zpcrcf) temptableScala group by partNum * 結果: * +-----------------+-------+---+ * | colName|partNum|num| * +-----------------+-------+---+ * |MathHistogram_age| 0| 1| * |MathHistogram_age| 1| 3| * |MathHistogram_age| 10| 10| * | MathHistogram_id| 1| 10| * | MathHistogram_id| 2| 3| * | MathHistogram_id| 10| 1| * +-----------------+-------+---+ * * * * Created by zpc on 2016/4/26.
*/ object DataFrameVisiualize extends Logging { def runforstatistic(hiveContext: HiveContext, params: JSONObject) = { val arr = params.getJSONArray("targetType") var i = 0 while( arr != null && i < arr.size()){ val obj = arr.getJSONObject(i) if("dataset".equalsIgnoreCase(obj.getString("targetType"))){ val tableNameKey
= obj.getString("targetName") val tableName = params.getString(tableNameKey) val user = params.getString("user") run(hiveContext, tableName, user) } i = i+1 } } def run(hiveContext: HiveContext, tableName: String, user: String) = { val pathParent = s"/user/$user/mlaas/tableStatistic/$tableName" // val conf = new SparkConf().setAppName("DataFrameVisiualizeJob") // val sc = new SparkContext(conf) // val hiveContext = new HiveContext(sc) // val sqlContext = new SQLContext(sc) //0.獲取DB的schema資訊 val schemadf = hiveContext.sql("desc " + tableName) //schema資訊落地 val filePathSchema = pathParent + "/schemajson" schemadf.write.mode(SaveMode.Overwrite).format("json").save(filePathSchema) //1.載入表到dataframe val df = hiveContext.sql("select * from " + tableName) //2.獲取dataframe的describe資訊,預設為獲取到的都為數值型列 val dfdesc = df.describe() // //3.描述資訊落地 // val filePath = pathParent + "/describejson" // des.write.mode(SaveMode.Overwrite).format("json").save(filePath) // val dfdesc = sqlContext.read.format("json").load(filePath) //4.列資訊區分為mathColArr 和 strColArr val mathColArr = dfdesc.columns.filter(!_.equalsIgnoreCase("summary")) val (colMin, colMax, colMean, colStddev, colMedian) = getDesfromDF(dfdesc, mathColArr) val allColArr = df.columns // col type 存在vector型別,此處僅統計string和num型別的 val typeMap = df.dtypes.toMap val strColArr = allColArr.filter(typeMap.get(_).get.equals(StringType.toString)) // val strColArr = allColArr.filter(!_.equalsIgnoreCase("summary")).diff(mathColArr) saveRecords(hiveContext, tableName, 100, pathParent + "/recordsjson") val jsonobj = getAllStatistics(hiveContext, tableName, allColArr, strColArr, mathColArr, 10, colMin, colMax) jsonobj.put("colMin", colMin) jsonobj.put("colMax", colMax) jsonobj.put("colMean", colMean) jsonobj.put("colStddev", colStddev) jsonobj.put("colMedian", colMedian) val jsonStr = jsonobj.toString val conf1 = new Configuration() val fs = FileSystem.get(conf1) val fileName = pathParent + "/jsonObj" val path = new Path(fileName) val hdfsOutStream = fs.create(path) hdfsOutStream.write(jsonStr.getBytes("utf-8")) hdfsOutStream.flush() hdfsOutStream.close() // fs.close(); } def saveRecords(hiveContext: HiveContext, tableName: String, num: Int, filePath: String) : Unit = { hiveContext.sql(s"select * from $tableName limit $num").write.mode(SaveMode.Overwrite).format("json").save(filePath) } /** * 根據allCols, mathColArr, strColArr 三個陣列,返回帶有所有統計資訊(除去已經根據describe獲取到的)的dataframes。 * 返回的dataframe結果進行遍歷,填充各個屬性的值。 */ def getAllStatistics(hiveContext: HiveContext, tableName: String, allColArr: Array[String], strColArr: Array[String], mathColArr: Array[String], partNum: Int, colMin: java.util.HashMap[String, Double], colMax: java.util.HashMap[String, Double]) : JSONObject = { val jsonobj = new JSONObject() val sb = new StringBuffer() sb.append("select ") allColArr.map{col => sb.append(s"count(distinct(`$col`)) as unique_$col ," + s"sum(case when `$col` is null then 1 else 0 end) as missing_$col, ")} sb.append(s"sum(1) as totalrows from $tableName") val df = hiveContext.sql(sb.toString) val colUnique = new java.util.HashMap[String, Long]//唯一值 val colMissing = new java.util.HashMap[String, Long]//缺失值 var totalrows = 0L df.take(1).foreach(row => (totalrows = row.getAs[Long]("totalrows"), jsonobj.put("totalrows", totalrows) ,allColArr.foreach(col => (colUnique.put(col, row.getAs[Long]("unique_"+col)),colMissing.put(col, row.getAs[Long]("missing_"+col))) ) )) val dfArr = ArrayBuffer[DataFrame]() val strHistogramSql = new StringBuffer() strHistogramSql.append(s""" SELECT tta.colName, tta.value, tta.num FROM ( SELECT ta.colName, ta.value, ta.num, ROW_NUMBER() OVER (PARTITION BY ta.colName ORDER BY ta.num DESC) AS row FROM ( """) var vergin = 0 for(col <- strColArr){ if(vergin == 1){ strHistogramSql.append(" UNION ALL ") } vergin = 1 strHistogramSql.append(s""" SELECT 'StrHistogram_$col' AS colName, `$col` AS value, COUNT(1) AS num FROM $tableName GROUP BY `$col` """) } strHistogramSql.append(s""" ) ta ) tta WHERE tta.row <= $partNum """) //整個表中,可能不存在字串型的列。此時,sql是不完整的,新增到df中會報錯。 if(strColArr != null && strColArr.size != 0 ){ val dfStrHistogram = hiveContext.sql(strHistogramSql.toString) dfArr.append(dfStrHistogram) } for(col <- mathColArr) { val df1 = hiveContext.sql(s"select 'Quartile_$col' as colName, ntil, bigint(max(`$col`)) as num from (select `$col`, ntile(4) OVER (order by `$col`)as ntil from $tableName) tt group by ntil ") log.info("col is :" + col + ", min is :" + colMin.get(col) + ", max is : " + colMax.get(col)) // when the column data contains null, the min and max may be null or be "Infinity". if (colMin == null || colMin.get(col) == null || colMax.get(col) == null || colMax.get(col) == "Infinity" || colMin.get(col) == "Infinity") { log.info("col is :" + col + ", min is :" + colMin.get(col) + ", max is : " + colMax.get(col)) } else { //need toString first, then toDouble。 or:ClassCastException val min = colMin.get(col).toString.toDouble val max = colMax.get(col).toString.toDouble val df2 = getHistogramMathDF(col, hiveContext, tableName, min, max, partNum) dfArr.append(df1) dfArr.append(df2) } } //可能存在沒有列可統計的情況, e.g. 表中的列都為double,但資料都是null. //dfArr.reduce 和會報錯:java.lang.UnsupportedOperationException: empty.reduceLeft //總行數為0時,四分位,條形圖也自然獲取不到,且會出現NullPointerException。 if(dfArr.isEmpty || totalrows == 0L){ jsonobj.put("colUnique", colUnique) jsonobj.put("colMissing", colMissing) }else { val dfAll = dfArr.reduce(_.unionAll(_)) val allRows = dfAll.collect() val mathColMapQuartile = new java.util.HashMap[String, Array[java.util.HashMap[String, Long]]] //四分位 val mathColMapHistogram = new java.util.HashMap[String, Array[java.util.HashMap[String, Long]]] //條形圖 val strColMapHistogram = new java.util.HashMap[String, Array[java.util.HashMap[String, Long]]] //條形圖 val (mathColMapQuartile1, mathColMapHistogram1, strColMapHistogram1) = readRows(allRows) for (col <- strColArr) { strColMapHistogram.put(col, strColMapHistogram1.get(col).toArray[java.util.HashMap[String, Long]]) } for (col <- mathColArr) { mathColMapQuartile.put(col, mathColMapQuartile1.get(col).toArray[java.util.HashMap[String, Long]]) mathColMapHistogram.put(col, mathColMapHistogram1.get(col).toArray[java.util.HashMap[String, Long]]) } jsonobj.put("mathColMapQuartile", mathColMapQuartile) jsonobj.put("mathColMapHistogram", mathColMapHistogram) jsonobj.put("strColMapHistogram", strColMapHistogram) jsonobj.put("colUnique", colUnique) jsonobj.put("colMissing", colMissing) } jsonobj } def readRows(rows: Array[Row]) : (java.util.HashMap[String, ArrayBuffer[java.util.HashMap[String,Long]]] , java.util.HashMap[String, ArrayBuffer[java.util.HashMap[String,Long]]], java.util.HashMap[String, ArrayBuffer[java.util.HashMap[String,Long]]])={ val mathColMapQuartile = new java.util.HashMap[String, ArrayBuffer[java.util.HashMap[String,Long]]] //四分位 val mathColMapHistogram = new java.util.HashMap[String, ArrayBuffer[java.util.HashMap[String,Long]]]//條形圖 val strColMapHistogram = new java.util.HashMap[String, ArrayBuffer[java.util.HashMap[String,Long]]]//條形圖 rows.foreach( row => { val colName = row.getAs[String]("colName") if (colName.startsWith("StrHistogram")) { val value = row.getAs[String](1) val num = row.getAs[Long](2) val map = new java.util.HashMap[String, Long]() val col = colName.substring(colName.indexOf('_') + 1) map.put(value, num) val mapValue = strColMapHistogram.get(col) if (mapValue == null) { val mapValueNew = ArrayBuffer[java.util.HashMap[String, Long]]() mapValueNew.append(map) strColMapHistogram.put(col, mapValueNew) } else { mapValue.append(map) strColMapHistogram.put(col, mapValue) } } else if (colName.toString.startsWith("Quartile")) { val value = row.get(1).toString val num = row.getAs[Long](2) val map = new java.util.HashMap[String, Long]() val col = colName.substring(colName.indexOf('_') + 1) map.put(value, num) val mapValue = mathColMapQuartile.get(col) if (mapValue == null) { val mapValueNew = ArrayBuffer[java.util.HashMap[String, Long]]() mapValueNew.append(map) mathColMapQuartile.put(col, mapValueNew) } else { mapValue.append(map) mathColMapQuartile.put(col, mapValue) } } else if (colName.toString.startsWith("MathHistogram")) { val value =row.get(1).toString val num = row.getAs[Long](2) val map = new java.util.HashMap[String, Long]() val col = colName.substring(colName.indexOf('_') + 1) map.put(value, num) val mapValue = mathColMapHistogram.get(col) if (mapValue == null) { val mapValueNew = ArrayBuffer[java.util.HashMap[String, Long]]() mapValueNew.append(map) mathColMapHistogram.put(col, mapValueNew) } else { mapValue.append(map) mathColMapHistogram.put(col, mapValue) } } }) (mathColMapQuartile, mathColMapHistogram, strColMapHistogram) } /** 數值型的列的條形分佈獲取方法*/ def getHistogramMathDF(col : String, hiveContext: HiveContext, tableName: String, min: Double, max: Double, partNum: Int) : DataFrame = { val len = (max - min) / partNum log.info(s"len is : $len") val sb = new StringBuffer() sb.append(s"select `$col`, (case ") val firstRight = min + len sb.append(s" when (`$col` >= $min and `$col` <= $firstRight) then 1 ") for (i <- 2 until (partNum + 1)) { val left = min + len * (i - 1) val right = min + len * i sb.append(s" when (`$col` > $left and `$col` <= $right) then $i ") } sb.append(s" else 0 end ) as partNum from $tableName") sb.insert(0, s"select 'MathHistogram_$col' as colName, partNum, count(1) as num from ( ") sb.append(") temptableScala group by partNum") log.info("getHistogram is: " + sb.toString) val df = hiveContext.sql(sb.toString) df } def getDesfromDF(dfdesc : DataFrame, mathColArr: Array[String]): (java.util.HashMap[String, Double], java.util.HashMap[String, Double], java.util.HashMap[String, Double], java.util.HashMap[String, Double], java.util.HashMap[String, Double])= { val allRows = dfdesc.collect() val colMin = new java.util.HashMap[String, Double]//最小值 val colMax = new java.util.HashMap[String, Double]//最大值 val colMean = new java.util.HashMap[String, Double]//平均值 val colStddev = new java.util.HashMap[String, Double]//標準差 val colMedian = new java.util.HashMap[String, Double]//中位值 allRows.foreach(row => { val mapKey = row.getAs[String]("summary") for(col <- mathColArr){ if("mean".equalsIgnoreCase(mapKey)){ colMean.put(col, row.getAs[Double](col)) }else if("stddev".equalsIgnoreCase(mapKey)){ colStddev.put(col, row.getAs[Double](col)) }else if("min".equalsIgnoreCase(mapKey)){ log.info("col is " + col +", min is : "+ row.getAs[Double](col)) colMin.put(col, row.getAs[Double](col)) }else if("max".equalsIgnoreCase(mapKey)){ log.info("col is " + col +", max is : "+ row.getAs[Double](col)) colMax.put(col, row.getAs[Double](col)) }else{ colMedian.put(col, row.getAs[Double](col)) } } }) (colMin, colMax, colMean, colStddev, colMedian) } }

相關推薦

dataframe 資料統計視覺---spark scala 應用

import org.apache.spark.sql.hive.HiveContext import org.apache.spark.{Logging, SparkConf, SparkContext} import org.apache.spark.sql.{DataFrame, Row, Sa

python 爬蟲與資料視覺--matplotlib模組應用

一、資料分析的目的(利用大資料量資料分析,幫助人們做出戰略決策)   二、什麼是matplotlib?   matplotlib: 最流行的Python底層繪相簿,主要做資料視覺化圖表,名字取材於MATLAB,模仿MATLAB構建,能將資料進行視覺化、更直觀的呈現、使資料更加客觀、更具說服

如何才能增強資料結構和演算法的能力-資料結構視覺網站

所以,兩個建議: 1. 視覺化資料結構,把它畫出來,在你的腦海中視覺化,可以更好地幫助你直觀地理解它。(推薦兩個資料結構視覺化網站: Data Structure Visualization和  VisuAlgo - visualising data structures a

【手把手教你】Python獲取財經資料視覺分析

內容來自:微信公眾號:python金融量化 關注可瞭解更多的金融與Python乾貨。 “巧婦難為無米之炊”,找不到資料,量化分析也就無從談起。對於金融分析者來說,獲取資料是量化分析的第一步。Python的一個強大功能之一就是資料獲取(爬蟲)。但是對於沒時間學爬蟲程式的小白來說,pytho

2018/12/01 資料視覺工具Treeviz介紹

下載地址: 連結:https://pan.baidu.com/s/1VnXjeDFJoB9C38OQex7Bsg  提取碼:j1b0    以下目錄開啟Treeviz     介紹 file:我們只能選擇file下

Python讓你的資料生成視覺圖形

今天就用 pyecharts 庫來畫圖。 安裝:最簡單快速的命令安裝方法: pip install pyecharts 柱形圖 柱形圖簡明、醒目,是一種常用的統計圖形。以下生成的圖都可以點選 html 檔案開啟,點選圖形右邊下載按鈕可以下載到本地。 from py

Python突破高德API限制爬取交通態勢資料+GIS視覺(超詳細)

一、需求:        爬取高德的交通態勢API,將資料視覺化為含有交通態勢資訊的向量路網資料。 二、使用的工具:        Python IDLE、記事本編輯器、ArcGIS 10.2、申請的高德開發者KEY(免費)。 三、主要思路:        本文的思

BI大資料分析視覺軟體系統開發

大資料時代,人們對資料的整理分析越來越重BI也稱商業智慧,商業智慧一般被理解為將企業中所產生的資料轉化為知識,幫助企業做出明智經營決策的輔助工具。BI大資料分析視覺化軟體適用於任何或產生資料的行業,尤其是現在是大資料時代,從大資料分析出的結論對各個企業都有深遠影響。 這裡所說的資料包括來自企業的業務系統的訂

Ebay開源基於大資料視覺框架:Pulsar Reporting

作者:汪明明,王巧玲 ebay又新添了一個開源專案PulsarReporting – 基於大資料的視覺化框架 Pulsar作為一個實時和近實時大資料分析處理系統,包含了Pulsar Pipeline和Pulsar Reporting. 在今年三月, eBay Puls

Spring Cloud【Finchley】-12使用Hystrix Dashboard實現Hystrix資料視覺監控

文章目錄 概述 Hystrix Dashboard Step 1 新建專案 Step2 增加maven依賴 Step3 啟動類增加註解@EnableHystrixDashboard Step4 配置檔案applicati

【python資料探勘課程】十九.鳶尾花資料視覺、線性迴歸、決策樹花樣分析

這是《Python資料探勘課程》系列文章,也是我這學期上課的部分內容。本文主要講述鳶尾花資料集的各種分析,包括視覺化分析、線性迴歸分析、決策樹分析等,通常一個數據集是可以用於多種分析的,希望這篇文章對大

Python Django+Echarts將資料視覺輸出

以上為最終結果 Django是開放原始碼的Web應用框架,由Python語言編寫。 pip3 install django安裝Django,並加入系統變數Path。 建立Django專案,並在PyCharm下的terminal輸入:python manage.py st

電子商務商電商BI大資料分析視覺系統開發

電子商務商的資料分析很重要,它可以從各種資料找那個分析出哪種商品好賣,哪類人群喜歡買什麼等等。甚至使用者畫像完全可以分析出,在電商領域,資料一般可以分為四大型別,流量、銷量、商品和會員,這也是最基礎的報表需求。 1.流量部分有點選、搜尋、來源等,這些資訊經過分析可以運用在廣告包括一些產品的改版以

七種方法實現Python抓取資料視覺

  Python 的scientific stack(一個介紹Python科學計算包的網站)已經完全成熟,並且有各種各樣用例的庫,包括機器學習(連結:machine learning),資料分析(連結:data analysis)。資料視覺化是探索資料和清晰的解釋結果很重要的一部分,

MNIST 資料視覺程式碼

寫一個matlab小程式將mnist資料集視覺化,將以下程式碼命名為image_visualization 並放在 $caffe_root/data/mnist/  下,{獨立執行,不必編譯caffe,但是要提前Linux下下載資料} clear; clc; cle

基於Python Plotly 對 MySQL 儲存資料視覺初步

Pyhon提供強大的視覺化工具,除matplotlib外,pandas、seaborn、ggplot、bokeh、pygal、plotly都具有強大的可視功能(http://www.thebigdat

利用 ELK系統分析Nginx日誌並對資料進行視覺展示

一、寫在前面   結合之前寫的一篇文章:Centos7 之安裝Logstash ELK stack 日誌管理系統,上篇文章主要講了監控軟體的作用以及部署方法。而這篇文章介紹的是單獨監控nginx 日誌分析再進行視覺化圖形展示,並在使用者前端使用nginx 來代理kibana

某招聘網資訊統計視覺

0x00 前言 資料截至:2016.02.23 你應該猜到是哪個網站了,用python3寫了個多執行緒(非同步也不錯)+多代理爬蟲,大致實現是在執行中不斷往資料庫加入新代理,在獲取中把無效代理去掉及將任務ID添加回佇列,最後剩下穩定的代理迴圈使用,也

教你解決Python資料分析視覺時可以顯示中文

問題描述: 使用Python進行資料分析時,中文是顯示不了的, 那麼怎麼使matplotlib視覺化是能夠顯示中文呢? 你需要具備的知識:matplotlib基本操作,linux基本操作,IPyt

Python資料分析視覺Seaborn例項講解

Seaborn是一種基於matplotlib的圖形視覺化python libraty。它提供了一種高度互動式介面,便於使用者能夠做出各種有吸引力的統計圖表。 Seaborn其實是在matplotlib的基礎上進行了更高階的API封裝,從而使得作圖更加容易,在大