1. 程式人生 > >solrCloud 索引更新邏輯學習筆記

solrCloud 索引更新邏輯學習筆記

ZkCoreNodeProps 封裝了一個node的相關資訊,包括base_url,core_name,state,node_name,core_url,isLeader

SolrCmdDistributor

solr分散式更新的一個重要實現工具類,因為它本身的只負責分散式的請求處理,並沒有很多的業務邏輯。

  1. staticAdjustableSemaphoresemaphore = new AdjustableSemaphore(8);  

限制同時併發的請求最多數。從建構函式看可以跟結點數相關,但最大是16.

  1. public SolrCmdDistributor(intnumHosts, ThreadPoolExecutorexecutor) {  
  2.     int maxPermits = Math.max(16, numHosts * 16);  
  3.     // limits how many tasks can actually execute at once
  4.     if (maxPermits != semaphore.getMaxPermits()) {  
  5.       semaphore.setMaxPermits(maxPermits);  
  6.     }  
  7.     completionService = new ExecutorCompletionService<Request>(executor);  
  8.     pending = new
     HashSet<Future<Request>>();  
  9.   }  


  1. privatefinalMap<Node,List<AddRequest>> adds = new HashMap<Node,List<AddRequest>>();  
  2. privatefinalMap<Node,List<DeleteRequest>> deletes = new HashMap<Node,List<DeleteRequest>>();  

這兩個欄位主要是實現用於快取更新請求

執行快取的請求,呼叫下面方法

  1. publicvoid finish() {  
  2.    // piggyback on any outstanding adds or deletes if possible.
  3.    flushAdds(1);  
  4.    flushDeletes(1);  
  5.    checkResponses(true);  
  6.  }  


提交請求

  1. void submit(UpdateRequestExt ureq, Node node) {  
  2.     Request sreq = new Request();  
  3.     sreq.node = node;  
  4.     sreq.ureq = ureq;  
  5.     submit(sreq);  
  6.   }  

然後是檢查響應結果,呼叫

void checkResponses(booleanblock)  作為檢查上一次提交的請求響應。當請求需要重試的時候,會預設重試最大次數10次

將最終結果返回到響應結果裡,有異常也會記錄下來。

分散式增加更新

  1. publicvoid distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {  
  2. //執行前都會去掉之前還會檢查可能沒響應完的請求,不等待,直接刪除舊的請求。
  3.     checkResponses(false);  
  4.     // 確保所有刪除的請求被執行
  5.     flushDeletes(1);  
  6.       //克隆更新請求重用
  7.     AddUpdateCommand clone = new AddUpdateCommand(null);  
  8.     clone.solrDoc = cmd.solrDoc;  
  9.     clone.commitWithin = cmd.commitWithin;  
  10.     clone.overwrite = cmd.overwrite;  
  11.     clone.setVersion(cmd.getVersion());  
  12.     AddRequest addRequest = new AddRequest();  
  13.     addRequest.cmd = clone;  
  14.     addRequest.params = params;  
  15.        //增加對每個節點的請求到快取adds裡
  16.     for (Nodenode : nodes) {  
  17.       List<AddRequest> alist = adds.get(node);  
  18.       if (alist == null) {  
  19.         alist = new ArrayList<AddRequest>(2);  
  20.         adds.put(node, alist);  
  21.       }  
  22.       alist.add(addRequest);  
  23.     }  
  24.     //執行快取adds的請求
  25.     flushAdds(maxBufferedAddsPerServer);  
  26.   }  

其它的doDelete,addCommit的請求邏輯的處理都相差不多

DistributedUpdateProcessor

這個是solrCloud主要的一個更新處理鏈,使用cloud模式的時候必要的一個處理鏈,負責分散式更新的邏輯處理

一個重要的hash演算法,作為更新記錄具體分配到哪個shard的演算法

這演算法應該會在後期重構並設計為外掛方式 ,可被使用者自定議的hash演算法替換。

  1. privateint hash(AddUpdateCommandcmd) {  
  2.     String hashableId = cmd.getHashableId();  
  3.     return Hash.murmurhash3_x86_32(hashableId, 0, hashableId.length(), 0);  
  4.   }  
  5.   privateint hash(DeleteUpdateCommandcmd) {  
  6.     return Hash.murmurhash3_x86_32(cmd.getId(), 0, cmd.getId().length(), 0);  
  7.   }  

其中cmd.getHashableId()方法返回的主要是文件的主鍵的值

通過hash值定位更新到哪個shard

  1. private String getShard(int hash, String collection, ClusterState clusterState) {  
  2.     return clusterState.getShard(hash, collection);  
  3.   }  

 通過取到collection對應的RangeInfo,計算該hash值座落在哪個Range,就可以計算到相應的shard

  1. public String getShard(int hash, String collection) {  
  2.    RangeInfo rangInfo = getRanges(collection);  
  3.    int cnt = 0;  
  4.    for (Range range : rangInfo.ranges) {  
  5.      if (range.includes(hash)) {  
  6.        return rangInfo.shardList.get(cnt);  
  7.      }  
  8.      cnt++;  
  9.    }  
  10.    thrownew IllegalStateException("The HashPartitioner failed");  
  11.  }  

HashPartitioner

做為切分為多個範圍的Range,主要實現方法如下:

  1. public List<Range> partitionRange(int partitions, int min, int max) {  
  2.     assert max >= min;  
  3.     if (partitions == 0return Collections.EMPTY_LIST;  
  4.     long range = (long)max - (long)min;  
  5.     long srange = Math.max(1, range / partitions);  
  6.     List<Range> ranges = new ArrayList<Range>(partitions);  
  7.     long start = min;  
  8.     long end = start;  
  9.     while (end < max) {  
  10.       end = start + srange;  
  11.       // make last range always end exactly on MAX_VALUE
  12.       if (ranges.size() == partitions - 1) {  
  13.         end = max;  
  14.       }  
  15.       ranges.add(new Range((int)start, (int)end));  
  16.       start = end + 1L;  
  17.     }  
  18.     return ranges;  
  19.   }  

指定了某個範圍[min,max]切分為多個partitions的Ranges;切分的範圍是按平均的切分。

Range類封裝了主鍵hash值範圍【min,max】

RangeInfo封裝一個collection下所有shard資訊對應的Range,主要實現方法如下:

  1. private RangeInfo addRangeInfo(String collection) {  
  2.     List<Range> ranges;  
  3.     RangeInfo rangeInfo= new RangeInfo();  
  4.     Map<String,Slice> slices = getSlices(collection);  
  5.     if (slices == null) {  
  6.       thrownew SolrException(ErrorCode.BAD_REQUEST, "Can not find collection "
  7.           + collection + " in " + this);  
  8.     }  
  9.     Set<String> shards = slices.keySet();  
  10.     ArrayList<String> shardList = new ArrayList<String>(shards.size());  
  11.     shardList.addAll(shards);  
  12.     Collections.sort(shardList);    
  13.     ranges = hp.partitionRange(shards.size(), Integer.MIN_VALUE, Integer.MAX_VALUE);  
  14.     rangeInfo.ranges = ranges;  
  15.     rangeInfo.shardList = shardList;  
  16.     rangeInfos.put(collection, rangeInfo);  
  17.     return rangeInfo;  
  18.   }  

從上面方法的實現可以看到,會先將所有shard的名稱排序,然後根據shard的大小切分相應的多個的範圍 ,每一個shard在排序完的位置 有對應的範圍Range,兩者的資訊存放在RangeInfo.

不用擔心,上面按整數最小值 ,最大值的平均切分的範圍會導致分配不勻的情況,

可能你會擔心如果我的主鍵值是整數,那主鍵的hash值會不會跟他的值所對應呢,這樣的話,會讓hash出來的資料先填滿小的shard,其它shard不夠勻稱。其實設計者本身使用的hash演算法是針對任何型別,取的主鍵值也是以位元組陣列去做hash。這個可以自己使用它的hash演算法去校驗。

再來看一下DistributedUpdateProcessor

先看add請求,請求的來源有多種:

privateList<Node> setupRequest(inthash) 

此方法就是為了判斷上面請求來源而決定分發的結點

  1. 請求來自leader轉發:FROMLEADER,那麼就只需要寫到本地ulog,不需要轉發給leader,也不需要轉發給其它replicas
  2. 請求不是來自leader,但自己就是leader,那麼就需要將請求寫到本地,順便分發給其他的replicas.
  3. 請求不是來自leader,但自己又不是leader,也就是該更新請求是最原始的更新請求,那麼需要將請求寫到本地ulog,順便轉發給leader,再由leader分發

所以為了不讓更新請求不會轉發來轉發去。提交索引的時候,只提交給所有leader是最佳選擇。

也就是能預先知道該資料 是要到哪個leader,這個solrj好像有實現。solrcloudserver,分對更新的資料預先做分發請求。

先來講一下增加的更新邏輯

  1. @Override
  2.   publicvoid processAdd(AddUpdateCommand cmd) throws IOException {  
  3.     int hash = 0;  
  4.     if (zkEnabled) {//cloud模式下
  5.       zkCheck();//檢查zk連線狀態
  6.       hash = hash(cmd);//取得更新請求hash值,再決定hash到哪一個shard
  7.       //判斷更新請求來源,決定需要轉發的nodes
  8.       nodes = setupRequest(hash);  
  9.     } else {  
  10.       isLeader = getNonZkLeaderAssumption(req);  
  11.     }  
  12.     boolean dropCmd = false;  
  13.     if (!forwardToLeader) {