IntelliJ IDEA 常用設定(配置)吐血整理(首次安裝必需)
首先,python 多執行緒不能充分利用多核CPU的計算資源(只能共用一個CPU),所以得用多程式。筆者從3.7億資料的索引,取200多萬的資料,從取資料到構造pandas dataframe總共大概用時14秒左右。每個分片用一個程式查詢資料,最後拼接出完整的結果。
由於返回的json資料量較大,每次100多萬到200多萬,如何快速根據json構造pandas 的dataframe是個問題 — 筆者測試過read_json()、json_normalize()、DataFrame(eval(pandas_json))及DataFrame.from_dict(),from_dict()速度最快
轉載請註明出處:https://www.cnblogs.com/NaughtyCat/p/how-to-get-all-results-from-es-by-scroll-python-version.html
- Elasticsearch scroll取資料— python版
原始碼如下:
def es_scroll(index, min_timestamp, max_timestamp, slice_no):
es = Elasticsearch('http://localhost:9200', timeout = 30, max_retries=10, retry_on_timeout=True)page = es.search(
index = index,
doc_type = "tls_book",
scroll = '1m',
body={
"slice": {
"id": slice_no,
"max": SLICES
},
"_source": [
"SrcIP"
],
"sort": ["_doc"
],
"query": {
"range" : {
"@timestamp" : {
"gte" : min_timestamp,
"lte" : max_timestamp,
"boost" : 2.0
}
}
}
},
version = False,
size = 10000)
sid = page['_scroll_id']
scroll_size = page['hits']['total'] # Start scrolling
df = pd.DataFrame()
appended_data = [] while (scroll_size > 0):
frame = pd.DataFrame.from_dict([document['_source'] for document in page["hits"]["hits"]])
appended_data.append(frame)
page = es.scroll(scroll_id = sid, scroll = '1m', request_timeout = 30)
# Update the scroll ID
sid = page['_scroll_id']
# Get the number of results that we returned in the last scroll
scroll_size = len(page['hits']['hits'])
if len(appended_data) > 0:
df = pd.concat(appended_data, ignore_index=True, sort = False)
del appended_data
gc.collect()
es.clear_scroll(body={'scroll_id': sid})
return df
注:
(1)通過"_source" 關鍵字,指定要取的欄位,可減少不必要的欄位,提高查詢速度
(2)官方檔案指出,通過"sort": [ "_doc"] —即按照_doc排序,可提高查詢效率
(3)根據自己的環境,測試合理的 size,效率會有數倍的差距。筆者環境(128G, 32核)一次取10000效能最好,網上大多測試,size取2000或者1000似乎較佳
(4)clear_scroll及時清理用完的scroll_id
(5)如果資料量較大,設定超時和重試次數(預設是10秒,否則超時會取不到資料),具體如下
timeout = 30, max_retries=10, retry_on_timeout=True
(6)Sliced scroll
如果返回的資料量特別大,可通過slice讓多個分片獨自來處理請求,如下(id從0開始):
"slice": {
"id": slice_no,
"max": SLICES
},
參考: https://www.elastic.co/guide/en/elasticsearch/reference/5.1/search-request-scroll.html#sliced-scroll
- python 多程式如何個函式傳多個引數
python多程式或者多執行緒要向呼叫的函式傳遞多個引數,需要構造引數元組集合,程式碼如下(本示例每個程式不同的只有es的slice_id):
def build_parameters(index, min_timestamp, max_timestamp):
parmeters =[]
for num in range(0, SLICES):
tuple_paremeter = (index, min_timestamp, max_timestamp, num)
parmeters.append(tuple_paremeter)
return parmeters
- python多程式例項
示例使用程式池,及starmap 傳遞呼叫的函式及引數 (with相當於try, excepion, finallly的集合,會自動做資源的釋放或關閉等)
with multiprocessing.Pool(processes = SLICES) as pool:
result = pool.starmap(es_scroll, parameters)
然後,拼接返回的dataframe 集合即可構造一個完整的dataframe,如下:
frame = pd.concat(result, ignore_index=True, sort = False)
*******************************************************************************************
精力有限,想法太多,專注做好一件事就行
- 我只是一個程式猿。5年內把程式碼寫好,技術部落格字字推敲,堅持零拷貝和原創
- 寫部落格的意義在於打磨文筆,訓練邏輯條理性,加深對知識的系統性理解;如果恰好又對別人有點幫助,那真是一件令人開心的事
*******************************************************************************************