1. 程式人生 > >Elasticsearch+Hbase實現海量數據秒回查詢

Elasticsearch+Hbase實現海量數據秒回查詢

ont 都是 lease tor lines app req 空格 java開發

---------------------------------------------------------------------------------------------
[版權申明:本文系作者原創,轉載請註明出處]
文章出處: http://blog.csdn.net/sdksdk0/article/details/53966430
作者:朱培 ID:sdksdk0

--------------------------------------------------------------------------------------------

首先祝大家2017新年快樂,我今天分享的是通過ElasticSearch與hbase進行整合的一個搜索案例,這個案例涉及的技術面比較廣,首先你得有JAVAEE的基礎,要會SSM,而且還要會大數據中的hdfs、zookeeper、hbase以及ElasticSearch和kibana。環境部署在4臺centos7上。主機名為node1-node4。這裏假設你已經安裝好了zookeeper、hadoop、hbase和ElasticSearch還有kibana,我這裏使用的是hadoop2.5.2,ElasticSearch用的你是2.2,kibana是4.4.1。我這裏的環境是 hadoop是4臺在node1-node4, zookeeper是3臺再node1-node3,,ElasticSearch是3臺在node1-node3,kibana是一臺在node1上。該系統可以對億萬數據查詢進行秒回,是一般的關系型數據庫很難做到的。在IntelliJ IDEA 中進行代碼編寫。環境搭建我這裏就不啰嗦,相信大家作為一名由經驗的開發人員來說都是小事一樁。文末提供源碼下載鏈接。

一、ElasticSearch和Hbase

ElasticSearch是一個基於Lucene的搜索服務器。它提供了一個分布式多用戶能力的全文搜索引擎,基於RESTful web接口。Elasticsearch是用Java開發的,並作為Apache許可條款下的開放源碼發布,是當前流行的企業級搜索引擎。設計用於雲計算中,能夠達到實時搜索,穩定,可靠,快速,安裝使用方便。 Elasticsearch的性能是solr的50倍。

HBase – Hadoop Database,是一個高可靠性、高性能、面向列、可伸縮、
實時讀寫的分布式數據庫
– 利用Hadoop HDFS作為其文件存儲系統,利用Hadoop MapReduce來處理
HBase中的海量數據,利用Zookeeper作為其分布式協同服務
– 主要用來存儲非結構化和半結構化的松散數據(列存 NoSQL 數據庫)

二、需求分析&服務器環境設置

主要是做一個文章的搜索。有文章標題、作者、摘要、內容四個主要信息。效果圖如下:這裏樣式我就沒怎麽設置了。。。。想要好看一點的可以自己加css。

技術分享

服務器:

在3臺centos7中部署,主機名為node1-node3.安裝好ElasticSearch並配置好集群,

1. 解壓

2. 修改config/elasticsearch.yml (註意要頂格寫,冒號後面要加一個空格)

a) Cluster.name: tf (同一集群要一樣)

b) Node.name: node-1 (同一集群要不一樣)

c) Network.Host: 192.168.44.137 這裏不能寫127.0.0.1

3. 解壓安裝kibana

4. 再congfig目錄下的kibana.yml中修改elasticsearch.url

5. 安裝插件

Step 1: Install Marvel into Elasticsearch:

bin/plugin install license
bin/plugin install marvel-agent

Step 2: Install Marvel into Kibana

bin/kibana plugin --install elasticsearch/marvel/latest

Step 3: Start Elasticsearch and Kibana

bin/elasticsearch
bin/kibana

啟動好elasticsearch集群後,

然後啟動zookeeper、hdfs、hbase。zkService.sh start 、start-all.sh、start-hbase.sh。

接下來就是剩下編碼步驟了。

技術分享

三、編碼開發

1、首先在IntelliJ IDEA中新建一個maven工程,加入如下依賴。

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.9</version>
        </dependency>


        <!-- spring 3.2 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>3.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-orm</artifactId>
            <version>3.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>3.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>3.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>3.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>3.2.0.RELEASE</version>
        </dependency>

        <!-- JSTL -->
        <dependency>
            <groupId>jstl</groupId>
            <artifactId>jstl</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>taglibs</groupId>
            <artifactId>standard</artifactId>
            <version>1.1.2</version>
        </dependency>
        <!-- slf4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.10</version>
        </dependency>

        <!-- elasticsearch -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>2.2.0</version>
        </dependency>

        <!-- habse -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.1.3</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


    </dependencies>


2、Dao層

private Integer id;
	private String title;
	
	private String describe;
	
	private String content;
	
	private String author;


實現其getter/setter方法。

3、數據準備

在桌面新建一個doc1.txt文檔,用於把我們需要查詢的數據寫入到裏面,這裏我只準備了5條數據。中間用tab鍵隔開。

技術分享

4、在hbase中建立表。表名師doc,列族是cf。

public static void main(String[] args) throws Exception {
      HbaseUtils hbase = new HbaseUtils();
      //創建一張表 	hbase.createTable("doc","cf");

}

/**  * 創建一張表  * @param tableName  * @param column  * @throws Exception  */ public void createTable(String tableName, String column) throws Exception {
   if(admin.tableExists(TableName.valueOf(tableName))){
      System.out.println(tableName+"表已經存在!");
   }else{
      HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
      tableDesc.addFamily(new HColumnDescriptor(column.getBytes()));
      admin.createTable(tableDesc);
      System.out.println(tableName+"表創建成功!");
   }
}

5、導入索引。這一步的時候確保你的hdfs和hbase以及elasticsearch是處於開啟狀態。

@Test
    public void createIndex() throws Exception {
        List<Doc> arrayList = new ArrayList<Doc>();
        File file = new File("C:\\Users\\asus\\Desktop\\doc1.txt");
        List<String> list = FileUtils.readLines(file,"UTF8");
        for(String line : list){
            Doc Doc = new Doc();
            String[] split = line.split("\t");
            System.out.print(split[0]);
            int parseInt = Integer.parseInt(split[0].trim());
            Doc.setId(parseInt);
            Doc.setTitle(split[1]);
            Doc.setAuthor(split[2]);
            Doc.setDescribe(split[3]);
            Doc.setContent(split[3]);
            arrayList.add(Doc);
        }
        HbaseUtils hbaseUtils = new HbaseUtils();
        for (Doc Doc : arrayList) {
            try {
                //把數據插入hbase
                hbaseUtils.put(hbaseUtils.TABLE_NAME, Doc.getId()+"", hbaseUtils.COLUMNFAMILY_1, hbaseUtils.COLUMNFAMILY_1_TITLE, Doc.getTitle());
                hbaseUtils.put(hbaseUtils.TABLE_NAME, Doc.getId()+"", hbaseUtils.COLUMNFAMILY_1, hbaseUtils.COLUMNFAMILY_1_AUTHOR, Doc.getAuthor());
                hbaseUtils.put(hbaseUtils.TABLE_NAME, Doc.getId()+"", hbaseUtils.COLUMNFAMILY_1, hbaseUtils.COLUMNFAMILY_1_DESCRIBE, Doc.getDescribe());
                hbaseUtils.put(hbaseUtils.TABLE_NAME, Doc.getId()+"", hbaseUtils.COLUMNFAMILY_1, hbaseUtils.COLUMNFAMILY_1_CONTENT, Doc.getContent());
                //把數據插入es
                Esutil.addIndex("tfjt","doc", Doc);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


數據導入成功之後可以在服務器上通過命令查看一下:

curl -XGET http://node1:9200/tfjt/_search

技術分享

7、搜索。

在這裏新建了一個工具類Esutil.java,主要用於處理搜索的。註意,我們默認的elasticsearch是9200端口的,這裏數據傳輸用的是9300,不要寫成9200了,然後就是集群名字為tf,也就是前面配置的集群名。還有就是主機名node1-node3,這裏不能寫ip地址,如果是本地測試的話,你需要在你的window下面配置hosts文件。

public class Esutil {
	public static Client client = null;

		/**
		 * 獲取客戶端
		 * @return
		 */
		public static  Client getClient() {
			if(client!=null){
				return client;
			}
			Settings settings = Settings.settingsBuilder().put("cluster.name", "tf").build();
			try {
				client = TransportClient.builder().settings(settings).build()
						.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("node1"), 9300))
						.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("node2"), 9300))
						.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("node3"), 9300));
			} catch (UnknownHostException e) {
				e.printStackTrace();
			}
			return client;
		}
	
	
	
	
	public static String addIndex(String index,String type,Doc Doc){
		HashMap<String, Object> hashMap = new HashMap<String, Object>();
		hashMap.put("id", Doc.getId());
		hashMap.put("title", Doc.getTitle());
		hashMap.put("describe", Doc.getDescribe());
		hashMap.put("author", Doc.getAuthor());
		
		IndexResponse response = getClient().prepareIndex(index, type).setSource(hashMap).execute().actionGet();
		return response.getId();
	}
	
	
	public static Map<String, Object> search(String key,String index,String type,int start,int row){
		SearchRequestBuilder builder = getClient().prepareSearch(index);
		builder.setTypes(type);
		builder.setFrom(start);
		builder.setSize(row);
		//設置高亮字段名稱
		builder.addHighlightedField("title");
		builder.addHighlightedField("describe");
		//設置高亮前綴
		builder.setHighlighterPreTags("<font color=‘red‘ >");
		//設置高亮後綴
		builder.setHighlighterPostTags("</font>");
		builder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
		if(StringUtils.isNotBlank(key)){
//			builder.setQuery(QueryBuilders.termQuery("title",key));
			builder.setQuery(QueryBuilders.multiMatchQuery(key, "title","describe"));
		}
		builder.setExplain(true);
		SearchResponse searchResponse = builder.get();
		
		SearchHits hits = searchResponse.getHits();
		long total = hits.getTotalHits();
		Map<String, Object> map = new HashMap<String,Object>();
		SearchHit[] hits2 = hits.getHits();
		map.put("count", total);
		List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
		for (SearchHit searchHit : hits2) {
			Map<String, HighlightField> highlightFields = searchHit.getHighlightFields();
			HighlightField highlightField = highlightFields.get("title");
			Map<String, Object> source = searchHit.getSource();
			if(highlightField!=null){
				Text[] fragments = highlightField.fragments();
				String name = "";
				for (Text text : fragments) {
					name+=text;
				}
				source.put("title", name);
			}
			HighlightField highlightField2 = highlightFields.get("describe");
			if(highlightField2!=null){
				Text[] fragments = highlightField2.fragments();
				String describe = "";
				for (Text text : fragments) {
					describe+=text;
				}
				source.put("describe", describe);
			}
			list.add(source);
		}
		map.put("dataList", list);
		return map;
	}

//	public static void main(String[] args) {
//		Map<String, Object> search = Esutil.search("hbase", "tfjt", "doc", 0, 10);
//		List<Map<String, Object>> list = (List<Map<String, Object>>) search.get("dataList");
//	}
}

8、使用spring控制層處理

在裏面的spring配置這裏就不說了,代碼文末提供。

@RequestMapping("/search.do")
	public String serachArticle(Model model,
			@RequestParam(value="keyWords",required = false) String keyWords,
			@RequestParam(value = "pageNum", defaultValue = "1") Integer pageNum,
			@RequestParam(value = "pageSize", defaultValue = "3") Integer pageSize){
		try {
			keyWords = new String(keyWords.getBytes("ISO-8859-1"),"UTF-8");
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		Map<String,Object> map = new HashMap<String, Object>();
		int count = 0;
		try {
			map = Esutil.search(keyWords,"tfjt","doc",(pageNum-1)*pageSize, pageSize);
			count = Integer.parseInt(((Long) map.get("count")).toString());
		} catch (Exception e) {
			logger.error("查詢索引錯誤!{}",e);
			e.printStackTrace();
		}
		PageUtil<Map<String, Object>> page = new PageUtil<Map<String, Object>>(String.valueOf(pageNum),String.valueOf(pageSize),count);
		List<Map<String, Object>> articleList = (List<Map<String, Object>>)map.get("dataList");
		page.setList(articleList);
		model.addAttribute("total",count);
		model.addAttribute("pageNum",pageNum);
		model.addAttribute("page",page);
		model.addAttribute("kw",keyWords);
		return "index.jsp";
	}

9、頁面

<center>
<form action="search.do" method="get">
  <input type="text" name="keyWords" />
  <input type="submit" value="百度一下">
  <input type="hidden" value="1" name="pageNum">
</form>
<c:if test="${! empty page.list }">
<h3>百度為您找到相關結果約${total}個</h3>
<c:forEach items="${page.list}" var="bean">
  <a href="/es/detailDocById/${bean.id}.do">${bean.title}</a>
  <br/>
  <br/>
  <span>${bean.describe}</span>
  <br/>
  <br/>
</c:forEach>

<c:if test="${page.hasPrevious }">
  <a href="search.do?pageNum=${page.previousPageNum }&keyWords=${kw}"> 上一頁</a>
</c:if>
<c:forEach begin="${page.everyPageStart }" end="${page.everyPageEnd }" var="n">
  <a href="search.do?pageNum=${n }&keyWords=${kw}"> ${n }</a>   
</c:forEach>

<c:if test="${page.hasNext }">
  <a href="search.do?pageNum=${page.nextPageNum }&keyWords=${kw}"> 下一頁</a>
</c:if>
</c:if>
</center>

10、項目發布

在IntelliJ IDEA 中配置好常用的項目,這裏發布名Application context名字為es,當然你也可以自定義設置。

技術分享


最終效果如下:搜索COS會得到結果,速度非常快。

技術分享

總結:這個案例的操作流程還是挺多的,要有細心和耐心,特別是服務器配置,各種版本要匹配好,不然會出各種頭疼的問題,當然了,這個還是需要有一定基礎,不然搞不定這個事情。。。。。

源碼地址:https://github.com/sdksdk0/es

Elasticsearch+Hbase實現海量數據秒回查詢