1. 程式人生 > >elasticsearch5.2.1 java util 工具類

elasticsearch5.2.1 java util 工具類

1.Es :操作es的具體類

package org.ufo;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;

public class Es {

	// 連線
	private static TransportClient client;
	// 索引庫名稱,一般都是一個庫,只是裡邊的type不同,比如user/goods
	private static final String index = "testindex";

	public Es() {
		// 通過 setting物件來指定叢集配置資訊
		Settings settings = Settings.builder()//
				.put("client.transport.sniff", true)// 自動嗅探發現叢集節點
				.put("client.transport.ignore_cluster_name", true)// 忽略叢集名稱
				.put("xpack.security.user", "elastic:changeme")// 安全認證
				.build();
		client = new PreBuiltXPackTransportClient(settings);

		try {
			client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.33.111"), 9300));
			client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.33.112"), 9300));
		} catch (UnknownHostException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 
	 * @Description: 關閉連線
	 * @author kang
	 * @date 2017年5月11日
	 */
	public void close() {
		client.close();
	}

	/**
	 * 
	 * @Description: 驗證連結是否正常
	 * @author kang
	 * @date 2017年5月11日
	 */
	public boolean validate() {
		return client.connectedNodes().size() == 0 ? false : true;
	}

	/**
	 * 
	 * @Description:新增文件
	 * @author kang
	 * @date 2017年1月3日
	 */
	public void addDoc(String type, Object id, Object object) {
		client.prepareIndex(index, type, id.toString()).setSource(JSON.toJSONString(object)).get();
	}

	/**
	 * 
	 * @Description:更新文件
	 * @author kang
	 * @date 2017年1月3日
	 */
	public void updateDoc(String type, Object id, Object object) {
		client.prepareUpdate(index, type, id.toString()).setDoc(JSON.toJSONString(object)).get();
	}

	/**
	 * 
	 * @Description:刪除文件
	 * @author kang
	 * @date 2017年1月3日
	 */
	public void delDoc(String type, Object id) {
		client.prepareDelete(index, type, id.toString()).get();
	}

	/**
	 * 
	 * @Description: 分頁高亮查詢
	 * @author kang
	 * @date 2017年1月11日
	 */
	public Page getDocHighLight(String keywords, String type, Set<String> fields, int currentPage, int pageSize, boolean isHighlight) throws Exception {
		// 搜尋資料
		SearchResponse response = client.prepareSearch(index).setTypes(type)//
				.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)//
				.setQuery(QueryBuilders.multiMatchQuery(keywords, fields.toArray(new String[fields.size()]))// 查詢所有欄位
						.analyzer("ik_max_word"))// 分詞器
				.highlighter(new HighlightBuilder().preTags("<span style=\"color:red\">").postTags("</span>").field("*"))// 高亮標籤
				.setFrom((currentPage - 1) * pageSize).setSize(pageSize)// 分頁
				.setExplain(true)// 評分排序
				.execute().actionGet();

		// 獲取查詢結果集
		SearchHits searchHits = response.getHits();
		List<Object> result = Lists.newArrayList();
		// 反射填充高亮
		for (SearchHit hit : searchHits) {
			Map<String, Object> source = hit.getSource();
			if (isHighlight) {
				// 獲取對應的高亮域
				Map<String, HighlightField> highlight = hit.getHighlightFields();
				for (String field : fields) {
					// 從設定的高亮域中取得指定域
					HighlightField titleField = highlight.get(field);
					if (titleField == null) continue;
					// 取得定義的高亮標籤
					String texts = StringUtils.join(titleField.fragments());
					source.put(field, texts);
				}
			}
			result.add(JSON.toJSON(source));
		}
		return new Page(currentPage, pageSize, (int) searchHits.totalHits(), result);
	}

	/**
	 * 
	 * @Description: 重構索引(更新詞庫之後)
	 * @author kang
	 * @date 2017年5月16日
	 */
	public void reindex() {
		SearchResponse scrollResp = client.prepareSearch(index)//
				.setScroll(new TimeValue(60000))//
				.setQuery(QueryBuilders.matchAllQuery())//
				.setSize(100).get(); // max of 100 hits will be returned for
		// Scroll until no hits are returned
		do {
			for (SearchHit hit : scrollResp.getHits().getHits()) {
				client.prepareIndex(index, hit.getType(), hit.getId()).setSource(hit.getSourceAsString()).execute().actionGet();
			}
			scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
		} while (scrollResp.getHits().getHits().length != 0);
	}

}


2.EsFactory:Es的物件工廠

package org.ufo;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;

//連結工廠,池子的一些方法由我們實現
public class EsFactory extends BasePooledObjectFactory<Es> {

	// 建立物件
	@Override
	public Es create() throws Exception {
		System.err.println("new es !!!!");
		return new Es();
	}

	// 包裝物件
	@Override
	public PooledObject<Es> wrap(Es arg0) {
		return new DefaultPooledObject<Es>(arg0);
	}

	// 銷燬物件關閉連結
	@Override
	public void destroyObject(PooledObject<Es> p) throws Exception {
		p.getObject().close();
		System.err.println("destory es");
		super.destroyObject(p);
	}

	// 校驗物件是否正常
	@Override
	public boolean validateObject(PooledObject<Es> p) {
		System.err.println("validate es");
		return p.getObject().validate();
	}
}

3.EsUtil:實際使用類

package org.ufo;

import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

// es工具類
public class EsUtil {

	// 初始化一個池子例項
	private static GenericObjectPool<Es> pool;

	static {
		// 池子配置檔案
		GenericObjectPoolConfig config = new GenericObjectPoolConfig();
		config.setMaxTotal(10);// 整個池最大值
		config.setMaxIdle(10);// 最大空閒
		config.setMinIdle(2);// 最小空閒
		config.setMaxWaitMillis(-1);// 最大等待時間,-1表示一直等
		config.setBlockWhenExhausted(true);// 當物件池沒有空閒物件時,新的獲取物件的請求是否阻塞。true阻塞。預設值是true
		config.setTestOnBorrow(false);// 在從物件池獲取物件時是否檢測物件有效,true是;預設值是false
		config.setTestOnReturn(false);// 在向物件池中歸還物件時是否檢測物件有效,true是,預設值是false
		config.setTestWhileIdle(true);// 在檢測空閒物件執行緒檢測到物件不需要移除時,是否檢測物件的有效性。true是,預設值是false
		config.setMinEvictableIdleTimeMillis(10 * 60000L); // 可發呆的時間,10mins
		config.setTestWhileIdle(true); // 發呆過長移除的時候是否test一下先

		pool = new GenericObjectPool<>(new EsFactory(), config);
	}

	/**
	 * 
	 * @Description:新增文件
	 * @author kang
	 * @date 2017年1月3日
	 */
	public static void addDoc(String type, Object id, Object object) {
		Es es = null;
		try {
			es = pool.borrowObject();
			es.addDoc(type, id, object);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			pool.returnObject(es);
		}
	}

	/**
	 * 
	 * @Description:更新文件
	 * @author kang
	 * @date 2017年1月3日
	 */
	public static void updateDoc(String type, Object id, Object object) {
		Es es = null;
		try {
			es = pool.borrowObject();
			es.updateDoc(type, id, object);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			pool.returnObject(es);
		}
	}

	/**
	 * 
	 * @Description:刪除文件
	 * @author kang
	 * @date 2017年1月3日
	 */
	public static void delDoc(String type, Object id) {
		Es es = null;
		try {
			es = pool.borrowObject();
			es.delDoc(type, id);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			pool.returnObject(es);
		}
	}

	/**
	 * 
	 * @Description: 分頁高亮查詢
	 * @author kang
	 * @date 2017年1月11日
	 */
	public static Page getDocHighLight(String keywords, String type, Set<String> fields, int currentPage, int pageSize, boolean isHighlight) {
		Es es = null;
		try {
			es = pool.borrowObject();
			return es.getDocHighLight(keywords, type, fields, currentPage, pageSize, isHighlight);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			pool.returnObject(es);
		}
		return null;
	}

	/**
	 * 
	 * @Description:重構索引
	 * @author kang
	 * @date 2017年1月3日
	 */
	public static void reindex() {
		Es es = null;
		try {
			es = pool.borrowObject();
			es.reindex();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			pool.returnObject(es);
		}
	}

}

4.分頁輔助類

package org.ufo;
import java.util.List;  
  
public class Page {  
  
    // 指定的或是頁面引數  
    private int currentPage; // 當前頁  
    private int pageSize; // 每頁顯示多少條  
  
    // 查詢資料庫  
    private int recordCount; // 總記錄數  
    private List<?> recordList; // 本頁的資料列表  
  
    // 計算  
    private int pageCount; // 總頁數  
    private int beginPageIndex; // 頁碼列表的開始索引(包含)  
    private int endPageIndex; // 頁碼列表的結束索引(包含)  
  
    /**  
     * 只接受前4個必要的屬性,會自動的計算出其他3個屬生的值  
     *   
     * @param currentPage  
     * @param pageSize  
     * @param recordCount  
     * @param recordList  
     */  
    public Page(int currentPage, int pageSize, int recordCount, List<?> recordList) {  
        this.currentPage = currentPage;  
        this.pageSize = pageSize;  
        this.recordCount = recordCount;  
        this.recordList = recordList;  
  
        // 計算總頁碼  
        pageCount = (recordCount + pageSize - 1) / pageSize;  
  
        // 計算 beginPageIndex 和 endPageIndex  
        // >> 總頁數不多於10頁,則全部顯示  
        if (pageCount <= 10) {  
            beginPageIndex = 1;  
            endPageIndex = pageCount;  
        }  
        // >> 總頁數多於10頁,則顯示當前頁附近的共10個頁碼  
        else {  
            // 當前頁附近的共10個頁碼(前4個 + 當前頁 + 後5個)  
            beginPageIndex = currentPage - 4;  
            endPageIndex = currentPage + 5;  
            // 當前面的頁碼不足4個時,則顯示前10個頁碼  
            if (beginPageIndex < 1) {  
                beginPageIndex = 1;  
                endPageIndex = 10;  
            }  
            // 當後面的頁碼不足5個時,則顯示後10個頁碼  
            if (endPageIndex > pageCount) {  
                endPageIndex = pageCount;  
                beginPageIndex = pageCount - 10 + 1;  
            }  
        }  
    }  
  
    public List<?> getRecordList() {  
        return recordList;  
    }  
  
    public void setRecordList(List<?> recordList) {  
        this.recordList = recordList;  
    }  
  
    public int getCurrentPage() {  
        return currentPage;  
    }  
  
    public void setCurrentPage(int currentPage) {  
        this.currentPage = currentPage;  
    }  
  
    public int getPageCount() {  
        return pageCount;  
    }  
  
    public void setPageCount(int pageCount) {  
        this.pageCount = pageCount;  
    }  
  
    public int getPageSize() {  
        return pageSize;  
    }  
  
    public void setPageSize(int pageSize) {  
        this.pageSize = pageSize;  
    }  
  
    public int getRecordCount() {  
        return recordCount;  
    }  
  
    public void setRecordCount(int recordCount) {  
        this.recordCount = recordCount;  
    }  
  
    public int getBeginPageIndex() {  
        return beginPageIndex;  
    }  
  
    public void setBeginPageIndex(int beginPageIndex) {  
        this.beginPageIndex = beginPageIndex;  
    }  
  
    public int getEndPageIndex() {  
        return endPageIndex;  
    }  
  
    public void setEndPageIndex(int endPageIndex) {  
        this.endPageIndex = endPageIndex;  
    }  
  
}  

5.測試

package org.ufo.test;

import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.ufo.EsUtil;
import org.ufo.Page;

import com.google.common.collect.Sets;

public class MyTest {

	@Test
	public void testgetone() throws Exception {
		HashSet<String> set = Sets.newHashSet();
		set.add("name");
		set.add("telephone");
		Page page = EsUtil.getDocHighLight("張", "user", set, 1, 10, true);
		System.out.println(page.getRecordList());
	}

	// 併發測試
	@Test
	public void testget() throws Exception {
		HashSet<String> set = Sets.newHashSet();
		set.add("name");

		ExecutorService pool = Executors.newFixedThreadPool(8);
		for (int i = 0; i < 500; i++) {
			pool.execute(new Runnable() {
				@Override
				public void run() {
					Page page = EsUtil.getDocHighLight("張", "user", set, 1, 10, true);
					System.err.println("搜尋成功條數為:" + page.getRecordCount());
				}
			});
		}
		pool.shutdown();
		while (!pool.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
			System.out.println("執行超時!!!!!!");
			pool.shutdownNow();
		}
	}

	@Test
	public void testreindex() throws Exception {
		EsUtil.reindex();
	}
}