java操作elasticsearch
阿新 • • 發佈:2018-01-01
upd 都是 2.4 port nlp mcr .json 不存在 artifacts
Elasticsearch是一個搜索引擎,建立在Lucene之上
集群 (cluster)
代表一個集群,集群中有多個節點,其中有一個為主節點,這個主節點是可以通過選舉產生的,主從節點是對於集群內部來說的。
es的一個概念就是去中心化,字面上理解就是無中心節點,這是對於集群外部來說的,因為從外部來看es集群,在邏輯上是個整體,
你與任何一個節點的通信和與整個es集群通信是等價的。
節點(node)
每一個運行實例稱為一個節點,每一個運行實例既可以在同一機器上,也可以在不同的機器上.所謂運行實例,就是一個服務器進程.
在測試環境內,可以在一臺服務器上運行多個服務器進程,在生產環境建議每臺服務器運行一個服務器進程
索引(index)
這裏的索引是名詞不是動詞,在elasticsearch裏面支持多個索引。類似於關系數據庫裏面每一個服務器可以支持多個數據庫一樣。
在每一索引下面又支持多種類型,類似於關系數據庫裏面的一個數據庫可以有多張表。但是本質上和關系數據庫有很大的區別。
分片(shards)
把一個索引分解為多個小的索引,每一個小的索引叫做分片。分片後就可以把各個分片分配到不同的節點中,構成分布式搜索
分片的數量只能在索引創建前指定,並且索引創建後不能更改
副本(replicas)
副本的作用一是提高系統的容錯性,當個某個節點某個分片損壞或丟失時可以從副本中恢復。二是提高es的查詢效率,es會自動對搜索請求進行負載均衡
recovery
代表數據恢復或叫數據重新分布,es在有節點加入或退出時會根據機器的負載對索引分片進行重新分配,掛掉的節點重新啟動時也會進行數據恢復。
river
代表es的一個數據源,也是其它存儲方式(如:數據庫)同步數據到es的一個方法。它是以插件方式存在的一個es服務,通過讀取river中的數據並把它索引到es中,
官方的river有couchDB的,RabbitMQ的,Twitter的,Wikipedia的,river這個功能將會在後面的文件中重點說到。
gateway
代表es索引的持久化存儲方式,es默認是先把索引存放到內存中,當內存滿了時再持久化到硬盤。當這個es集群關閉再重新啟動時就會從gateway中讀取索引數據。 es支持多種類型的gateway,有本地文件系統(默認),分布式文件系統,Hadoop的HDFS和amazon的s3雲存儲服務。---將各種集群狀態信息、索引配置信息等全部持久存放在網關中
discovery.zen
代表es的自動發現節點機制,es是一個基於p2p的系統,它先通過廣播尋找存在的節點,再通過多播協議來進行節點之間的通信,同時也支持點對點的交互。
Transport
代表es內部節點或集群與客戶端的交互方式,默認內部是使用tcp協議進行交互,同時它支持http協議(json格式)、thrift、servlet、
memcached、zeroMQ等的傳輸協議(通過插件方式集成)。
索引(Index)
ElaticSearch將數據存放在一個或多個索引當中。一個索引相當於一個數據庫,裏面存放用戶文檔數據。在底層,ElasticSearch實際上還是
使用Lucene完成讀寫數據的操作,ElasticSearch索引是由一個或多個Lucene索引組成,所以ES中的分片或副本實際上就是一個Lucene索引。
文檔(Document)
文檔是ES中主要的實體,所有ES的查詢都是基於存放在ES中文檔資源的查詢。每個文檔都是由各種域(Field)組成,每個域(Field)有一個名
稱和一個或多個值構成。實際上,從用戶的角度看,一個ES文檔就是一個JSON對象。
映射(Mapping)
映射用於定義文檔域的屬性,這些屬性包括分詞器,字段類型,存儲類型等。對於沒有定義的字段類型的屬性,ES可以自動通過其字段值進行識別。
類型(Type)
ES中每個文檔必須有一個類型定義。這裏的類型相當於數據庫當中的表,類型定義了字段映射(類似數據庫表結構),
這樣一來,每個索引可以包含多種文檔類型,而每種文檔類型定義一種映射關系。
路由(Routing)
ES給每個文檔建索引後,通過路由可以算出所查的文檔處在哪個分片上,因為在建立索引之初使用公式:shard = hash(routting) % number_of_pr
imary_shards進行文檔分配。routing值是一個任意的字符串,默認是文檔的ID,通過人工指定就可以控制文檔存放在哪個shard的位置了。
索引別名(Index Alias)
索引別名相當於快捷方式或軟鏈接,可以指向一個或多個索引,甚至可以指向帶路由的分片。
近實時性 near realtime (nrt)
Elasticsearch是一個近實時性的搜索平臺,所以對於剛建過的索引文件進行查詢時需要一個輕微的等待時間(通常為1秒)。
java操作elastic:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <runSuite>**/MainTestSuite.class</runSuite> <elasticsearch.plugin.name>sql</elasticsearch.plugin.name> <elasticsearch.plugin.site>true</elasticsearch.plugin.site> <elasticsearch.plugin.jvm>true</elasticsearch.plugin.jvm> <elasticsearch.version>5.6.2</elasticsearch.version> <elasticsearch.rest.version>5.5.2</elasticsearch.rest.version> <slf4j.version>1.7.7</slf4j.version> <elasticsearch.plugin.classname>org.elasticsearch.plugin.nlpcn.SqlPlug</elasticsearch.plugin.classname> </properties> <repositories> <repository> <id>elasticsearch-releases</id> <url>https://artifacts.elastic.co/maven</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>4.0.2.RELEASE</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>${elasticsearch.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>${elasticsearch.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>rest</artifactId> <version>${elasticsearch.rest.version}</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>x-pack-transport</artifactId> <version>${elasticsearch.version}</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.4</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>15.0</version> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-all</artifactId> <version>1.3</version> <scope>test</scope> </dependency> <dependency> <groupId>org.locationtech.spatial4j</groupId> <artifactId>spatial4j</artifactId> <version>0.6</version> </dependency> <dependency> <groupId>com.vividsolutions</groupId> <artifactId>jts</artifactId> <version>1.13</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.1.41</version> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.0.15</version> </dependency> <!-- LOGGING begin --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> </dependency> <!-- common-logging 實際調用slf4j --> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>${slf4j.version}</version> </dependency> <!-- java.util.logging 實際調用slf4j --> <dependency> <groupId>org.slf4j</groupId> <artifactId>jul-to-slf4j</artifactId> <version>${slf4j.version}</version> </dependency> <!-- LOGGING end --> <dependency> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> <version>2.5</version> <scope>provided</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies>
EsQuery
public class EsQuery { protected static Logger logger = LoggerFactory.getLogger(EsQuery.class); /** * 集群狀態 */ public void clusterStatus(){ ClusterAdminClient clusterAdminClient = ElasticUtil.getClusterClient().admin().cluster(); ClusterHealthResponse healths = clusterAdminClient.prepareHealth().get(); String clusterName = healths.getClusterName(); int numberOfDataNodes = healths.getNumberOfDataNodes(); int numberOfNodes = healths.getNumberOfNodes(); ClusterHealthStatus status = healths.getStatus(); System.out.println("集群名稱:"+clusterName); System.out.println("數據節點:"+numberOfDataNodes); System.out.println("正常節點:"+numberOfNodes); System.out.println("狀態值:"+status.name()); } /** * 判斷索引庫是否存在 * @param indexName * @return */ public boolean isIndexExists(String indexName) { IndicesExistsRequest inExistsRequest = new IndicesExistsRequest(indexName); IndicesExistsResponse inExistsResponse = ElasticUtil.getClusterClient().admin().indices() .exists(inExistsRequest).actionGet(); return inExistsResponse.isExists(); } /** * 創建索引 indexName 索引名稱 * @param indexName * @return * @throws IOException */ public boolean createIndex(String indexName){ if(isIndexExists(indexName)){ return false; } CreateIndexResponse response = ElasticUtil.getClusterClient().admin().indices().prepareCreate(indexName).execute().actionGet(); if(response.isAcknowledged()){ return true; } return false; } /** * 刪除索引庫 * @param indexName * @return */ public boolean dropIndex(String indexName) { if (!isIndexExists(indexName)) { return false; } else { DeleteIndexResponse dResponse = ElasticUtil.getClusterClient().admin().indices().prepareDelete(indexName).execute().actionGet(); if (dResponse.isAcknowledged()) { return true; }else{ return false; } } } /** * 采用standard分詞器-默認*/ public boolean addType(String indexName,String typeName){ XContentBuilder builder=null; try { builder = XContentFactory.jsonBuilder() .startObject() .startObject(typeName) .endObject() .endObject(); } catch (IOException e) { e.printStackTrace(); } PutMappingRequest mappingRequest = Requests.putMappingRequest(indexName).type(typeName).source(builder); try { PutMappingResponse mappingResponse = ElasticUtil.getClusterClient().admin().indices().putMapping(mappingRequest).actionGet(); if (mappingResponse.isAcknowledged()) { return true; }else{ return false; } } catch (IndexNotFoundException e) { System.out.println("索引不存在,創建失敗..."); } return false; } /** * 采用IK分詞器 */ public boolean addIKType(String indexName,String typeName){ XContentBuilder builder=null; try { builder = XContentFactory.jsonBuilder() .startObject() .startObject(typeName) .startObject("properties") .startObject("poi_id").field("type","integer").endObject() .startObject("poi_title").field("type","text").field("analyzer","ik_max_word").endObject() .startObject("poi_address").field("type","text").field("analyzer","ik_max_word").endObject() .startObject("poi_tags").field("type","text").field("analyzer","ik_max_word").endObject() .startObject("poi_phone").field("type","text").endObject() .endObject() .endObject() .endObject(); } catch (IOException e) { e.printStackTrace(); } PutMappingRequest mappingRequest = Requests.putMappingRequest(indexName).type(typeName).source(builder); try { PutMappingResponse mappingResponse = ElasticUtil.getClusterClient().admin().indices().putMapping(mappingRequest).actionGet(); if (mappingResponse.isAcknowledged()) { return true; }else{ return false; } } catch (IndexNotFoundException e) { System.out.println("索引不存在,創建失敗..."); } return false; } /** * 添加或修改數據 設置自己的id * @param id * @param json */ public static String insertOrUpdate(String indexName,String typeName,String id,Map<String, Object> json) { if(json==null){ return null; } IndexResponse response = ElasticUtil.getClusterClient().prepareIndex(indexName, typeName,id).setSource(json).execute().actionGet(); return response.getId(); } /** * 添加或修改數據 使用隨機id * @param json */ public String insertOrUpdate(String indexName,String typeName,Map<String, Object> json) { if(json==null){ return null; } IndexResponse response = ElasticUtil.getClusterClient().prepareIndex(indexName, typeName).setSource(json).execute().actionGet(); return response.getId(); } /** * 通過id查詢單條數據 * @param id * @return */ public GetResponse getResourceById(String indexName,String typeName,String id){ GetResponse response = ElasticUtil.getClusterClient().prepareGet(indexName,typeName, id).get(); //Map<String, Object> source = response.getSource(); return response; } /** * 刪除數據 * @param id */ public void deleteResourceByIds(String indexName,String typeName,String[] ids) { if(ids==null||ids.length<1){ return; } for(String id :ids){ ElasticUtil.getClusterClient().prepareDelete(indexName, typeName, id) .execute().actionGet(); System.out.println("刪除id: "+id); } System.out.println("delete over.."); } /** * 查詢 index/type 數據 * @param indexName */ public static void simpleQuery(String indexName,String typeName){ SearchRequestBuilder prepareSearch = ElasticUtil.getClusterClient().prepareSearch(indexName); if(typeName!=null && !"".equals(typeName.trim())){ prepareSearch.setTypes(typeName); } // prepareSearch.setFrom(1).setSize(10); SearchResponse response = prepareSearch.execute().actionGet(); SearchHit[] searchHits = response.getHits().getHits(); for (SearchHit sh : searchHits) { System.out.println(sh.getId()+"==>"+sh.getSource()); } System.out.println("total==> "+searchHits.length); } /** * 通過field字段過濾索引庫 * @param indexName * @param typeName * @param field * @param value */ public void matchFieldQuery(String indexName,String typeName,String field,String value){ QueryBuilder qb = QueryBuilders.matchQuery(field,value); SearchRequestBuilder requestBuilder = ElasticUtil.getClusterClient().prepareSearch(indexName); if(typeName!=null&&!"".equals(typeName.trim())){ requestBuilder.setTypes(typeName); } SearchResponse response = requestBuilder.setQuery(qb).execute().actionGet(); SearchHit[] searchHits = response.getHits().getHits(); for (SearchHit sh : searchHits) { System.out.println(sh.getId()+"==>"+sh.getSource()); } System.out.println("total==> "+searchHits.length); } /** * 通過多個field字段過濾索引庫 * @param indexName * @param typeName * @param field1 * @param field2 * @param value */ public void multiFieldMatchQuery(String indexName,String typeName,String field1,String field2,String value){ QueryBuilder qb = QueryBuilders.multiMatchQuery(value,field1, field2); SearchRequestBuilder requestBuilder = ElasticUtil.getClusterClient().prepareSearch(indexName); if(typeName!=null&&!"".equals(typeName.trim())){ requestBuilder.setTypes(typeName); } SearchResponse response = requestBuilder.setQuery(qb) .execute().actionGet(); SearchHit[] searchHits = response.getHits().getHits(); for (SearchHit sh : searchHits) { System.out.println(sh.getId()+"==>"+sh.getSource()); } } /** * 通過id 獲取多條數據*/ public void idsQuery(String indexName,String typeName,String[] ids){ IdsQueryBuilder qb = QueryBuilders.idsQuery().addIds(ids); SearchRequestBuilder requestBuilder = ElasticUtil.getClusterClient().prepareSearch(indexName); if(typeName!=null&&!"".equals(typeName.trim())){ requestBuilder.setTypes(typeName); } SearchResponse response = requestBuilder.setQuery(qb).execute().actionGet(); SearchHit[] searchHits = response.getHits().getHits(); for (SearchHit sh : searchHits) { System.out.println(sh.getId()+"==>"+sh.getSource()); } System.out.println("total==> "+searchHits.length); } }
java操作elasticsearch