Elasticsearch Java API (Part 5): Bulk Indexing
阿新 • Published: 2018-12-26
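This post introduces a convenient way to index many documents in Elasticsearch at once. The Bulk API runs multiple index, create, update and delete operations in a single request, which avoids one network round trip per document. The bulk requests in this post travel over HTTP (the REST API) or over the Java client's TCP transport, not over UDP. Elasticsearch 1.x did have a separate Bulk UDP API, which could not guarantee delivery and was removed in 2.0.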
1. Bulk API
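A bulk request is sent to a REST endpoint ending in _bulk (for example http://localhost:9200/_bulk), with the operations written in a newline-delimited JSON file. The official documentation gives the format as: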
action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n
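In other words, each operation takes up to two lines, and every line, including the last one, must end with a newline (\n). The first line names the action (index, create, update or delete) and its metadata (_index, _type, _id); the second line is the document source. A delete action has no source line. For example, to insert two documents and delete one in a single request, create bulkAdd.json with the following content: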
{ "create" : { "_index" : "blog", "_type" : "article", "_id" : "3" }}
{ "title":"title1","posttime":"2016-07-02","content":"content 1" }
{ "create" : { "_index" : "blog", "_type" : "article", "_id" : "4" }}
{ "title":"title2","posttime":"2016-07-03","content":"content 2" }
{ "delete" : { "_index" : "blog", "_type" : "article", "_id" : "1" }}
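To make the line rules concrete, the same request body can be assembled in plain Java: one action line per operation, a source line only for actions that take one, and a newline after every line including the last. This is a minimal sketch using only string handling (no Elasticsearch client); BulkBodyExample, addAction and toBulkBody are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class BulkBodyExample {
    // Appends an action/metadata line and, if non-null, its source line.
    // A delete action passes null because it has no source line.
    static void addAction(List<String> lines, String actionAndMeta, String source) {
        lines.add(actionAndMeta);
        if (source != null) {
            lines.add(source);
        }
    }

    // Joins the lines with '\n' and also terminates the last line with '\n'.
    static String toBulkBody(List<String> lines) {
        StringBuilder sb = new StringBuilder();
        for (String line : lines) {
            sb.append(line).append('\n');
        }
        return sb.toString();
    }

    // Builds the body for the example: two creates and one delete.
    static String exampleBody() {
        List<String> lines = new ArrayList<>();
        addAction(lines,
                "{ \"create\" : { \"_index\" : \"blog\", \"_type\" : \"article\", \"_id\" : \"3\" }}",
                "{ \"title\":\"title1\",\"posttime\":\"2016-07-02\",\"content\":\"content 1\" }");
        addAction(lines,
                "{ \"create\" : { \"_index\" : \"blog\", \"_type\" : \"article\", \"_id\" : \"4\" }}",
                "{ \"title\":\"title2\",\"posttime\":\"2016-07-03\",\"content\":\"content 2\" }");
        addAction(lines,
                "{ \"delete\" : { \"_index\" : \"blog\", \"_type\" : \"article\", \"_id\" : \"1\" }}",
                null);
        return toBulkBody(lines);
    }

    public static void main(String[] args) {
        System.out.print(exampleBody());
    }
}
```

Printing the result gives five lines, which could be saved as bulkAdd.json and posted with curl.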
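Run it with curl (Elasticsearch 6.0 and later also require a Content-Type header, for example -H "Content-Type: application/x-ndjson"). The sample response below shows only one item; for the three actions above, items contains one entry per action in request order, and errors becomes true if any action fails.
$ curl -XPOST "http://localhost:9200/_bulk?pretty" --data-binary @bulkAdd.json
{
  "took" : 11,
  "errors" : false,
  "items" : [ {
    "create" : {
      "_index" : "blog",
      "_type" : "article",
      "_id" : "13",
      "_version" : 1,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "failed" : 0
      },
      "status" : 201
    }
  } ]
}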
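Note: every line, including the last one, must end with a newline; otherwise Elasticsearch may not recognize the actions. Post the file with --data-binary rather than -d, because -d strips newlines from the file. A file without newlines, for example, is rejected with: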
$ curl -XPOST "http://localhost:9200/_bulk?pretty" --data-binary @bulkAdd.json
{
"error" : {
"root_cause" : [ {
"type" : "action_request_validation_exception",
"reason" : "Validation Failed: 1: no requests added;"
} ],
"type" : "action_request_validation_exception",
"reason" : "Validation Failed: 1: no requests added;"
},
"status" : 400
}
2. Bulk export
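The following example exports the documents of an index to a file, one JSON document per line. The cluster name is "bropen", the index is "blog" and the type is "article". Create files/bulk.txt under the project root (the files directory must exist); the document sources are written to it. Only each document's _source is exported, not its _id or other metadata.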
import java.io.BufferedWriter;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

public class ElasticSearchBulkOut {
    public static void main(String[] args) throws IOException {
        // cluster.name must match the value configured in elasticsearch.yml
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "bropen").build();
        Client client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("127.0.0.1"), 9300));
        QueryBuilder qb = QueryBuilders.matchAllQuery();
        // Write UTF-8 explicitly so non-ASCII content survives; files/ must already exist
        try (BufferedWriter bfw = Files.newBufferedWriter(
                Paths.get("files/bulk.txt"), StandardCharsets.UTF_8)) {
            // A plain search returns only the first 10 hits by default,
            // so page through all documents with the scroll API
            SearchResponse response = client.prepareSearch("blog")
                    .setTypes("article").setQuery(qb)
                    .setScroll(new TimeValue(60000)).setSize(100)
                    .execute().actionGet();
            long count = 0;
            while (response.getHits().getHits().length > 0) {
                for (SearchHit hit : response.getHits().getHits()) {
                    String jsonStr = hit.getSourceAsString();
                    System.out.println(jsonStr);
                    bfw.write(jsonStr);
                    bfw.write("\n"); // one JSON document per line
                    count++;
                }
                // fetch the next page; an empty page ends the loop
                response = client.prepareSearchScroll(response.getScrollId())
                        .setScroll(new TimeValue(60000)).execute().actionGet();
            }
            if (count == 0) {
                System.out.println("0 documents found!");
            }
        } finally {
            client.close(); // release the transport client's threads and connections
        }
    }
}
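The exported bulk.txt holds only document sources, so it cannot be posted to _bulk as-is. If you would rather load it with curl than with the Java program in the next section, one option is to prefix each line with an index action line. This is a minimal sketch under that assumption: ExportToBulk and toIndexBulk are hypothetical names, the index and type strings are assumed to need no JSON escaping, and no _id is set, so Elasticsearch would generate one per document. The _type field only applies to versions that still have mapping types.

```java
import java.util.Arrays;
import java.util.List;

public class ExportToBulk {
    // Turns source-only lines into _bulk format: an "index" action line
    // before each non-blank source line, every line ending with '\n'.
    static String toIndexBulk(List<String> sources, String index, String type) {
        String action = "{\"index\":{\"_index\":\"" + index
                + "\",\"_type\":\"" + type + "\"}}";
        StringBuilder sb = new StringBuilder();
        for (String source : sources) {
            if (source.trim().isEmpty()) {
                continue; // skip blank lines
            }
            sb.append(action).append('\n');
            sb.append(source).append('\n'); // the last line also ends with '\n'
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> sources = Arrays.asList(
                "{\"title\":\"title1\"}", "{\"title\":\"title2\"}");
        System.out.print(toIndexBulk(sources, "test", "article"));
    }
}
```

Writing the returned string to a file and posting it with --data-binary, as in section 1, should index each exported document once.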
3. Bulk import
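The import reads the exported bulk.txt line by line and indexes the documents with the bulk API into index "test", type "article". First call client.prepareBulk() to create a BulkRequestBuilder, then call its add method for each document. Because bulk.txt contains only document sources, Elasticsearch assigns new auto-generated IDs to the imported documents. The code: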
import java.io.BufferedReader;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class ElasticSearchBulkIn {
    private static final int BATCH_SIZE = 1000; // actions per bulk request; tune to document size

    public static void main(String[] args) throws IOException {
        // cluster.name must match the value configured in elasticsearch.yml
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "bropen").build();
        Client client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("127.0.0.1"), 9300));
        try (BufferedReader bfr = Files.newBufferedReader(
                Paths.get("files/bulk.txt"), StandardCharsets.UTF_8)) {
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            String line;
            while ((line = bfr.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // skip blank lines
                }
                // each line is one document source; Elasticsearch assigns the _id
                bulkRequest.add(client.prepareIndex("test", "article").setSource(line));
                if (bulkRequest.numberOfActions() >= BATCH_SIZE) {
                    send(bulkRequest);
                    // start a fresh builder, otherwise sent documents are sent again
                    bulkRequest = client.prepareBulk();
                }
            }
            // flush the remainder; an empty bulk request fails validation
            if (bulkRequest.numberOfActions() > 0) {
                send(bulkRequest);
            }
        } finally {
            client.close();
        }
    }

    // Executes one bulk request and reports per-document failures
    private static void send(BulkRequestBuilder bulkRequest) {
        BulkResponse response = bulkRequest.execute().actionGet();
        if (response.hasFailures()) {
            System.err.println(response.buildFailureMessage());
        }
    }
}
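A batching loop like the one above has to start a new BulkRequestBuilder after every execute(): the builder keeps the requests it has already sent, so reusing it re-sends them, and with auto-generated IDs that creates duplicate documents. The same pattern can be isolated from Elasticsearch in plain Java. In this sketch BatchSplitter is a hypothetical helper, and each returned batch stands in for one bulk request.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitter {
    // Splits lines into consecutive batches of at most batchSize items.
    // A new list is started after each full batch, mirroring a fresh
    // BulkRequestBuilder after every execute().
    static List<List<String>> split(List<String> lines, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String line : lines) {
            current.add(line);
            if (current.size() == batchSize) {
                batches.add(current);
                current = new ArrayList<>(); // nothing from this batch is re-sent
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // remainder; an empty batch is never emitted
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            lines.add("{\"n\":" + i + "}");
        }
        for (List<String> batch : split(lines, 10)) {
            System.out.println(batch.size());
        }
    }
}
```

With 25 lines and a batch size of 10 this prints 10, 10 and 5, so every document appears in exactly one batch. For production imports the Java API also offers BulkProcessor, which flushes automatically by action count, request size or time interval.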
Author: 姚攀 (master's degree, Chinese Academy of Sciences), blog expert, familiar with Lucene, ES and ELK