1. 程式人生 > >資料清洗很要命?那是因為你沒看到這份攻略!

資料清洗很要命?那是因為你沒看到這份攻略!

對資料探勘和分析的人員來說,資料清洗和轉化是一項非常繁瑣和複雜的工作,佔用了很大的工作量。

目前,資料的挖掘和分析,基本都是採用pandas,numpy或者R語言,這種處理過程複雜,而且沒有一個統一的規範。本文將給大家介紹一項技術,使用FEA-spk技術,可以快速實現資料的清洗和轉化工作,而且任何人都能看懂。

FEA-spk技術,它的底層基於最流行的大資料開發框架spark,而且可以和很多流行大資料開發框架結合,比如Hadoop,hbase,mongodb等。使用FEA-spk來做互動分析,不但非常簡單易懂,而且幾乎和spark的功能一樣強大,更重要的一點,它可以實現視覺化,處理的資料規模更大,下面就實際的專案為例進行說明。

1. 要想使用FEA-spk技術,首先要建立一個spk的連線,所有的操作都是以它為上下文進行的。

在fea介面執行以下命令:

2. DataFrame的轉換

FEA-spk技術操作有2種dataframe,一種是pandas的dataframe,可以直接在fea裡面執行dump檢視。另外一種是spark的dataframe,它能夠進行各種各樣的spark運算元操作,比如group,agg等

spark dataframe需要轉換為pandas的dataframe才能執行dump命令檢視,轉換的原語如下

pd= @udf df by spk.to_DF  #spark dataframe df轉換為pandas dataframe pd

dump pd   #可以直接使用dump命令檢視

sdf= @udf spk,pd by spk.to_SDF #將pandas dataframe pd轉換為spark dataframe sdf,以便進行spark的各種操作

3. 匯入資料來源

FEA-spk技術支援各種各樣的資料來源,hive,mongodb,text,avro , json, csv , parquet,mysql,oracle, postgresql以及具有特定格式的檔案

下面舉其中幾個為例進行說明

(1) 載入csv資料來源。

csv資料來源分為2種,第一種是帶header的(即有欄位名的),另外一種是沒有header欄位名的,格式稍有區別

a.csv檔案格式如下

id,hash

1,ssss

2,333

3,5567

下面進行資料載入的命令。

原語如下

df= @udf spk by spk.load_csv with (header,/data/a.csv)

#header為具有欄位名的,/data/a.csv為hdfs上的檔案路徑,如果沒有heade欄位,原語為df= @udf spk by spk.load_csv with (/data/a.csv)

  

(2)  關係型資料來源的載入,比如mysql,oracle,postgresql等

首先需要定義一個json連線串,用來指定連線的地址,資料庫名,使用者名稱,密碼。

格式如下

define mysql1  as ({"url":"jdbc:mysql://bigdata-89:3306","database":"test",

"user":"root","passwd":"123456"})

在mysql的test資料庫裡面有一張student_infos表,下面進行載入

df= @udf spk by spk.load_db with (mysql1,student_infos)

#載入student_infos表

(3)hive資料來源的載入

在hive的mydb資料庫裡面有一張student表,下面來載入它

df= @udf spk by spk.load_hive with (mydb.student)

 

4. 對資料進行切割,提取對於日誌分析資料來說,最重要的一步就是對資料進行切割,提取,這樣才能進行下一步的分析。下面以美國宇航局肯尼迪航天中心WEB日誌為例進行說明。

資料的下載地址為

http://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html

下面就到了至關重要的一步了,對資料進行正則化提取,提取出其中的主機名,時間戳,路徑,訪問狀態,返回的位元組數這5個欄位,原語命令如下

df1= @udf df by spk.reg_extract with (regexp_extract('value', r'^([^\s]+\s)', 1).alias("host"),

regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]', 1).alias('timestamp'),

regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('path'),

regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('status'),

regexp_extract('value', r'^.*\s+(\d+)$', 1).cast('integer').alias('content_size'))

#將df表的value欄位進行正則表示式提取出第一個匹配的主機名,將其重新命名為host列

將df表的value欄位進行正則表示式提取出第一個匹配的時間,將其重新命名為timestamp列

將df表的value欄位進行正則表示式提取出第一個匹配的路徑,將其重新命名為path列

將df表的value欄位進行正則表示式提取出第一個匹配的狀態碼,將它的型別轉化為int型別並將其重新命名為status列

將df表的value欄位進行正則表示式提取出第一個匹配的狀態碼,將它的型別轉化為int型別並將其重新命名為status列

將df表的value欄位進行正則表示式提取出第一個匹配的位元組數,將它的型別轉化為int型別並將其重新命名為content_size列

可以看到資料已經被切割成5列了

5. 清除無效語句

根據分析目標進行清洗得到所需要的資料,下面以fea經典的cd_esql為例進行說明

日誌的格式如下:

下面過濾掉日誌中的錯誤日誌

正常的日誌都包含有”-mylogger-”這個欄位內容,根據這個特徵過濾掉錯誤日誌。

 df1= @udf df by spk.filter with (instr('value', '- mylogger -')<> 0)

# instr('value', '- mylogger -'),value欄位如果不包含- mylogger -,返回0,否則返回它所在的索引。<>表示不為0,這樣就過濾掉了錯誤日誌。

6. 分割有效欄位

經過無效語句清洗,保留有效語句,但是還是不能滿足我們基礎DF表的要求,下面進行有效欄位的分割,提取。

有效的一條語句完整結構如下:

時間(精確到毫秒)/分割符(-mylogger-)/字串(info-)/語句(事件)

2016-03-29 13:56:13,748 /- mylogger -/ INFO -/ select * from people_trail01_dest where KSSJ>=2001-02-28T01:05:24.000Z

整條語句中就是時間與事件是分析統計有用的,要從整條語句中分割出來,

原語如下所示。

df2= @udf df1 by spk.opfield with (split(value,'- mylogger - ')[0] as d1:split(value,'- mylogger - ')[1] as event)

#將df1表的value欄位按照- mylogger –分割,第一個欄位並存儲到d1列中、提取第二個欄位儲存到event列中

可以看到event列還是不能滿足要求,再進行分割

7. 提取時間,日期欄位

對上面的資料提取天數

還有很多資料清洗攻略,我們將在下一篇繼續介紹,敬請期待!

FEA-spk簡單,強大,視覺化

不懂Java,Python同樣玩轉Spark

專門為資料分析師打造!