elasticsearch5.2.1 java util 工具類
阿新 • • 發佈:2019-02-20
1.Es :操作es的具體類
package org.ufo; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.fetch.subphase.highlight.HighlightField; import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; public class Es { // 連線 private static TransportClient client; // 索引庫名稱,一般都是一個庫,只是裡邊的type不同,比如user/goods private static final String index = "testindex"; public Es() { // 通過 setting物件來指定叢集配置資訊 Settings settings = Settings.builder()// .put("client.transport.sniff", true)// 自動嗅探發現叢集節點 .put("client.transport.ignore_cluster_name", true)// 忽略叢集名稱 .put("xpack.security.user", "elastic:changeme")// 安全認證 .build(); client = new PreBuiltXPackTransportClient(settings); try { client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.33.111"), 9300)); client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.33.112"), 9300)); } catch (UnknownHostException e) { e.printStackTrace(); } } /** * * @Description: 關閉連線 * @author kang * @date 2017年5月11日 */ public void close() { client.close(); } /** * * @Description: 驗證連結是否正常 * @author kang * @date 2017年5月11日 */ public boolean validate() { return client.connectedNodes().size() == 0 ? false : true; } /** * * @Description:新增文件 * @author kang * @date 2017年1月3日 */ public void addDoc(String type, Object id, Object object) { client.prepareIndex(index, type, id.toString()).setSource(JSON.toJSONString(object)).get(); } /** * * @Description:更新文件 * @author kang * @date 2017年1月3日 */ public void updateDoc(String type, Object id, Object object) { client.prepareUpdate(index, type, id.toString()).setDoc(JSON.toJSONString(object)).get(); } /** * * @Description:刪除文件 * @author kang * @date 2017年1月3日 */ public void delDoc(String type, Object id) { client.prepareDelete(index, type, id.toString()).get(); } /** * * @Description: 分頁高亮查詢 * @author kang * @date 2017年1月11日 */ public Page getDocHighLight(String keywords, String type, Set<String> fields, int currentPage, int pageSize, boolean isHighlight) throws Exception { // 搜尋資料 SearchResponse response = client.prepareSearch(index).setTypes(type)// .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)// .setQuery(QueryBuilders.multiMatchQuery(keywords, fields.toArray(new String[fields.size()]))// 查詢所有欄位 .analyzer("ik_max_word"))// 分詞器 .highlighter(new HighlightBuilder().preTags("<span style=\"color:red\">").postTags("</span>").field("*"))// 高亮標籤 .setFrom((currentPage - 1) * pageSize).setSize(pageSize)// 分頁 .setExplain(true)// 評分排序 .execute().actionGet(); // 獲取查詢結果集 SearchHits searchHits = response.getHits(); List<Object> result = Lists.newArrayList(); // 反射填充高亮 for (SearchHit hit : searchHits) { Map<String, Object> source = hit.getSource(); if (isHighlight) { // 獲取對應的高亮域 Map<String, HighlightField> highlight = hit.getHighlightFields(); for (String field : fields) { // 從設定的高亮域中取得指定域 HighlightField titleField = highlight.get(field); if (titleField == null) continue; // 取得定義的高亮標籤 String texts = StringUtils.join(titleField.fragments()); source.put(field, texts); } } result.add(JSON.toJSON(source)); } return new Page(currentPage, pageSize, (int) searchHits.totalHits(), result); } /** * * @Description: 重構索引(更新詞庫之後) * @author kang * @date 2017年5月16日 */ public void reindex() { SearchResponse scrollResp = client.prepareSearch(index)// .setScroll(new TimeValue(60000))// .setQuery(QueryBuilders.matchAllQuery())// .setSize(100).get(); // max of 100 hits will be returned for // Scroll until no hits are returned do { for (SearchHit hit : scrollResp.getHits().getHits()) { client.prepareIndex(index, hit.getType(), hit.getId()).setSource(hit.getSourceAsString()).execute().actionGet(); } scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet(); } while (scrollResp.getHits().getHits().length != 0); } }
2.EsFactory:Es的物件工廠
package org.ufo; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; //連結工廠,池子的一些方法由我們實現 public class EsFactory extends BasePooledObjectFactory<Es> { // 建立物件 @Override public Es create() throws Exception { System.err.println("new es !!!!"); return new Es(); } // 包裝物件 @Override public PooledObject<Es> wrap(Es arg0) { return new DefaultPooledObject<Es>(arg0); } // 銷燬物件關閉連結 @Override public void destroyObject(PooledObject<Es> p) throws Exception { p.getObject().close(); System.err.println("destory es"); super.destroyObject(p); } // 校驗物件是否正常 @Override public boolean validateObject(PooledObject<Es> p) { System.err.println("validate es"); return p.getObject().validate(); } }
3.EsUtil:實際使用類
package org.ufo; import java.util.Set; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; // es工具類 public class EsUtil { // 初始化一個池子例項 private static GenericObjectPool<Es> pool; static { // 池子配置檔案 GenericObjectPoolConfig config = new GenericObjectPoolConfig(); config.setMaxTotal(10);// 整個池最大值 config.setMaxIdle(10);// 最大空閒 config.setMinIdle(2);// 最小空閒 config.setMaxWaitMillis(-1);// 最大等待時間,-1表示一直等 config.setBlockWhenExhausted(true);// 當物件池沒有空閒物件時,新的獲取物件的請求是否阻塞。true阻塞。預設值是true config.setTestOnBorrow(false);// 在從物件池獲取物件時是否檢測物件有效,true是;預設值是false config.setTestOnReturn(false);// 在向物件池中歸還物件時是否檢測物件有效,true是,預設值是false config.setTestWhileIdle(true);// 在檢測空閒物件執行緒檢測到物件不需要移除時,是否檢測物件的有效性。true是,預設值是false config.setMinEvictableIdleTimeMillis(10 * 60000L); // 可發呆的時間,10mins config.setTestWhileIdle(true); // 發呆過長移除的時候是否test一下先 pool = new GenericObjectPool<>(new EsFactory(), config); } /** * * @Description:新增文件 * @author kang * @date 2017年1月3日 */ public static void addDoc(String type, Object id, Object object) { Es es = null; try { es = pool.borrowObject(); es.addDoc(type, id, object); } catch (Exception e) { e.printStackTrace(); } finally { pool.returnObject(es); } } /** * * @Description:更新文件 * @author kang * @date 2017年1月3日 */ public static void updateDoc(String type, Object id, Object object) { Es es = null; try { es = pool.borrowObject(); es.updateDoc(type, id, object); } catch (Exception e) { e.printStackTrace(); } finally { pool.returnObject(es); } } /** * * @Description:刪除文件 * @author kang * @date 2017年1月3日 */ public static void delDoc(String type, Object id) { Es es = null; try { es = pool.borrowObject(); es.delDoc(type, id); } catch (Exception e) { e.printStackTrace(); } finally { pool.returnObject(es); } } /** * * @Description: 分頁高亮查詢 * @author kang * @date 2017年1月11日 */ public static Page getDocHighLight(String keywords, String type, Set<String> fields, int currentPage, int pageSize, boolean isHighlight) { Es es = null; try { es = pool.borrowObject(); return es.getDocHighLight(keywords, type, fields, currentPage, pageSize, isHighlight); } catch (Exception e) { e.printStackTrace(); } finally { pool.returnObject(es); } return null; } /** * * @Description:重構索引 * @author kang * @date 2017年1月3日 */ public static void reindex() { Es es = null; try { es = pool.borrowObject(); es.reindex(); } catch (Exception e) { e.printStackTrace(); } finally { pool.returnObject(es); } } }
4.分頁輔助類
package org.ufo;
import java.util.List;
public class Page {
// 指定的或是頁面引數
private int currentPage; // 當前頁
private int pageSize; // 每頁顯示多少條
// 查詢資料庫
private int recordCount; // 總記錄數
private List<?> recordList; // 本頁的資料列表
// 計算
private int pageCount; // 總頁數
private int beginPageIndex; // 頁碼列表的開始索引(包含)
private int endPageIndex; // 頁碼列表的結束索引(包含)
/**
* 只接受前4個必要的屬性,會自動的計算出其他3個屬生的值
*
* @param currentPage
* @param pageSize
* @param recordCount
* @param recordList
*/
public Page(int currentPage, int pageSize, int recordCount, List<?> recordList) {
this.currentPage = currentPage;
this.pageSize = pageSize;
this.recordCount = recordCount;
this.recordList = recordList;
// 計算總頁碼
pageCount = (recordCount + pageSize - 1) / pageSize;
// 計算 beginPageIndex 和 endPageIndex
// >> 總頁數不多於10頁,則全部顯示
if (pageCount <= 10) {
beginPageIndex = 1;
endPageIndex = pageCount;
}
// >> 總頁數多於10頁,則顯示當前頁附近的共10個頁碼
else {
// 當前頁附近的共10個頁碼(前4個 + 當前頁 + 後5個)
beginPageIndex = currentPage - 4;
endPageIndex = currentPage + 5;
// 當前面的頁碼不足4個時,則顯示前10個頁碼
if (beginPageIndex < 1) {
beginPageIndex = 1;
endPageIndex = 10;
}
// 當後面的頁碼不足5個時,則顯示後10個頁碼
if (endPageIndex > pageCount) {
endPageIndex = pageCount;
beginPageIndex = pageCount - 10 + 1;
}
}
}
public List<?> getRecordList() {
return recordList;
}
public void setRecordList(List<?> recordList) {
this.recordList = recordList;
}
public int getCurrentPage() {
return currentPage;
}
public void setCurrentPage(int currentPage) {
this.currentPage = currentPage;
}
public int getPageCount() {
return pageCount;
}
public void setPageCount(int pageCount) {
this.pageCount = pageCount;
}
public int getPageSize() {
return pageSize;
}
public void setPageSize(int pageSize) {
this.pageSize = pageSize;
}
public int getRecordCount() {
return recordCount;
}
public void setRecordCount(int recordCount) {
this.recordCount = recordCount;
}
public int getBeginPageIndex() {
return beginPageIndex;
}
public void setBeginPageIndex(int beginPageIndex) {
this.beginPageIndex = beginPageIndex;
}
public int getEndPageIndex() {
return endPageIndex;
}
public void setEndPageIndex(int endPageIndex) {
this.endPageIndex = endPageIndex;
}
}
5.測試
package org.ufo.test;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.ufo.EsUtil;
import org.ufo.Page;
import com.google.common.collect.Sets;
public class MyTest {
@Test
public void testgetone() throws Exception {
HashSet<String> set = Sets.newHashSet();
set.add("name");
set.add("telephone");
Page page = EsUtil.getDocHighLight("張", "user", set, 1, 10, true);
System.out.println(page.getRecordList());
}
// 併發測試
@Test
public void testget() throws Exception {
HashSet<String> set = Sets.newHashSet();
set.add("name");
ExecutorService pool = Executors.newFixedThreadPool(8);
for (int i = 0; i < 500; i++) {
pool.execute(new Runnable() {
@Override
public void run() {
Page page = EsUtil.getDocHighLight("張", "user", set, 1, 10, true);
System.err.println("搜尋成功條數為:" + page.getRecordCount());
}
});
}
pool.shutdown();
while (!pool.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
System.out.println("執行超時!!!!!!");
pool.shutdownNow();
}
}
@Test
public void testreindex() throws Exception {
EsUtil.reindex();
}
}