1. 程式人生 > >python bulk批量儲存elasticsearch資料

python bulk批量儲存elasticsearch資料

之前用kclpy讀取kinesis流資料,處理並儲存到elasticsearch中,現在發現elk中的資料展示與當前時間差越來越大(源資料增加了三倍)。閱讀kinesis文件進行相應分片、例項數擴充套件,均沒有明顯效果。

重新優化了下程式碼,使用了bulk批量儲存資料到elasticsearch,存放速率明顯提高。

相關示例程式碼:

from datetime import datetime
import pytz 
import time
from elasticsearch import Elasticsearch 
from elasticsearch.helpers import
bulk import json es = Elasticsearch(hosts=[{'host': "ip", 'port': "9200"}], http_auth=("username", "password")) def index_bulk(): ACTIONS = [] count = 0 for i in range(500): t = time.time() kinesisdict = { "priority": 0, "tags": {i}, "threshold
": 0, "kinesis": True, "env": "test", "region": "cn", "metric": "/var/log/sengled/bulk.log", "dataSource": "bulk", "service": "bulk", "status": "", "endpoint": "test-cn-inception-10.12.112.165",
"starttime": t, "product": "bulk", "step": 0, "value": "bulk", "ip": "10.12.112.165", "objectType": "dev", "endtime": t, "timestamp": t, "counterType": "" } count = i # kinesisdict = json.loads(json.dumps(bulk_json)) kdict = kinesisdict.copy() kdict['@timestamp'] = datetime.fromtimestamp(int(kinesisdict['timestamp']),pytz.timezone('Asia/Shanghai')) if kdict['starttime'] == 0: kdict['starttime'] = datetime.fromtimestamp(int(kinesisdict['timestamp']),pytz.timezone('Asia/Shanghai')) else: kdict['starttime'] = datetime.fromtimestamp(int(kinesisdict['starttime']),pytz.timezone('Asia/Shanghai')) if kdict['endtime'] == 0: kdict['endtime'] = datetime.fromtimestamp(int(kinesisdict['timestamp']),pytz.timezone('Asia/Shanghai')) else: kdict['endtime'] = datetime.fromtimestamp(int(kinesisdict['endtime']),pytz.timezone('Asia/Shanghai')) kdict['value'] = str(kinesisdict['value']) kdict['threshold'] = str(kinesisdict['threshold']) kdict['tags'] = str(kinesisdict['tags']) del kdict['timestamp'] action = { "_index": "kinesis-2018.07.19", "_type": "kinesisdata", "_source": kdict } ACTIONS.append(action) print(ACTIONS) bulk(es, ACTIONS, index = "kinesis-2018.11.28", raise_on_error=True) print("insert %s lines" % count) index_bulk()