python 處理大資料之資料讀取
1 參考1:python讀取GB級的文字資料,防止memoryError
Preliminary
我們談到“文字處理”時,我們通常是指處理的內容。Python 將文字檔案的內容讀入可以操作的字串變數非常容易。檔案物件提供了三個“讀”方法: .read()、.readline() 和 .readlines()。每種方法可以接受一個變數以限制每次讀取的資料量,但它們通常不使用變數。 .read() 每次讀取整個檔案,它通常用於將檔案內容放到一個字串變數中。然而 .read() 生成檔案內容最直接的字串表示,但對於連續的面向行的處理,它卻是不必要的,並且如果檔案大於可用記憶體,則不可能實現這種處理。下面是read()方法示例:
try:
f = open('/path/to/file', 'r')
print f.read()
finally:
if f:
f.close()
呼叫read()會一次性讀取檔案的全部內容,如果檔案有10G,記憶體就爆了,所以,要保險起見,可以反覆呼叫read(size)方法,每次最多讀取size個位元組的內容。另外,呼叫readline()可以每次讀取一行內容,呼叫readlines()一次讀取所有內容並按行返回list。因此,要根據需要決定怎麼呼叫。
如果檔案很小,read()一次性讀取最方便;如果不能確定檔案大小,反覆呼叫read(size)比較保險;如果是配置檔案,呼叫readlines()最方便:
for line in f.readlines():
process(line) # <do something with line>
Read In Chunks
處理大檔案是很容易想到的就是將大檔案分割成若干小檔案處理,處理完每個小檔案後釋放該部分記憶體。這裡用了iter & yield:
def read_in_chunks(filePath, chunk_size=1024*1024):
"""
Lazy function (generator) to read a file piece by piece.
Default chunk size: 1M
You can set your own chunk size
"""
file_object = open(filePath)
while True:
chunk_data = file_object.read(chunk_size)
if not chunk_data:
break
yield chunk_data
if __name__ == "__main__":
filePath = './path/filename'
for chunk in read_in_chunks(filePath):
process(chunk) # <do something with chunk>
Using with open()
with語句開啟和關閉檔案,包括丟擲一個內部塊異常。for line in f檔案物件f視為一個迭代器,會自動的採用緩衝IO和記憶體管理,所以你不必擔心大檔案。
#If the file is line based
with open(...) as f:
for line in f:
process(line) # <do something with line>
Conclusion
在使用python進行大檔案讀取時,應該讓系統來處理,使用最簡單的方式,交給直譯器,就管好自己的工作就行了。
參考2:利用python處理兩千萬條資料的一些經驗
先開啟目標檔案,寫入列名,再開啟原始檔案,按行讀取,按照條件判斷這一行是否為“髒”資料,不是的話再按照上面表格裡的要求進行處理,之後按行寫入目標檔案,這樣一來,電腦記憶體佔用率下降,電腦也就不會卡了,最後再將初步處理過的檔案利用pandas開啟,利用其中的DataFrame資料結構的方法進行去重,兩千萬條的資料五分鐘之內處理完成,以下為原始碼:
import csv
rows=[]
with open(r'C:\Users\Hanju\Desktop\uploadPortal(5).csv',"w", newline='') as _csvfile:
writer = csv.writer(_csvfile)
#先寫入columns_name
writer.writerow(["Dev_mac","Action","User_mac","User_mac_head","Bssid","WiFi","Time","Date"])
i=0
with open(r'D:\UploadPortalData\uploadPortal (5).csv',encoding='UTF-8') as csvfile:
readCSV=csv.reader(csvfile,delimiter=',')
for row in readCSV:
if(len(row)!=8):
continue
row1=[]
i+=1
row1.append(row[0].replace(':','')[-5:])
if row[2]=='auth':
row1.append('1')
elif row[2]=='deauth':
row1.append('2')
elif row[2]=='portal':
row1.append('3')
elif row[2]=='portalauth':
row1.append('4')
row1.append(str(row[3].replace(':','')))
row1.append(str(row[3].replace(':','')[0:6]))
if row[0]==row[4]:
row1.append('2')
else:
row1.append('5')
if 'City-WiFi-5G' in row[5]:
row1.append('2')
elif 'City-WiFi' in row[5]:
row1.append('1')
else:
row1.append('0')
row1.append(float(row[6])/86400.0-2.0/3.0+719530.0)
row1.append(row[7])
writer.writerow(row1)
print('Done')
print(i)
import pandas as pd
df=pd.read_csv(r'C:\Users\Hanju\Desktop\uploadPortal(5).csv')
#print(df.head())
#print(df.tail())
print(df.shape)
New_df=df.drop_duplicates(['Action','User_mac','Time'])
print(New_df.shape)
#print(New_df.head())
#print(New_df.tail())
New_df.to_csv(r'C:\Users\Hanju\Desktop\uploadPortal(5)_Final.csv')
print('Done')
參考3:
解決的方法:
1)使用SSCursor(流式遊標),避免客戶端佔用大量記憶體。(這個cursor實際上沒有快取下來任何資料,它不會讀取所有所有到記憶體中,它的做法是從儲存塊中讀取記錄,並且一條一條返回給你。)
2)使用迭代器而不用fetchall,即省記憶體又能很快拿到資料。
import MySQLdb.cursors
conn = MySQLdb.connect(host='ip地址', user='使用者名稱', passwd='密碼', db='資料庫名', port=3306,
charset='utf8', cursorclass = MySQLdb.cursors.SSCursor)
cur = conn.cursor()
cur.execute("SELECT * FROM bigtable");
row = cur.fetchone()
while row is not None:
do something
row = cur.fetchone()
cur.close()
conn.close()
需要注意的是,
1.因為SSCursor是沒有快取的遊標,結果集只要沒取完,這個conn是不能再處理別的sql,包括另外生成一個cursor也不行的。
如果需要幹別的,請另外再生成一個連線物件。
2. 每次讀取後處理資料要快,不能超過60s,否則mysql將會斷開這次連線,也可以修改 SET NET_WRITE_TIMEOUT = xx 來增加超時間隔。