1. 程式人生 > 其它 >使用Python Pandas處理億級資料

使用Python Pandas處理億級資料

原文:http://www.justinablog.com/archives/1357?utm_source=tuicool&utm_medium=referral

在資料分析領域,最熱門的莫過於Python和R語言,此前有一篇文章《別老扯什麼Hadoop了,你的資料根本不夠大》指出:只有在超過5TB資料量的規模下,Hadoop才是一個合理的技術選擇。這次拿到近億條日誌資料,千萬級資料已經是關係型資料庫的查詢分析瓶頸,之前使用過Hadoop對大量文字進行分類,這次決定採用Python來處理資料:

  • 硬體環境
    • CPU:3.5 GHz Intel Core i7
    • 記憶體:32 GB HDDR 3 1600 MHz
    • 硬碟:3 TB Fusion Drive
  • 資料分析工具
    • Python:2.7.6
    • Pandas:0.15.0
    • IPython notebook:2.0.0

源資料如下表所示:

Table

Size

Desc

ServiceLogs

98,706,832 rows x 14 columns

8.77 GB

交易日誌資料,每個交易會話可以有多條交易

ServiceCodes

286 rows × 8 columns

20 KB

交易分類的字典表

資料讀取

啟動IPython notebook,載入pylab環境:

ipython notebook --pylab=inline

Pandas提供了IO工具可以將大檔案分塊讀取,測試了一下效能,完整載入9800萬條資料也只需要263秒左右,還是相當不錯了。

import pandas as pd
reader = pd.read_csv('data/servicelogs', iterator=True)try:
    df = reader.get_chunk(100000000)except StopIteration:
    print "Iteration is stopped."

1百萬條

1千萬條

1億條

ServiceLogs

1 s

17 s

263 s

使用不同分塊大小來讀取再呼叫 pandas.concat

連線DataFrame,chunkSize設定在1000萬條左右速度優化比較明顯。

loop = TruechunkSize = 100000chunks = []while loop:
    try:
        chunk = reader.get_chunk(chunkSize)
        chunks.append(chunk)
    except StopIteration:
        loop = False
        print "Iteration is stopped."df = pd.concat(chunks, ignore_index=True)

下面是統計資料,Read Time是資料讀取時間,Total Time是讀取和Pandas進行concat操作的時間,根據資料總量來看,對5~50個DataFrame物件進行合併,效能表現比較好。

Chunk Size

Read Time (s)

Total Time (s)

Performance

100,000

224.418173

261.358521

200,000

232.076794

256.674154

1,000,000

213.128481

234.934142

√ √

2,000,000

208.410618

230.006299

√ √ √

5,000,000

209.460829

230.939319

√ √ √

10,000,000

207.082081

228.135672

√ √ √ √

20,000,000

209.628596

230.775713

√ √ √

50,000,000

222.910643

242.405967

100,000,000

263.574246

263.574246

如果使用Spark提供的Python Shell,同樣編寫Pandas載入資料,時間會短25秒左右,看來Spark對Python的記憶體使用都有優化。

資料清洗

Pandas提供了 DataFrame.describe 方法檢視資料摘要,包括資料檢視(預設共輸出首尾60行資料)和行列統計。由於源資料通常包含一些空值甚至空列,會影響資料分析的時間和效率,在預覽了資料摘要後,需要對這些無效資料進行處理。

首先呼叫 DataFrame.isnull() 方法檢視資料表中哪些為空值,與它相反的方法是 DataFrame.notnull() ,Pandas會將表中所有資料進行null計算,以True/False作為結果進行填充,如下圖所示:

Pandas的非空計算速度很快,9800萬資料也只需要28.7秒。得到初步資訊之後,可以對錶中空列進行移除操作。嘗試了按列名依次計算獲取非空列,和 DataFrame.dropna() 兩種方式,時間分別為367.0秒和345.3秒,但檢查時發現 dropna() 之後所有的行都沒有了,查了Pandas手冊,原來不加引數的情況下, dropna() 會移除所有包含空值的行。如果只想移除全部為空值的列,需要加上 axis 和 how 兩個引數:

df.dropna(axis=1, how='all')

共移除了14列中的6列,時間也只消耗了85.9秒。

接下來是處理剩餘行中的空值,經過測試,在 DataFrame.replace() 中使用空字串,要比預設的空值NaN節省一些空間;但對整個CSV檔案來說,空列只是多存了一個“,”,所以移除的9800萬 x 6列也只省下了200M的空間。進一步的資料清洗還是在移除無用資料和合並上。

對資料列的丟棄,除無效值和需求規定之外,一些表自身的冗餘列也需要在這個環節清理,比如說表中的流水號是某兩個欄位拼接、型別描述等,通過對這些資料的丟棄,新的資料檔案大小為4.73GB,足足減少了4.04G!

資料處理

使用 DataFrame.dtypes 可以檢視每列的資料型別,Pandas預設可以讀出int和float64,其它的都處理為object,需要轉換格式的一般為日期時間。DataFrame.astype() 方法可對整個DataFrame或某一列進行資料格式轉換,支援Python和NumPy的資料型別。

df['Name'] = df['Name'].astype(np.datetime64)

對資料聚合,我測試了 DataFrame.groupby 和 DataFrame.pivot_table 以及 pandas.merge ,groupby 9800萬行 x 3列的時間為99秒,連線表為26秒,生成透視表的速度更快,僅需5秒。

df.groupby(['NO','TIME','SVID']).count() # 分組fullData = pd.merge(df, trancodeData)[['NO','SVID','TIME','CLASS','TYPE']] # 連線actions = fullData.pivot_table('SVID', columns='TYPE', aggfunc='count') # 透視表

根據透視表生成的交易/查詢比例餅圖:

將日誌時間加入透視表並輸出每天的交易/查詢比例圖:

total_actions = fullData.pivot_table('SVID', index='TIME', columns='TYPE', aggfunc='count')total_actions.plot(subplots=False, figsize=(18,6), kind='area')

除此之外,Pandas提供的DataFrame查詢統計功能速度表現也非常優秀,7秒以內就可以查詢生成所有型別為交易的資料子表:

tranData = fullData[fullData['Type'] == 'Transaction']

該子表的大小為 [10250666 rows x 5 columns]。在此已經完成了資料處理的一些基本場景。實驗結果足以說明,在非“>5TB”資料的情況下,Python的表現已經能讓擅長使用統計分析語言的資料分析師遊刃有餘。