solrCloud 索引更新邏輯學習筆記
ZkCoreNodeProps 封裝了一個node的相關資訊,包括base_url,core_name,state,node_name,core_url,isLeader
SolrCmdDistributor
solr分散式更新的一個重要實現工具類,因為它本身的只負責分散式的請求處理,並沒有很多的業務邏輯。
- staticAdjustableSemaphoresemaphore = new AdjustableSemaphore(8);
限制同時併發的請求最多數。從建構函式看可以跟結點數相關,但最大是16.
-
public SolrCmdDistributor(intnumHosts, ThreadPoolExecutorexecutor) {
- int maxPermits = Math.max(16, numHosts * 16);
- // limits how many tasks can actually execute at once
- if (maxPermits != semaphore.getMaxPermits()) {
- semaphore.setMaxPermits(maxPermits);
- }
- completionService = new ExecutorCompletionService<Request>(executor);
-
pending = new
- }
- privatefinalMap<Node,List<AddRequest>> adds = new HashMap<Node,List<AddRequest>>();
- privatefinalMap<Node,List<DeleteRequest>> deletes = new HashMap<Node,List<DeleteRequest>>();
這兩個欄位主要是實現用於快取更新請求
執行快取的請求,呼叫下面方法
- publicvoid finish() {
- // piggyback on any outstanding adds or deletes if possible.
- flushAdds(1);
- flushDeletes(1);
- checkResponses(true);
- }
提交請求
- void submit(UpdateRequestExt ureq, Node node) {
- Request sreq = new Request();
- sreq.node = node;
- sreq.ureq = ureq;
- submit(sreq);
- }
然後是檢查響應結果,呼叫
void checkResponses(booleanblock) 作為檢查上一次提交的請求響應。當請求需要重試的時候,會預設重試最大次數10次
將最終結果返回到響應結果裡,有異常也會記錄下來。
分散式增加更新
- publicvoid distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
- //執行前都會去掉之前還會檢查可能沒響應完的請求,不等待,直接刪除舊的請求。
- checkResponses(false);
- // 確保所有刪除的請求被執行
- flushDeletes(1);
- //克隆更新請求重用
- AddUpdateCommand clone = new AddUpdateCommand(null);
- clone.solrDoc = cmd.solrDoc;
- clone.commitWithin = cmd.commitWithin;
- clone.overwrite = cmd.overwrite;
- clone.setVersion(cmd.getVersion());
- AddRequest addRequest = new AddRequest();
- addRequest.cmd = clone;
- addRequest.params = params;
- //增加對每個節點的請求到快取adds裡
- for (Nodenode : nodes) {
- List<AddRequest> alist = adds.get(node);
- if (alist == null) {
- alist = new ArrayList<AddRequest>(2);
- adds.put(node, alist);
- }
- alist.add(addRequest);
- }
- //執行快取adds的請求
- flushAdds(maxBufferedAddsPerServer);
- }
其它的doDelete,addCommit的請求邏輯的處理都相差不多
DistributedUpdateProcessor
這個是solrCloud主要的一個更新處理鏈,使用cloud模式的時候必要的一個處理鏈,負責分散式更新的邏輯處理
一個重要的hash演算法,作為更新記錄具體分配到哪個shard的演算法
這演算法應該會在後期重構並設計為外掛方式 ,可被使用者自定議的hash演算法替換。
- privateint hash(AddUpdateCommandcmd) {
- String hashableId = cmd.getHashableId();
- return Hash.murmurhash3_x86_32(hashableId, 0, hashableId.length(), 0);
- }
- privateint hash(DeleteUpdateCommandcmd) {
- return Hash.murmurhash3_x86_32(cmd.getId(), 0, cmd.getId().length(), 0);
- }
其中cmd.getHashableId()方法返回的主要是文件的主鍵的值
通過hash值定位更新到哪個shard
- private String getShard(int hash, String collection, ClusterState clusterState) {
- return clusterState.getShard(hash, collection);
- }
通過取到collection對應的RangeInfo,計算該hash值座落在哪個Range,就可以計算到相應的shard
- public String getShard(int hash, String collection) {
- RangeInfo rangInfo = getRanges(collection);
- int cnt = 0;
- for (Range range : rangInfo.ranges) {
- if (range.includes(hash)) {
- return rangInfo.shardList.get(cnt);
- }
- cnt++;
- }
- thrownew IllegalStateException("The HashPartitioner failed");
- }
HashPartitioner
做為切分為多個範圍的Range,主要實現方法如下:
- public List<Range> partitionRange(int partitions, int min, int max) {
- assert max >= min;
- if (partitions == 0) return Collections.EMPTY_LIST;
- long range = (long)max - (long)min;
- long srange = Math.max(1, range / partitions);
- List<Range> ranges = new ArrayList<Range>(partitions);
- long start = min;
- long end = start;
- while (end < max) {
- end = start + srange;
- // make last range always end exactly on MAX_VALUE
- if (ranges.size() == partitions - 1) {
- end = max;
- }
- ranges.add(new Range((int)start, (int)end));
- start = end + 1L;
- }
- return ranges;
- }
指定了某個範圍[min,max]切分為多個partitions的Ranges;切分的範圍是按平均的切分。
Range類封裝了主鍵hash值範圍【min,max】
RangeInfo封裝一個collection下所有shard資訊對應的Range,主要實現方法如下:
- private RangeInfo addRangeInfo(String collection) {
- List<Range> ranges;
- RangeInfo rangeInfo= new RangeInfo();
- Map<String,Slice> slices = getSlices(collection);
- if (slices == null) {
- thrownew SolrException(ErrorCode.BAD_REQUEST, "Can not find collection "
- + collection + " in " + this);
- }
- Set<String> shards = slices.keySet();
- ArrayList<String> shardList = new ArrayList<String>(shards.size());
- shardList.addAll(shards);
- Collections.sort(shardList);
- ranges = hp.partitionRange(shards.size(), Integer.MIN_VALUE, Integer.MAX_VALUE);
- rangeInfo.ranges = ranges;
- rangeInfo.shardList = shardList;
- rangeInfos.put(collection, rangeInfo);
- return rangeInfo;
- }
從上面方法的實現可以看到,會先將所有shard的名稱排序,然後根據shard的大小切分相應的多個的範圍 ,每一個shard在排序完的位置 有對應的範圍Range,兩者的資訊存放在RangeInfo.
不用擔心,上面按整數最小值 ,最大值的平均切分的範圍會導致分配不勻的情況,
可能你會擔心如果我的主鍵值是整數,那主鍵的hash值會不會跟他的值所對應呢,這樣的話,會讓hash出來的資料先填滿小的shard,其它shard不夠勻稱。其實設計者本身使用的hash演算法是針對任何型別,取的主鍵值也是以位元組陣列去做hash。這個可以自己使用它的hash演算法去校驗。
再來看一下DistributedUpdateProcessor
先看add請求,請求的來源有多種:
privateList<Node> setupRequest(inthash)
此方法就是為了判斷上面請求來源而決定分發的結點
- 請求來自leader轉發:FROMLEADER,那麼就只需要寫到本地ulog,不需要轉發給leader,也不需要轉發給其它replicas
- 請求不是來自leader,但自己就是leader,那麼就需要將請求寫到本地,順便分發給其他的replicas.
- 請求不是來自leader,但自己又不是leader,也就是該更新請求是最原始的更新請求,那麼需要將請求寫到本地ulog,順便轉發給leader,再由leader分發
所以為了不讓更新請求不會轉發來轉發去。提交索引的時候,只提交給所有leader是最佳選擇。
也就是能預先知道該資料 是要到哪個leader,這個solrj好像有實現。solrcloudserver,分對更新的資料預先做分發請求。
先來講一下增加的更新邏輯
- @Override
- publicvoid processAdd(AddUpdateCommand cmd) throws IOException {
- int hash = 0;
- if (zkEnabled) {//cloud模式下
- zkCheck();//檢查zk連線狀態
- hash = hash(cmd);//取得更新請求hash值,再決定hash到哪一個shard
- //判斷更新請求來源,決定需要轉發的nodes
- nodes = setupRequest(hash);
- } else {
- isLeader = getNonZkLeaderAssumption(req);
- }
- boolean dropCmd = false;
- if (!forwardToLeader) {