使用 Python 批量將資料插入到 ES中
阿新 • • 發佈:2020-09-15
1. 插入資料
現在我們如果有大量的文件(例如10000000萬條文件)需要寫入es
的某條索引中,該怎麼辦呢?
1.1 順序插入
import time from elasticsearch import Elasticsearch es = Elasticsearch() def timer(func): def wrapper(*args, **kwargs): start = time.time() res = func(*args, **kwargs) print('共耗時約 {:.2f} 秒'.format(time.time() - start)) return res return wrapper @timer def create_data(): """ 寫入資料 """ for line in range(100): es.index(index='s2', doc_type='doc', body={'title': line}) if __name__ == '__main__': create_data() # 執行結果大約耗時 7.79 秒
1.2 批量插入
import time from elasticsearch import Elasticsearch from elasticsearch import helpers es = Elasticsearch() def timer(func): def wrapper(*args, **kwargs): start = time.time() res = func(*args, **kwargs) print('共耗時約 {:.2f} 秒'.format(time.time() - start)) return res return wrapper @timer def create_data(): """ 寫入資料 """ for line in range(100): es.index(index='s2', doc_type='doc', body={'title': line}) @timer def batch_data(): """ 批量寫入資料 """ action = [{ "_index": "s2", "_type": "doc", "_source": { "title": i } } for i in range(10000000)] helpers.bulk(es, action) if __name__ == '__main__': # create_data() batch_data() # MemoryError
我們通過elasticsearch模組匯入helper
,通過helper.bulk
來批量處理大量的資料。首先我們將所有的資料定義成字典形式,各欄位含義如下:
_index
對應索引名稱,並且該索引必須存在。_type
對應型別名稱。_source
對應的字典內,每一篇文件的欄位和值,可有有多個欄位。
首先將每一篇文件(組成的字典)都整理成一個大的列表,然後,通過helper.bulk(es, action)
將這個列表寫入到es物件中。
然後,這個程式要執行的話——你就要考慮,這個一千萬個元素的列表,是否會把你的記憶體撐爆(MemoryError
)!很可能還沒到沒到寫入es那一步,卻因為列表過大導致記憶體錯誤而使寫入程式崩潰!很不幸,我的程式報錯了。下圖是我在生成列表的時候,觀察工作管理員的程序資訊,可以發現此時Python消耗了大量的系統資源,而執行es例項的Java虛擬機器卻沒什麼變動。
解決辦法是什麼呢?我們可以分批寫入,比如我們一次生成長度為一萬的列表,再迴圈著去把一千萬的任務完成。這樣, Python和Java虛擬機器達到負載均衡。
下面的示例測試10萬條資料分批寫入的速度
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch()
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
print('共耗時約 {:.2f} 秒'.format(time.time() - start))
return res
return wrapper
@timer
def batch_data():
""" 批量寫入資料 """
# 分批寫
# for i in range(1, 10000001, 10000):
# action = [{
# "_index": "s2",
# "_type": "doc",
# "_source": {
# "title": k
# }
# } for k in range(i, i + 10000)]
# helpers.bulk(es, action)
# 使用生成器
for i in range(1, 100001, 1000):
action = ({
"_index": "s2",
"_type": "doc",
"_source": {
"title": k
}
} for k in range(i, i + 1000))
helpers.bulk(es, action)
if __name__ == '__main__':
# create_data()
batch_data() # 耗時 93.53 s
1.3 批量插入優化
採用 Python
生成器
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch()
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
print('共耗時約 {:.2f} 秒'.format(time.time() - start))
return res
return wrapper
@timer
def gen():
""" 使用生成器批量寫入資料 """
action = ({
"_index": "s2",
"_type": "doc",
"_source": {
"title": i
}
} for i in range(100000))
helpers.bulk(es, action)
if __name__ == '__main__':
# create_data()
# batch_data()
gen() # 約90s
參考文章:
https://www.cnblogs.com/Neeo/articles/10788573.html