1. 程式人生 > 實用技巧 >使用 Python 批量將資料插入到 ES中

使用 Python 批量將資料插入到 ES中

1. 插入資料

現在我們如果有大量的文件(例如10000000萬條文件)需要寫入es 的某條索引中,該怎麼辦呢?

1.1 順序插入

import time
from elasticsearch import Elasticsearch

es = Elasticsearch()

def timer(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        print('共耗時約 {:.2f} 秒'.format(time.time() - start))
        return res
    return wrapper

@timer
def create_data():
    """ 寫入資料 """
    for line in range(100):
        es.index(index='s2', doc_type='doc', body={'title': line})

if __name__ == '__main__':
    create_data()   # 執行結果大約耗時 7.79 秒

1.2 批量插入

import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch()

def timer(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        print('共耗時約 {:.2f} 秒'.format(time.time() - start))
        return res

    return wrapper

@timer
def create_data():
    """ 寫入資料 """
    for line in range(100):
        es.index(index='s2', doc_type='doc', body={'title': line})

@timer
def batch_data():
    """ 批量寫入資料 """
    action = [{
        "_index": "s2",
        "_type": "doc",
        "_source": {
            "title": i
        }
    } for i in range(10000000)]
    helpers.bulk(es, action)


if __name__ == '__main__':
    # create_data()
    batch_data()  # MemoryError

我們通過elasticsearch模組匯入helper,通過helper.bulk來批量處理大量的資料。首先我們將所有的資料定義成字典形式,各欄位含義如下:

  • _index對應索引名稱,並且該索引必須存在。
  • _type對應型別名稱。
  • _source對應的字典內,每一篇文件的欄位和值,可有有多個欄位。

首先將每一篇文件(組成的字典)都整理成一個大的列表,然後,通過helper.bulk(es, action)將這個列表寫入到es物件中。
然後,這個程式要執行的話——你就要考慮,這個一千萬個元素的列表,是否會把你的記憶體撐爆(MemoryError)!很可能還沒到沒到寫入es那一步,卻因為列表過大導致記憶體錯誤而使寫入程式崩潰!很不幸,我的程式報錯了。下圖是我在生成列表的時候,觀察工作管理員的程序資訊,可以發現此時Python消耗了大量的系統資源,而執行es例項的Java虛擬機器卻沒什麼變動。

解決辦法是什麼呢?我們可以分批寫入,比如我們一次生成長度為一萬的列表,再迴圈著去把一千萬的任務完成。這樣, Python和Java虛擬機器達到負載均衡。

下面的示例測試10萬條資料分批寫入的速度

import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch()

def timer(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        print('共耗時約 {:.2f} 秒'.format(time.time() - start))
        return res

    return wrapper
@timer
def batch_data():
    """ 批量寫入資料 """
    # 分批寫
    # for i in range(1, 10000001, 10000):
    #     action = [{
    #         "_index": "s2",
    #         "_type": "doc",
    #         "_source": {
    #             "title": k
    #         }
    #     } for k in range(i, i + 10000)]
    #     helpers.bulk(es, action)
    # 使用生成器
    for i in range(1, 100001, 1000):
        action = ({
            "_index": "s2",
            "_type": "doc",
            "_source": {
                "title": k
            }
        } for k in range(i, i + 1000))
        helpers.bulk(es, action)

if __name__ == '__main__':
    # create_data()
    batch_data()	# 耗時 93.53 s

1.3 批量插入優化

採用 Python 生成器

import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch()

def timer(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        print('共耗時約 {:.2f} 秒'.format(time.time() - start))
        return res

    return wrapper
@timer
def gen():
    """ 使用生成器批量寫入資料 """
    action = ({
        "_index": "s2",
        "_type": "doc",
        "_source": {
            "title": i
        }
    } for i in range(100000))
    helpers.bulk(es, action)

if __name__ == '__main__':
    # create_data()
    # batch_data()
    gen()		# 約90s

參考文章:https://www.cnblogs.com/Neeo/articles/10788573.html