ElasticSearch 資料增刪改實現
阿新 • • 發佈:2019-02-18
前言
Restful API 實現
建立索引
建立索引
curl -XPUT 'localhost:9200/customer?pretty'
插入資料
單條插入-指定id
curl -XPOST 'localhost:9200/customer/external/1?pretty' -d' {"name": "John Doe" }'
單條插入-不指定id
curl -XPOST 'localhost:9200/customer/external?pretty' -d' {"name": "Jane Doe" }'
批量插入:
curl -XPOST 'localhost:9200/bank/account/_bulk?pretty' --data-binary "@accounts.json"
刪除資料
刪除資料:下面的語句將執行刪除Customer中ID為2的資料
curl -XDELETE 'localhost:9200/customer/external/2?pretty'
根據查詢條件刪除(PS:這條本人沒試過,我用的還是2.4版本,這是參照官網資料的5.4版本寫的)
curl -XPOST 'localhost:9200/customer/external/_delete_by_query?pretty' -d '{
" query": {
"match": {
"name": "John"
}
}
}'
刪除全部
{
"query": {
"match_all": {}
}
}
更新資料
更新文件: 修改id=1的name屬性,並直接增加屬性和屬性值
curl -XPOST 'localhost:9200/customer/external/1/_update?pretty' -d ' {
"doc": {
"name": "xyd",
"age": 20
}
}'
更新索引–指令碼方式
curl -XPOST 'localhost:9200/customer/external/1/_update?pretty' -d' {
"script": "ctx._source.age += 5"
}'
Python API 實現
說明
以下程式碼實現是:單條增加、根據_id刪除、根據_id更新、批量增加等介面。除錯的時候建議一個一個功能執行。
程式碼
# -*- coding: utf-8 -*-
from __future__ import print_function

import elasticsearch
from elasticsearch.helpers import bulk
class ElasticSearchClient(object):
    """Factory for Elasticsearch client instances bound to the local node."""

    @staticmethod
    def get_es_servers():
        """Return an Elasticsearch client connected to localhost:9200."""
        hosts = [{"host": "localhost", "port": "9200"}]
        return elasticsearch.Elasticsearch(hosts=hosts)
class LoadElasticSearch(object):
    """Small CRUD helper around the ``hz`` index / ``xyd`` doc type.

    Provides single insert, bulk insert, update-by-id and delete-by-id.
    The index and its mapping are created on construction if missing.
    """

    def __init__(self):
        self.index = "hz"
        self.doc_type = "xyd"
        self.es_client = ElasticSearchClient.get_es_servers()
        self.set_mapping()

    def set_mapping(self):
        """Create the index (when absent) and install the field mapping."""
        mapping = {
            self.doc_type: {
                "properties": {
                    "document_id": {"type": "integer"},
                    "title": {"type": "string"},
                    "content": {"type": "string"},
                }
            }
        }
        if not self.es_client.indices.exists(index=self.index):
            # ignore=400 tolerates an "index already exists" race between
            # the exists() check above and this create() call.
            self.es_client.indices.create(
                index=self.index, body=mapping, ignore=400)
            self.es_client.indices.put_mapping(
                index=self.index, doc_type=self.doc_type, body=mapping)

    def add_date(self, row_obj):
        """Insert a single document into ES.

        ``row_obj`` may carry an ``_id`` key; it is removed from the dict
        and used as the document id (defaults to 1 when absent).
        """
        # pop() with a default: the original get()+pop() pair raised
        # KeyError for a missing "_id" despite get() supplying a fallback.
        _id = row_obj.pop("_id", 1)
        self.es_client.index(index=self.index, doc_type=self.doc_type,
                             body=row_obj, id=_id)

    def add_date_bulk(self, row_obj_list):
        """Bulk-insert documents, flushing every ``bulk_num`` actions."""
        bulk_num = 2000  # 2000 actions per bulk request
        load_data = []
        batch_no = 0  # explicit batch counter (int under py2 and py3)
        for row_obj in row_obj_list:
            load_data.append({
                "_index": self.index,
                "_type": self.doc_type,
                "_id": row_obj.get('_id', 'None'),
                "_source": {
                    'document_id': row_obj.get('document_id', None),
                    'title': row_obj.get('title', None),
                    'content': row_obj.get('content', None),
                },
            })
            if len(load_data) == bulk_num:
                batch_no += 1
                print('插入', batch_no, '批資料')
                print(len(load_data))
                success, failed = bulk(self.es_client, load_data,
                                       index=self.index, raise_on_error=True)
                del load_data[:]
                print(success, failed)
        # flush the final partial batch, if any
        if load_data:
            success, failed = bulk(self.es_client, load_data,
                                   index=self.index, raise_on_error=True)
            del load_data[:]
            print(success, failed)

    def update_by_id(self, row_obj):
        """Partial-update the document whose id is row_obj['_id'] (default 1)."""
        # pop() with default — same KeyError fix as add_date().
        _id = row_obj.pop("_id", 1)
        self.es_client.update(index=self.index, doc_type=self.doc_type,
                              body={"doc": row_obj}, id=_id)

    def delete_by_id(self, _id):
        """Delete the document with the given id."""
        self.es_client.delete(index=self.index, doc_type=self.doc_type, id=_id)
if __name__ == '__main__':
    # Sample document used by every manual test below.
    sample_doc = {
        "_id": 1,
        "document_id": 1,
        "title": u"Hbase 測試資料",
        "content": u"Hbase 日常運維,這是個假資料監控Hbase執行狀況。通常IO增加時io wait也會增加,現在FMS的機器正常情況......",
    }
    loader = LoadElasticSearch()

    # single-document insert test
    loader.add_date(sample_doc)

    # update-by-id test
    # sample_doc["title"] = u"更新標題"
    # loader.update_by_id(sample_doc)

    # delete-by-id test
    # loader.delete_by_id(1)

    # bulk-insert test
    # docs = []
    # for i in range(2, 2200):
    #     doc = sample_doc.copy()
    #     doc["_id"] = i
    #     doc["document_id"] = i
    #     docs.append(doc)
    # loader.add_date_bulk(docs)
結果顯示
單條增加:
單條修改:
單條刪除:
批量增加: