1. 程式人生 > >elasticsearch2.1.0文件的相關操作

elasticsearch2.1.0文件的相關操作

這裡記錄一下elasticsearch2.1.0文件的相關的操作,程式碼都是自己經過測試可以直接使用的程式碼。

/**
 * es文件相關操作測試類
 * @author yujie.wang
 */
public class DocOperator {
	
	//es索引
	private static String INDEX = "megacorp";
	//索引megacorp下的型別
	private static String TYPE = "employee";
	
	
	public static void main(String [] args) throws Exception{
		GenEsClient client = new GenEsClient();
		DocOperator es = new DocOperator();
		if(args.length >= 1 && "node".equals(args[0])){
			System.out.println("genNodeClient");
			es.getDocument(client.getNodeClient(), INDEX, TYPE, "4");
		}else {
			System.out.println("indexgenTransportClient");
			//得到一條文件
			//es.getDocument(client.genTransportClient(), INDEX, TYPE, "4");
			//建立一個文件
			//es.IndexDocument(client.genTransportClient(), INDEX, TYPE, "5");
			//刪除一個文件
			//es.delDocument(client.genTransportClient(), INDEX, TYPE, "5");
			//更新一個文件
			//es.updateDocument(client.genTransportClient(), INDEX, TYPE, "4");
			//批量操作
			//es.bulk(client.genTransportClient());
			es.checkIndexStatus(INDEX,client.genTransportClient());
		}
		
	}
	
	
	
	public void checkIndexStatus(String index,Client client){
		ClusterStateResponse response = client.admin().cluster().prepareState().setIndices(index).execute().actionGet();
		ImmutableOpenMap<String, MappingMetaData> map = response.getState().getMetaData().index(index).getMappings();
		Iterator<ObjectObjectCursor<String, MappingMetaData>> it = map.iterator();
		while(it.hasNext()){
			ObjectObjectCursor<String, MappingMetaData> objectMap = (ObjectObjectCursor<String, MappingMetaData>)it.next();
			MappingMetaData mmd = objectMap.value;
			//System.out.println("result: "+mmd.toString());
		}
		
	}
	
	/**
	 * 從索引index中型別為type中獲得一個文件 id:id
	 * @param client
	 * @param index
	 * @param type
	 * @param id
	 */
	public void getDocument(Client client,String index,String type, String id){
		GetResponse response = client.prepareGet(index, type, id).get();
		System.out.println(response.getId() + response.getIndex() + response.getType() + response.getVersion()
				+ response.getSourceAsString());
	}
	

	
	/**
	 * 向索引index中型別為type中儲存一個文件 id:id
	 * @param client
	 * @param index
	 * @param type
	 * @param id
	 * @throws Exception 
	 */
	public void IndexDocument(Client client,String index,String type, String id) throws Exception{
		IndexResponse response = client.prepareIndex(index, type, id)
		        .setSource(XContentFactory.jsonBuilder()
					    .startObject()
				        .field("first_name", "yujie5")
				        .field("last_name", "wang5")
				        .field("age", "20")
				        .field("about", "I love to go rock climbing5")
				        .field("interests", "15")
				    .endObject()).get();
		System.out.println(response.isCreated() + response.getId() + response.getIndex()
				+ response.getType() + response.getVersion());
	}
	
	/**
	 * 從索引index中型別為type中刪除一個文件 id:id
	 * @param client
	 * @param index
	 * @param type
	 * @param id
	 */
	public void delDocument(Client client,String index,String type, String id){
		DeleteResponse response = client.prepareDelete(index, type, id).get();
		System.out.println(response.getId() + response.getIndex() + response.getType() + response.getVersion()
				+ response.isFound());
	}
	
	/**
	 * 更新索引index中型別為type中的一個文件 id:id
	 * @param client
	 * @param index
	 * @param type
	 * @param id
	 * @throws Exception 
	 */
	public void updateDocument(Client client,String index,String type, String id) throws Exception{
		UpdateResponse response = null;
		try {
			UpdateRequest updateRequest = new UpdateRequest();
			updateRequest.index(index);
			updateRequest.type(type);
			updateRequest.id(id);
			updateRequest.doc(XContentFactory.jsonBuilder()
				    .startObject()
			        .field("first_name", "yujie4")
			    .endObject());
			response = client.update(updateRequest).get();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println(response.getId() + response.getIndex() + response.getType() + response.getVersion());
	}
	
	/**
	 * 存在更新 不存在則直接索引一個新文件
	 * @param client
	 * @param index
	 * @param type
	 * @param id
	 */
	public void updateAndInsert(Client client,String index,String type, String id){
            
		try {
			IndexRequest indexRequest = new IndexRequest(index, type, id)
	        .source("json content");
			UpdateRequest updateRequest = new UpdateRequest(index, type, id)
			        .doc("json content")
			        .upsert(indexRequest);  
			client.update(updateRequest).get();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	/**
	 * 根據給定的索引index 獲得多個文件
	 * @param client
	 */
	public void getMultiDocument(Client client){
		MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
			    .add("twitter", "tweet", "1")           
			    .add("twitter", "tweet", "2", "3", "4") 
			    .add("another", "type", "foo")          
			    .get();

			for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 
			    GetResponse response = itemResponse.getResponse();
			    if (response.isExists()) {                      
			        String json = response.getSourceAsString(); 
			    }
			}
	}
	
	/**
	 * 可以執行批量索引和批量刪除
	 * @param client
	 * @throws Exception 
	 */
	public void bulk(Client client) throws Exception{
		BulkRequestBuilder bulkRequest = client.prepareBulk();

		// either use client#prepare, or use Requests# to directly build index/delete requests
		bulkRequest.add(client.prepareIndex(INDEX, TYPE, "6")
		        .setSource(XContentFactory.jsonBuilder()
					    .startObject()
				        .field("first_name", "yujie6")
				        .field("last_name", "wang6")
				        .field("age", "26")
				        .field("about", "I love to go rock climbing6")
				        .field("interests", "16")
				    .endObject())
		        );

		bulkRequest.add(client.prepareDelete(INDEX, TYPE, "3"));
		BulkResponse bulkResponse = bulkRequest.get();
		if (bulkResponse.hasFailures()) {
		    // process failures by iterating through each bulk response item
			System.out.println("found failed");
		}
	}
	
	
	public void bulkProcess(Client client){
		BulkProcessor bulkProcessor = BulkProcessor.builder(
		        client,  
		        new BulkProcessor.Listener() {


					@Override
					public void afterBulk(long arg0, BulkRequest arg1,
							BulkResponse arg2) {
						// TODO Auto-generated method stub
						
					}

					@Override
					public void afterBulk(long arg0, BulkRequest arg1,
							Throwable arg2) {
						// TODO Auto-generated method stub
						
					}

					@Override
					public void beforeBulk(long arg0, BulkRequest arg1) {
						// TODO Auto-generated method stub
						
					} 
		        })
		        .setBulkActions(10000) 
		        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) 
		        .setFlushInterval(TimeValue.timeValueSeconds(5)) 
		        .setConcurrentRequests(1) 
		        .build();
	}
	
	
	public void scorll(Client client ,String test){
		QueryBuilder qb = QueryBuilders.termQuery("multi", "test");

		SearchResponse scrollResp = client.prepareSearch(test)
		        .setSearchType(SearchType.SCAN)
		        .setScroll(new TimeValue(60000))
		        .setQuery(qb)
		        .setSize(100).execute().actionGet(); //100 hits per shard will be returned for each scroll
		//Scroll until no hits are returned
		while (true) {

		    for (SearchHit hit : scrollResp.getHits().getHits()) {
		        //Handle the hit...
		    }
		    scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
		    //Break condition: No hits are returned
		    if (scrollResp.getHits().getHits().length == 0) {
		        break;
		    }
		}
	}
	
	
	public void mult(Client client){
		SearchRequestBuilder srb1 = client.prepareSearch()
				.setQuery(QueryBuilders
				.queryStringQuery("elasticsearch"))
				.setSize(1);
		SearchRequestBuilder srb2 = client.prepareSearch()
				.setQuery(QueryBuilders.matchQuery("name", "kimchy"))
				.setSize(1);

		MultiSearchResponse sr = client.prepareMultiSearch()
		        .add(srb1)
		        .add(srb2)
		        .execute()
		        .actionGet();

		// You will get all individual responses from MultiSearchResponse#getResponses()
		long nbHits = 0;
		for (MultiSearchResponse.Item item : sr.getResponses()) {
		    SearchResponse response = item.getResponse();
		    nbHits += response.getHits().getTotalHits();
		}
	}
	
}