1. 程式人生 > >Elasticsearch 高階優化

Elasticsearch 高階優化

  1. 索引優化(Optimized)

    Es在執行一段時間後,會出現分片數增多,刪除的記錄未及時清理,導致佔用很多的儲存空間,查詢效能也下降;但是在優化過程中,其他的請求會被阻止,知道優化完成;如果http請求斷開,優化的請求也會繼續在後臺執行;

$ curl -XPOST 'http://localhost:9200/twitter/_optimize'

管理索引優化

    optimize API允許通過API優化一個或多個索引。優化過程的操作基本上優化的索引搜尋速度更快(和涉及到Lucene索引內儲存每個碎片的段數)。優化操作允許減少的段數,把它們合併。

$ curl -XPOST

'http://localhost:9200/twitter/_optimize'

名稱

描述

max_num_segments

段數優化。要全面優化索引,將其設定為1。預設設定是隻需檢查是否需要執行合併,如果需要執行合併才執行合併操作。【經過測試越小速度越快】

only_expunge_deletes

優化過程中是否只合並帶刪除標記的段。在Lucene中,記錄不會被刪除,只是加了標記刪除。索引會在在合併過程中,建立一個新的沒有刪除標記記錄的分段。此標誌只允許合併段刪除。這個標記可以控制,只合並帶刪除標記的段,預設為

false。【設定為true docs才會合併】

refresh

重新整理。預設為true

flush

資料優化後是否進行資料重新整理進行優化。預設為true

wait_for_merge

請求是否等待合併結束。預設為true。注意,合併有可能是一個非常繁重的操作,所以為了能有感官上的響應,需要把它設定為false;【最好設定為false,預設true請求就會阻塞在那裡,直到完成】

force

Force a merge operation, even if there is a single segment in the shard with no deletions. [1.1.0]

強制優化操作,儘管只有單個塊,且無刪除標記的塊,1.1.0版本後有該屬性。

優化API一個呼叫,可以應用到多個索引,或者所有索引
$ curl -XPOST 'http://localhost:9200/kimchy,elasticsearch/_optimize'


$ curl -XPOST 'http://localhost:9200/_optimize'

引數使用方法: http://localhost:9200/indexName/_optimize?max_num_segments =1&only_expunge_deletes=false&wait_for_merge=false&flush=false

清空帶刪除標記的記錄:

http://localhost:9200/indexName/_optimize?max_num_segments =1&only_expunge_deletes=false&wait_for_merge=false&flush=true

Java API方式:

/**
 * <pre>
 * 索引優化方法
 * optimize API允許通過API優化一個或多個索引。
 * 優化過程的操作可以優化索引搜尋速度<br>
 * (涉及到Lucene索引內儲存每個碎片的段數)。
 * 優化操作合併Lucene段數和物理刪除帶刪除標記的記錄。
 * @param indexName 優化索引名
 * @return 是否優化成功,false:失敗 true:成功
 *  <pre>
 */
public boolean indexOptimize(String indexName ) {
	logger.info("ES索引開始優化,索引名為:"+indexName);
	Client client = getClient();
	try {
		 OptimizeResponse response =  client.admin().indices().optimize(
				new OptimizeRequest(indexName)
				//合併段數量
				.maxNumSegments(1)
				//優化過程中是否只合並帶刪除標記的段,預設為false
				.onlyExpungeDeletes(false)
				.listenerThreaded(true)
				//合併完成後是否執行flush操作,預設為true
				.flush(true)
				).actionGet();
		
		if(response.getShardFailures().length == response.getTotalShards()){
			logger.info("ES索引優化失敗"+response.getShardFailures());
			return false;
		}else if(response.getShardFailures().length>0){
			logger.info("ES索引優化部分分片失敗"+response.getShardFailures());
		}
		logger.info("ES索引優化成功");
		return true;
	}catch (Exception e) {
		logger.error("ES優化失敗", e);
		return false;
	} 
}

2.索引重新整理(refresh)

ES索引過程,在寫入文件時先寫入記憶體,如下圖所示:

新文件寫入新的分片資訊中,這時的分片是可讀寫的。只有到達一定時間和值後,才會flush到硬碟上,並進行合併;

  建立的索引,不會立馬查到,這是為什麼elasticsearch為near-real-time的原因
需要配置index.refresh_interval引數,預設是1s。可以修改conf/elasticsearch.yml檔案中index.refresh_interval:1s的重新整理頻率,對於實時性要求不是很高的,可以設定重新整理頻率大點,這樣會對索引的速度有提升;這樣所有新建的索引都使用這個重新整理頻率。

  Reflash操作允許我們在建立文件時能立即檢索得到該文件,重新整理索引(使新加內容對搜尋可見)

 單索引重新整理命令:

curl -XPOST 'http://localhost:9200/twitter/_refresh'

多索引重新整理:

curl -XPOST 'http://localhost:9200/twitter/_refresh'

 所有索引重新整理:

curl -XPOST 'http://localhost:9200/_refresh'

JAVA API:

(1)建立文件索引時可以通過設定setRefresh為true來實時使文件可查,但是
針對大量文件的操作,慎用,因為重新整理會影響使用效能;
client.prepareIndex(IndexName, indexType, docId)
		    .setSource( XContentFactory.jsonBuilder()
		        .startObject()
		            // Register the query,新增查詢記錄
		            .field("query", qb) 
		        .endObject())
		     //Needed when the query shall be available immediately
		    .setRefresh(true)
(2)整個索引重新整理方法
/**
 * <pre>
 * 索引重新整理方法
 *@param indexName 重新整理索引
 *@return 是否重新整理成功
 * <pre>
 */
public boolean indexRefresh(String ...indexName ) {
	logger.info("ES索引開始重新整理,索引名為:"+indexName);
	Client client = getClient();
	try {
		RefreshResponse response =  client.admin().indices()
				  .refresh(new RefreshRequest(indexName))
				  .actionGet();
		
		if(response.getShardFailures().length == response.getTotalShards()){
			logger.info("ES索引重新整理失敗"+response.getShardFailures());
			return false;
		}else if(response.getShardFailures().length>0){
			logger.info("ES索引重新整理部分分片失敗"+response.getShardFailures());
		}
		logger.info("ES索引重新整理成功");
		return true;
	}catch (Exception e) {
		logger.error("ES重新整理失敗", e);
		return false;
	} 
}

3.Flush清空

     f lush API可以重新整理一個或者多個索引。flush 操作將釋放該索引所佔用的記憶體,並將索引資料儲存在磁碟上,並清除內部事務日誌 transaction log。在預設情況下。ElasticSearch 使用以啟發式自動清除記憶體,並儲存索引。

Curl命令方式:

curl -POST localhost:9200/esfindex/_flush

多個索引操作命令:

curl -POST localhost:9200/zfindex,esfindex/_flush

接收引數:

wait_if_ongoing :是否等待其他的flush操作執行完畢,預設是false的,只要還有其他的程序還在執行,就會丟擲異常資訊,預設是false的;

force :是否強制執行,就算沒有更新;例如沒有任何文件提交到索引中;這樣可以使操作日誌的ID自增長,儘管沒有更新索引(內部操作)。

Java API 方式:

/**
 * <pre>
 * 索引Flush方法
 *@param indexName 重新整理索引
 *@return 是否重新整理成功
 * <pre>
 */
public boolean indexFlush(String ...indexName ) {
	logger.info("ES索引開始重新整理,索引名為:"+indexName);
	Client client = getClient();
	try {
		FlushResponse response =  client.admin().indices()
				  .flush(new FlushRequest(indexName))
				  .actionGet();
		//輸出json格式的響應資訊
		System.out.println(FastJSONHelper.serialize(response));
		if(response.getShardFailures().length == response.getTotalShards()){
			logger.info("ES索引重新整理失敗"+response.getShardFailures());
			return false;
		}else if(response.getShardFailures().length>0){
			logger.info("ES索引重新整理部分分片失敗" + response.getShardFailures());
		}
		logger.info("ES索引重新整理成功");
		return true;
	}catch (Exception e) {
		logger.error("ES重新整理失敗", e);
		return false;
	} 
}

4.FileDescriptor sync方法 (強制所有系統緩衝區與基礎裝置同步)
    在讀Lucene的原始碼過程中,發現FSDirectory類中有一個檔案資訊同步的方法,對其中的一行程式碼file.getFD().sync();不是很清楚(這也可見自己的基礎有多麼的差)。經過一番檢索,終於明白了其大意。
 protected void fsync(String name) throws IOException {
    File fullFile = new File(directory, name);
    boolean success = false;
    int retryCount = 0;
    IOException exc = null;
    while (!success && retryCount < 5) {
      retryCount++;
      RandomAccessFile file = null;
      try {
        try {
          file = new RandomAccessFile(fullFile, "rw");
          file.getFD().sync();//強制所有系統緩衝區與基礎裝置同步
          success = true;
        } finally {
          if (file != null)
            file.close();
        }
      } catch (IOException ioe) {
        if (exc == null)
          exc = ioe;
        try {
          // Pause 5 msec
          Thread.sleep(5);
        } catch (InterruptedException ie) {
          throw new ThreadInterruptedException(ie);
        }
      }
    }
    if (!success)
      // Throw original exception
      throw exc;
  }
} 

FileDescriptor檔案描述符類的例項用作與基礎機器有關的某種結構的不透明控制代碼,該結構表示開放檔案、開放套接字或者位元組的另一個源或接收者。檔案描述符的主要實際用途是建立一個包含該結構的 FileInputStream  FileOutputStream。 應用程式不應建立自己的檔案描述符。 

大部分人都認為flush後,其他使用者應該立即可見。但是在一些極端的情況下也需呼叫後還是無法看見以寫入的資料。 

flush

   重新整理此輸出流並強制寫出所有緩衝的輸出位元組。flush 的常規協定是:如果此輸出流的實現已經緩衝了以前寫入的任何位元組,則呼叫此方法指示應將這些位元組立即寫入它們預期的目標。

     如果此流的預期目標是由基礎作業系統提供的一個抽象(如一個檔案),則重新整理此流只能保證將以前寫入到流的位元組傳遞給作業系統進行寫入,但不保證能將這些位元組實際寫入到物理裝置(如磁碟驅動器)。

    OutputStream  flush 方法不執行任何操作。為什麼會這樣? 原因是,這個緩衝我們java自己實現的。 flush保證的是內部的緩衝寫入到系統中。但是系統中檔案也可能有緩衝,所以並不一定flush後立即可見。

 那麼如何解決這個問題?在檔案流或資料流中均可以看見getFD()這個方法, 它返回的是與此流有關的檔案描述符。

所以呼叫檔案描述符的sync的方法即可讓實際檔案強制同步了。JDK中描述如下:

sync

   強制所有系統緩衝區與基礎裝置同步。該方法在此 FileDescriptor 的所有修改資料和屬性都寫入相關裝置後返回。特別是,如果此 FileDescriptor 引用物理儲存介質,比如檔案系統中的檔案,則一直要等到將與此 FileDesecriptor 有關的緩衝區的所有記憶體中修改副本寫入物理介質中,sync 方法才會返回。 sync 方法由要求物理儲存(比例檔案)處於某種已知狀態下的程式碼使用。例如,提供簡單事務處理設施的類可以使用 sync 來確保某個檔案所有由給定事務造成的更改都記錄在儲存介質上。 sync 隻影響此 FileDescriptor 的緩衝區下游。如果正通過應用程式(例如,通過一個 BufferedOutputStream 物件)實現記憶體緩衝,那麼必須在資料受 sync 影響之前將這些緩衝區重新整理,並轉到 FileDescriptor 中(例如,通過呼叫 OutputStream.flush)。

  1. Flush與reflesh區別

   如果新新增一個文件到索引中,我們通過執行reflesh或者flush操作都能使該文件立即刻查詢,表面上看兩者是沒有區別的,但是在設計上兩者還是有區別的;

refresh操作有效地對Lucene index reader呼叫了reopen,使得在資料的那個時間快照進行了更新。這是Lucene擁有的近實時搜尋api的特性。

ES refresh讓文件可以搜尋到,但是不保證這些資訊被寫入disk進入一個永久的儲存狀態,因為它並沒有呼叫fsync,這就不能保證永續性了。讓你資料獲得永續性的是Lucene commit,這個操作代價比較大。

當你可以每秒都呼叫lucene reopen時,你不能這樣使用lucene的commit。

藉助lucene你可以儘可能頻繁地呼叫reopen以使新的文件可以被搜尋到,但是你仍然需要呼叫commit來確保資料寫入disk並且fsynced,這樣會安全。

ES通過增加了一個在每個shard(一個lucene的索引)上的事務解決這個問題,還未被commit的寫操作會被存起來。事務log被fsynced,已經安全了,所以你每時每刻都獲得了永續性,甚至對於那些沒有被commit的文件,都是這樣。因為refresh每秒自動地發生,所以你可以近實時地搜尋文件,並且如果有不好的事件發生,事務log可以被替代從而恢復那些丟失的文件。事務log的優越性是它可以被用來做其他的事情,例如提供實時的get_by_id。

elasticsearch flush高效地觸發lucene commit,並同時清空事務log,因為一旦資料在lucene層面提交,永續性將會由lucene保證。Flush同樣是一個api,也可以進行微調,雖然通常沒有必要這樣。Flush自動發生取決於事務log增加了多少操作、它們有多大、最後一次flush何時發生。

  Flush操作觸發Lucene的commit操作,並清空transaction log資訊。直到索引文件被持久化到索引檔案上,保證了資料的安全和持久化;flush操作api允許對flush操作進行配置,儘管普通情況下不是必要的,可以配置為新增多少條文件或提交的資料量達到多大時才進行flush操作;

flush操作與translog

   我們可能已經意識到如果資料在filesystem cache之中是很有可能在意外的故障中丟失。這個時候就需要一種機制,可以將對es的操作記錄下來,來確保當出現故障的時候,保留在filesystem的資料不會丟失,並在重啟的時候可以從這個記錄中將資料恢復過來。elasticsearch提供了translog來記錄這些操作。

當向elasticsearch傳送建立document索引請求的時候,document資料會先進入到index buffer之後,與此同時會將操作記錄在translog之中,當發生refresh時(資料從index buffer中進入filesystem cache的過程)translog中的操作記錄並不會被清除,而是當資料從filesystem cache中被寫入磁碟之後才會將translog中清空。而從filesystem cache寫入磁碟的過程就是flush。可能有點暈,我畫了一個圖幫大家理解這個過程:

有關於translog和flush的一些配置項:

index.translog.flush_threshold_ops:當發生多少次操作時進行一次flush。預設是 unlimited。

index.translog.flush_threshold_size:當translog的大小達到此值時會進行一次flush操作。預設是512mb。

index.translog.flush_threshold_period:在指定的時間間隔內如果沒有進行flush操作,會進行一次強制flush操作。預設是30m。

index.translog.interval:多少時間間隔內會檢查一次translog,來進行一次flush操作。es會隨機的在這個值到這個值的2倍大小之間進行一次操作,預設是5s。

總結和思考

Elasticsearch的索引思路:

將磁盤裡的東西儘量搬進記憶體,減少磁碟隨機讀取次數(同時也利用磁碟順序讀特性),結合各種奇技淫巧的壓縮演算法,用及其苛刻的態度使用記憶體。

所以,對於使用Elasticsearch進行索引時需要注意:

  • 不需要索引的欄位,一定要明確定義出來,因為預設是自動建索引的
  • 同樣的道理,對於String型別的欄位,不需要analysis的也需要明確定義出來,因為預設也是會analysis的
  • 選擇有規律的ID很重要,隨機性太大的ID(比如java的UUID)不利於查詢

關於最後一點,個人認為有多個因素:

其中一個(也許不是最重要的)因素: 上面看到的壓縮演算法,都是對Posting list裡的大量ID進行壓縮的,那如果ID是順序的,或者是有公共字首等具有一定規律性的ID,壓縮比會比較高;

另外一個因素: 可能是最影響查詢效能的,應該是最後通過Posting list裡的ID到磁碟中查詢Document資訊的那步,因為Elasticsearch是分Segment儲存的,根據ID這個大範圍的Term定位到Segment的效率直接影響了最後查詢的效能,如果ID是有規律的,可以快速跳過不包含該ID的Segment,從而減少不必要的磁碟讀次數