elasticsearch2.1.0文件的相關操作
阿新 • • 發佈:2019-01-02
這裡記錄一下 Elasticsearch 2.1.0 文件(document)的相關操作,以下程式碼都經過本人測試,可以直接使用。
/** * es文件相關操作測試類 * @author yujie.wang */ public class DocOperator { //es索引 private static String INDEX = "megacorp"; //索引megacorp下的型別 private static String TYPE = "employee"; public static void main(String [] args) throws Exception{ GenEsClient client = new GenEsClient(); DocOperator es = new DocOperator(); if(args.length >= 1 && "node".equals(args[0])){ System.out.println("genNodeClient"); es.getDocument(client.getNodeClient(), INDEX, TYPE, "4"); }else { System.out.println("indexgenTransportClient"); //得到一條文件 //es.getDocument(client.genTransportClient(), INDEX, TYPE, "4"); //建立一個文件 //es.IndexDocument(client.genTransportClient(), INDEX, TYPE, "5"); //刪除一個文件 //es.delDocument(client.genTransportClient(), INDEX, TYPE, "5"); //更新一個文件 //es.updateDocument(client.genTransportClient(), INDEX, TYPE, "4"); //批量操作 //es.bulk(client.genTransportClient()); es.checkIndexStatus(INDEX,client.genTransportClient()); } } public void checkIndexStatus(String index,Client client){ ClusterStateResponse response = client.admin().cluster().prepareState().setIndices(index).execute().actionGet(); ImmutableOpenMap<String, MappingMetaData> map = response.getState().getMetaData().index(index).getMappings(); Iterator<ObjectObjectCursor<String, MappingMetaData>> it = map.iterator(); while(it.hasNext()){ ObjectObjectCursor<String, MappingMetaData> objectMap = (ObjectObjectCursor<String, MappingMetaData>)it.next(); MappingMetaData mmd = objectMap.value; //System.out.println("result: "+mmd.toString()); } } /** * 從索引index中型別為type中獲得一個文件 id:id * @param client * @param index * @param type * @param id */ public void getDocument(Client client,String index,String type, String id){ GetResponse response = client.prepareGet(index, type, id).get(); System.out.println(response.getId() + response.getIndex() + response.getType() + response.getVersion() + response.getSourceAsString()); } /** * 向索引index中型別為type中儲存一個文件 id:id * @param client * @param index * @param type * @param id * @throws Exception */ public void 
IndexDocument(Client client,String index,String type, String id) throws Exception{ IndexResponse response = client.prepareIndex(index, type, id) .setSource(XContentFactory.jsonBuilder() .startObject() .field("first_name", "yujie5") .field("last_name", "wang5") .field("age", "20") .field("about", "I love to go rock climbing5") .field("interests", "15") .endObject()).get(); System.out.println(response.isCreated() + response.getId() + response.getIndex() + response.getType() + response.getVersion()); } /** * 從索引index中型別為type中刪除一個文件 id:id * @param client * @param index * @param type * @param id */ public void delDocument(Client client,String index,String type, String id){ DeleteResponse response = client.prepareDelete(index, type, id).get(); System.out.println(response.getId() + response.getIndex() + response.getType() + response.getVersion() + response.isFound()); } /** * 更新索引index中型別為type中的一個文件 id:id * @param client * @param index * @param type * @param id * @throws Exception */ public void updateDocument(Client client,String index,String type, String id) throws Exception{ UpdateResponse response = null; try { UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index(index); updateRequest.type(type); updateRequest.id(id); updateRequest.doc(XContentFactory.jsonBuilder() .startObject() .field("first_name", "yujie4") .endObject()); response = client.update(updateRequest).get(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(response.getId() + response.getIndex() + response.getType() + response.getVersion()); } /** * 存在更新 不存在則直接索引一個新文件 * @param client * @param index * @param type * @param id */ public void updateAndInsert(Client client,String index,String type, String id){ try { IndexRequest indexRequest = new IndexRequest(index, type, id) .source("json content"); UpdateRequest updateRequest = new 
UpdateRequest(index, type, id) .doc("json content") .upsert(indexRequest); client.update(updateRequest).get(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 根據給定的索引index 獲得多個文件 * @param client */ public void getMultiDocument(Client client){ MultiGetResponse multiGetItemResponses = client.prepareMultiGet() .add("twitter", "tweet", "1") .add("twitter", "tweet", "2", "3", "4") .add("another", "type", "foo") .get(); for (MultiGetItemResponse itemResponse : multiGetItemResponses) { GetResponse response = itemResponse.getResponse(); if (response.isExists()) { String json = response.getSourceAsString(); } } } /** * 可以執行批量索引和批量刪除 * @param client * @throws Exception */ public void bulk(Client client) throws Exception{ BulkRequestBuilder bulkRequest = client.prepareBulk(); // either use client#prepare, or use Requests# to directly build index/delete requests bulkRequest.add(client.prepareIndex(INDEX, TYPE, "6") .setSource(XContentFactory.jsonBuilder() .startObject() .field("first_name", "yujie6") .field("last_name", "wang6") .field("age", "26") .field("about", "I love to go rock climbing6") .field("interests", "16") .endObject()) ); bulkRequest.add(client.prepareDelete(INDEX, TYPE, "3")); BulkResponse bulkResponse = bulkRequest.get(); if (bulkResponse.hasFailures()) { // process failures by iterating through each bulk response item System.out.println("found failed"); } } public void bulkProcess(Client client){ BulkProcessor bulkProcessor = BulkProcessor.builder( client, new BulkProcessor.Listener() { @Override public void afterBulk(long arg0, BulkRequest arg1, BulkResponse arg2) { // TODO Auto-generated method stub } @Override public void afterBulk(long arg0, BulkRequest arg1, Throwable arg2) { // TODO Auto-generated method stub } @Override public void beforeBulk(long arg0, BulkRequest arg1) { // TODO Auto-generated method stub } }) 
.setBulkActions(10000) .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) .setFlushInterval(TimeValue.timeValueSeconds(5)) .setConcurrentRequests(1) .build(); } public void scorll(Client client ,String test){ QueryBuilder qb = QueryBuilders.termQuery("multi", "test"); SearchResponse scrollResp = client.prepareSearch(test) .setSearchType(SearchType.SCAN) .setScroll(new TimeValue(60000)) .setQuery(qb) .setSize(100).execute().actionGet(); //100 hits per shard will be returned for each scroll //Scroll until no hits are returned while (true) { for (SearchHit hit : scrollResp.getHits().getHits()) { //Handle the hit... } scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet(); //Break condition: No hits are returned if (scrollResp.getHits().getHits().length == 0) { break; } } } public void mult(Client client){ SearchRequestBuilder srb1 = client.prepareSearch() .setQuery(QueryBuilders .queryStringQuery("elasticsearch")) .setSize(1); SearchRequestBuilder srb2 = client.prepareSearch() .setQuery(QueryBuilders.matchQuery("name", "kimchy")) .setSize(1); MultiSearchResponse sr = client.prepareMultiSearch() .add(srb1) .add(srb2) .execute() .actionGet(); // You will get all individual responses from MultiSearchResponse#getResponses() long nbHits = 0; for (MultiSearchResponse.Item item : sr.getResponses()) { SearchResponse response = item.getResponse(); nbHits += response.getHits().getTotalHits(); } } }