6. SOFAJRaft Source Code Analysis: Linearizable Reads Through RheaKV

Introduction

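I originally wanted to cover linearizable reads right after the article on elections, but jumping straight in felt like it lacked context and would be hard to follow, so I wrote the RheaKV series first. Now that RheaKV has been introduced, we can finally look at how SOFAJRaft implements linearizable reads. A simple example of linearizability: if a value is written at time T1, then any read after T1 must see that value and can never return the value from before T1.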

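Parts of this article draw on the SOFAJRaft documentation:
SOFAJRaft Linearizable Read Implementation Analysis | SOFAJRaft Implementation Principles
SOFAJRaft Implementation Principles: How SOFAJRaft-RheaKV Uses Raft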

Reading data in RheaKV

The entry point for reading data in RheaKV is DefaultRheaKVStore's bGet.

DefaultRheaKVStore#bGet

public byte[] bGet(final String key) {
    return FutureHelper.get(get(key), this.futureTimeoutMillis);
}

bGet eventually calls down into one of DefaultRheaKVStore's get overloads:
DefaultRheaKVStore#get

private CompletableFuture<byte[]> get(final byte[] key, final boolean readOnlySafe,
                                      final CompletableFuture<byte[]> future, final boolean tryBatching) {
    // check the started state
    checkState();
    Requires.requireNonNull(key, "key");
    if (tryBatching) {
        final GetBatching getBatching = readOnlySafe ? this.getBatchingOnlySafe : this.getBatching;
        if (getBatching != null && getBatching.apply(key, future)) {
            return future;
        }
    }
    internalGet(key, readOnlySafe, future, this.failoverRetries, null, this.onlyLeaderRead);
    return future;
}

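get decides from its arguments whether to read in batch mode. readOnlySafe indicates whether linearizable reads are enabled. Because we came in through get(key), both readOnlySafe and tryBatching are true.
So getBatchingOnlySafe's apply method is called here with the key and the future.
getBatchingOnlySafe is created when DefaultRheaKVStore is initialized: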
DefaultRheaKVStore#init

.....
this.getBatchingOnlySafe = new GetBatching(KeyEvent::new, "get_batching_only_safe",
        new GetBatchingHandler("get_only_safe", true));
.....

The handler passed in when getBatchingOnlySafe is created is GetBatchingHandler.

Now back to getBatchingOnlySafe#apply to see what it does:

public boolean apply(final byte[] message, final CompletableFuture<byte[]> future) {
    //GetBatchingHandler
    return this.ringBuffer.tryPublishEvent((event, sequence) -> {
        event.reset();
        event.key = message;
        event.future = future;
    });
}

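apply publishes an event to the Disruptor for asynchronous processing and stores our key in the event's key field. If the ring buffer is full, tryPublishEvent returns false and get falls through to internalGet instead. The handler behind getBatchingOnlySafe is GetBatchingHandler.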

Batched reads

GetBatchingHandler#onEvent

public void onEvent(final KeyEvent event, final long sequence, final boolean endOfBatch) throws Exception {
    this.events.add(event);
    this.cachedBytes += event.key.length;
    final int size = this.events.size();
    // if neither the batch size nor MaxReadBytes has been reached and this is not the last event, just return
    if (!endOfBatch && size < batchingOpts.getBatchSize() && this.cachedBytes < batchingOpts.getMaxReadBytes()) {
        return;
    }

    if (size == 1) {
        reset();
        try {
            // a single get request does not need batching
            get(event.key, this.readOnlySafe, event.future, false);
        } catch (final Throwable t) {
            exceptionally(t, event.future);
        }
    } else {
        // allocate a list of exactly the right size
        final List<byte[]> keys = Lists.newArrayListWithCapacity(size);
        final CompletableFuture<byte[]>[] futures = new CompletableFuture[size];
        for (int i = 0; i < size; i++) {
            final KeyEvent e = this.events.get(i);
            keys.add(e.key);
            futures[i] = e.future;
        }
        // reset once the events have been copied into keys and futures
        reset();
        try {
            multiGet(keys, this.readOnlySafe).whenComplete((result, throwable) -> {
                // handle the result in the asynchronous callback
                if (throwable == null) {
                    for (int i = 0; i < futures.length; i++) {
                        final ByteArray realKey = ByteArray.wrap(keys.get(i));
                        futures[i].complete(result.get(realKey));
                    }
                    return;
                }
                exceptionally(throwable, futures);
            });
        } catch (final Throwable t) {
            exceptionally(t, futures);
        }
    }
}

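onEvent first checks whether the number of buffered events has reached the threshold and whether the current event is the last one in the Disruptor batch. It then branches on how many events are buffered. As an optimization, a single event skips batching and goes through a plain get; otherwise all keys are collected into the keys list and multiGet processes them as one batch.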
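
The flush condition described above can be isolated in a small sketch. This is not the RheaKV class: BatchingSketch, its constructor parameters, and the flusher callback are hypothetical names chosen for illustration, and the two limits only play the role of batchingOpts.getBatchSize() and batchingOpts.getMaxReadBytes().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a batching handler; not the RheaKV class.
final class BatchingSketch {
    private final int batchSize;     // assumed analogue of batchingOpts.getBatchSize()
    private final int maxReadBytes;  // assumed analogue of batchingOpts.getMaxReadBytes()
    private final Consumer<List<byte[]>> flusher;
    private final List<byte[]> keys = new ArrayList<>();
    private int cachedBytes;

    BatchingSketch(int batchSize, int maxReadBytes, Consumer<List<byte[]>> flusher) {
        this.batchSize = batchSize;
        this.maxReadBytes = maxReadBytes;
        this.flusher = flusher;
    }

    void onEvent(byte[] key, boolean endOfBatch) {
        keys.add(key);
        cachedBytes += key.length;
        // Keep accumulating while no limit is hit and more events are queued.
        if (!endOfBatch && keys.size() < batchSize && cachedBytes < maxReadBytes) {
            return;
        }
        // Copy the buffered keys, reset the buffer, then hand the batch over.
        List<byte[]> batch = new ArrayList<>(keys);
        keys.clear();
        cachedBytes = 0;
        flusher.accept(batch);
    }
}
```

A batch is flushed as soon as any one of the three conditions holds, so a lone request that arrives on an idle queue (endOfBatch is true) is never held back waiting for company.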

multiGet calls internalMultiGet, which returns a Future so that the result is delivered asynchronously.
DefaultRheaKVStore#internalMultiGet

private FutureGroup<Map<ByteArray, byte[]>> internalMultiGet(final List<byte[]> keys, final boolean readOnlySafe,
                                                             final int retriesLeft, final Throwable lastCause) {
    // keys live in different regions and one region holds many keys, so group them into a map
    final Map<Region, List<byte[]>> regionMap = this.pdClient
            .findRegionsByKeys(keys, ApiExceptionHelper.isInvalidEpoch(lastCause));
    // return value
    final List<CompletableFuture<Map<ByteArray, byte[]>>> futures =
            Lists.newArrayListWithCapacity(regionMap.size());
    // lastCause is null on the first call
    final Errors lastError = lastCause == null ? null : Errors.forException(lastCause);

    for (final Map.Entry<Region, List<byte[]>> entry : regionMap.entrySet()) {
        final Region region = entry.getKey();
        final List<byte[]> subKeys = entry.getValue();
        // decrement the retry count and set up a retry function
        final RetryCallable<Map<ByteArray, byte[]>> retryCallable = retryCause -> internalMultiGet(subKeys,
                readOnlySafe, retriesLeft - 1, retryCause);
        final MapFailoverFuture<ByteArray, byte[]> future = new MapFailoverFuture<>(retriesLeft, retryCallable);
        // send a MultiGetRequest to fetch the data
        internalRegionMultiGet(region, subKeys, readOnlySafe, future, retriesLeft, lastError, this.onlyLeaderRead);
        futures.add(future);
    }
    return new FutureGroup<>(futures);
}

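internalMultiGet groups the keys by region. Data is stored in regions and different keys may belong to different regions, so the reads have to go to different regions. Because one region holds many keys, the grouping is kept in a map. The method then iterates over regionMap and passes each region's keys as one batch to internalRegionMultiGet, which fetches the data according to the situation.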
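
A minimal sketch of that grouping step follows. It assumes each region owns a contiguous key range identified by its start key, and it uses String keys and a plain region id instead of RheaKV's byte[] keys and Region objects. RegionGroupingSketch and its methods are hypothetical and are not the pdClient.findRegionsByKeys implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: route keys to regions, assuming regions are key ranges.
final class RegionGroupingSketch {
    // start key (inclusive) -> region id; "" is the lowest possible start key
    private final TreeMap<String, Long> regionsByStartKey = new TreeMap<>();

    void addRegion(String startKey, long regionId) {
        regionsByStartKey.put(startKey, regionId);
    }

    Map<Long, List<String>> groupByRegion(List<String> keys) {
        Map<Long, List<String>> grouped = new LinkedHashMap<>();
        for (String key : keys) {
            // The owning region is the one with the greatest start key <= key.
            Map.Entry<String, Long> owner = regionsByStartKey.floorEntry(key);
            if (owner == null) {
                throw new IllegalStateException("no region covers key " + key);
            }
            grouped.computeIfAbsent(owner.getValue(), id -> new ArrayList<>()).add(key);
        }
        return grouped;
    }
}
```

Each entry of the resulting map corresponds to one per-region batch, which is the unit that would be sent as a single request.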

DefaultRheaKVStore#internalRegionMultiGet

private void internalRegionMultiGet(final Region region, final List<byte[]> subKeys, final boolean readOnlySafe,
                                    final CompletableFuture<Map<ByteArray, byte[]>> future, final int retriesLeft,
                                    final Errors lastCause, final boolean requireLeader) {
    // we are on the client side here, so this is null
    final RegionEngine regionEngine = getRegionEngine(region.getId(), requireLeader);
    // require leader on retry
    // set up the retry function
    final RetryRunner retryRunner = retryCause -> internalRegionMultiGet(region, subKeys, readOnlySafe, future,
            retriesLeft - 1, retryCause, true);
    final FailoverClosure<Map<ByteArray, byte[]>> closure = new FailoverClosureImpl<>(future,
            false, retriesLeft, retryRunner);
    if (regionEngine != null) {
        if (ensureOnValidEpoch(region, regionEngine, closure)) {
            // if it is not null, get the rawKVStore and read from it
            final RawKVStore rawKVStore = getRawKVStore(regionEngine);
            if (this.kvDispatcher == null) {
                rawKVStore.multiGet(subKeys, readOnlySafe, closure);
            } else {
                // if kvDispatcher is not null, run asynchronously on kvDispatcher
                this.kvDispatcher.execute(() -> rawKVStore.multiGet(subKeys, readOnlySafe, closure));
            }
        }
    } else {
        final MultiGetRequest request = new MultiGetRequest();
        request.setKeys(subKeys);
        request.setReadOnlySafe(readOnlySafe);
        request.setRegionId(region.getId());
        request.setRegionEpoch(region.getRegionEpoch());
        // issue the RPC request
        this.rheaKVRpcService.callAsyncWithRpc(request, closure, lastCause, requireLeader);
    }
}

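Because internalRegionMultiGet is called on the client side here, no regionEngine is set, so a MultiGetRequest is sent straight to the server node that is the leader of the region.

These methods are essentially the same as for put, which was already covered in "5. SOFAJRaft Source Code Analysis: How RheaKV Stores Data", so they are not repeated here.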

Handling MultiGetRequest on the server

MultiGetRequest is handled by KVCommandProcessor, which uses the value returned by the request's magic method to decide how to process it. In our case it ends up in DefaultRegionKVService's handleMultiGetRequest method.

public void handleMultiGetRequest(final MultiGetRequest request,
                                  final RequestProcessClosure<BaseRequest, BaseResponse<?>> closure) {
    final MultiGetResponse response = new MultiGetResponse();
    response.setRegionId(getRegionId());
    response.setRegionEpoch(getRegionEpoch());
    try {
        KVParameterRequires.requireSameEpoch(request, getRegionEpoch());
        final List<byte[]> keys = KVParameterRequires.requireNonEmpty(request.getKeys(), "multiGet.keys");
        // call MetricsRawKVStore's multiGet
        this.rawKVStore.multiGet(keys, request.isReadOnlySafe(), new BaseKVStoreClosure() {

            @SuppressWarnings("unchecked")
            @Override
            public void run(final Status status) {
                if (status.isOk()) {
                    response.setValue((Map<ByteArray, byte[]>) getData());
                } else {
                    setFailure(request, response, status, getError());
                }
                closure.sendResponse(response);
            }
        });
    } catch (final Throwable t) {
        LOG.error("Failed to handle: {}, {}.", request, StackTraceUtil.stackTrace(t));
        response.setError(Errors.forException(t));
        closure.sendResponse(response);
    }
}

handleMultiGetRequest calls MetricsRawKVStore's multiGet to fetch the data in batch.

MetricsRawKVStore#multiGet

public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
    // instantiate a MetricsKVClosureAdapter
    final KVStoreClosure c = metricsAdapter(closure, MULTI_GET, keys.size(), 0);
    // call RaftRawKVStore's multiGet
    this.rawKVStore.multiGet(keys, readOnlySafe, c);
}

multiGet wraps the closure in a MetricsKVClosureAdapter instance, through which the response is delivered asynchronously, and then calls RaftRawKVStore's multiGet.
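
The adapter follows a common decorator pattern: wrap the caller's callback so that something extra happens before the original callback runs. The sketch below shows that pattern with a latency measurement. TimedCallbackSketch and its timed method are hypothetical, and what MetricsKVClosureAdapter actually records is not shown in this article, so the latency measurement is an assumption.

```java
import java.util.function.Consumer;
import java.util.function.LongConsumer;

// Hypothetical sketch of a callback decorator; not the MetricsKVClosureAdapter code.
final class TimedCallbackSketch {
    // Returns a callback that reports elapsed nanoseconds, then calls the delegate.
    static <T> Consumer<T> timed(Consumer<T> delegate, LongConsumer latencyNanos) {
        final long start = System.nanoTime(); // measured from the moment of wrapping
        return result -> {
            latencyNanos.accept(System.nanoTime() - start);
            delegate.accept(result);
        };
    }
}
```

The caller still receives exactly the result it asked for; the wrapper only observes the completion.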

RaftRawKVStore#multiGet

public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
    if (!readOnlySafe) {
        this.kvStore.multiGet(keys, false, closure);
        return;
    }
    // linearizable read for the KV store
    // call readIndex and wait for the callback
    this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {

        @Override
        public void run(final Status status, final long index, final byte[] reqCtx) {
            // if the status is OK, read from the local store
            if (status.isOk()) {
                RaftRawKVStore.this.kvStore.multiGet(keys, true, closure);
                return;
            }
            // readIndex failed: try submitting the read as a task to the leader's state machine KVStoreStateMachine
            RaftRawKVStore.this.readIndexExecutor.execute(() -> {
                if (isLeader()) {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}, try to applying to the state machine.",
                            status);
                    // If 'read index' read fails, try to applying to the state machine at the leader node
                    applyOperation(KVOperation.createMultiGet(keys), closure);
                } else {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}.", status);
                    // Client will retry to leader node
                    new KVClosureAdapter(closure, null).run(status);
                }
            });
        }
    });
}

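multiGet calls the node's readIndex method to perform a consistent read and registers a callback. If the callback reports success, the data is read directly from RocksRawKVStore. If it reports failure, a leader node falls back to submitting the read as a task to the state machine KVStoreStateMachine, while a non-leader node returns the failure so that the client retries against the leader.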

Linearizable reads with readIndex

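A simple example of a linearizable read: if we write a value at time t1, then any read after t1 must see that value and can never return the older value from before t1. A loose analogy is Java's volatile keyword: once a client's write request to the cluster has been acknowledged as successful, the result of that write must be visible to every later read. The difference is that volatile provides visibility between threads, whereas SOFAJRaft has to provide visibility between servers.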
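
The guarantee can be illustrated with a toy replica that applies log entries in order. The class below is a hypothetical model and not SOFAJRaft code. It only shows why a read has to wait until the replica has applied everything up to the index that was committed when the read arrived (the "read index").

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of one replica's state machine; not SOFAJRaft code.
final class ReadIndexSketch {
    private final Map<String, String> state = new HashMap<>();
    private long appliedIndex; // index of the last log entry applied locally

    // Apply the log entry at the given index (entries arrive in order).
    void apply(long index, String key, String value) {
        state.put(key, value);
        appliedIndex = index;
    }

    // A read tagged with readIndex is safe only once that index is applied.
    boolean canServe(long readIndex) {
        return appliedIndex >= readIndex;
    }

    // Plain local read; stale if called before canServe(readIndex) is true.
    String read(String key) {
        return state.get(key);
    }
}
```

If the write "k=new" was committed at index 2 and a replica has only applied index 1, a plain local read still returns the old value. Gating the read on canServe(2) is what rules that out.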

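SOFAJRaft's linearizable read is built on the Raft protocol's ReadIndex mechanism. A caller issues a linearizable read with Node#readIndex(byte[] requestContext, ReadIndexClosure done). The closure is invoked once it is safe to read, and in the normal case the caller then reads from the state machine and returns the data to the client.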

Node#readIndex

public void readIndex(final byte[] requestContext, final ReadIndexClosure done) {
    if (this.shutdownLatch != null) {
        // run the callback asynchronously
        Utils.runClosureInThread(done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
        throw new IllegalStateException("Node is shutting down");
    }
    Requires.requireNonNull(done, "Null closure");
    //EMPTY_BYTES
    this.readOnlyService.addRequest(requestContext, done);
}

readIndex calls ReadOnlyServiceImpl#addRequest with requestContext and the callback done; the requestContext passed in here is BytesUtil.EMPTY_BYTES.
Let's keep going.

ReadOnlyServiceImpl#addRequest

public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
    if (this.shutdownLatch != null) {
        Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Was stopped"));
        throw new IllegalStateException("Service already shutdown.");
    }
    try {
        EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
            event.done = closure;
            //EMPTY_BYTES
            event.requestContext = new Bytes(reqCtx);
            event.startTime = Utils.monotonicMs();
        };
        int retryTimes = 0;
        while (true) {
            //ReadIndexEventHandler
            if (this.readIndexQueue.tryPublishEvent(translator)) {
                break;
            } else {
                retryTimes++;
                if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
                    Utils.runClosureInThread(closure,
                        new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
                    this.nodeMetrics.recordTimes("read-index-overload-times", 1);
                    LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
                    return;
                }
                ThreadHelper.onSpinWait();
            }
        }
    } catch (final Exception e) {
        Utils.runClosureInThread(closure, new Status(RaftError.EPERM, "Node is down."));
    }
}

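addRequest wraps reqCtx and closure in an event and publishes it to the readIndexQueue. Once published, the event is handled by ReadIndexEventHandler. If publishing fails it is retried, up to 3 times, after which the closure is completed with an EBUSY status.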
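
The retry loop can be sketched with a bounded java.util.concurrent queue standing in for the Disruptor ring buffer. BoundedPublishSketch and MAX_RETRIES are hypothetical names, and the retry limit of 3 simply mirrors the number given above.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of a bounded, non-blocking publish with a retry limit.
final class BoundedPublishSketch {
    static final int MAX_RETRIES = 3; // assumption: mirrors the limit described in the text

    // Returns true if the item was enqueued, false if the queue stayed full.
    static <T> boolean tryPublish(BlockingQueue<T> queue, T item) {
        int retryTimes = 0;
        while (true) {
            if (queue.offer(item)) { // offer never blocks; false means "full"
                return true;
            }
            retryTimes++;
            if (retryTimes > MAX_RETRIES) {
                return false; // the caller reports "busy" instead of waiting
            }
            Thread.onSpinWait(); // spin hint, available since Java 9
        }
    }
}
```

Failing fast under overload keeps the calling thread from blocking on a full queue; the price is that the caller has to handle the busy case.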

ReadIndexEventHandler

private class ReadIndexEventHandler implements EventHandler<ReadIndexEvent> {
    // task list for batch
    private final List<ReadIndexEvent> events = new ArrayList<>(
                                                  ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());

    @Override
    public void onEvent(final ReadIndexEvent newEvent, final long sequence, final boolean endOfBatch)
                                                                                                     throws Exception {
        if (newEvent.shutdownLatch != null) {
            executeReadIndexEvents(this.events);
            this.events.clear();
            newEvent.shutdownLatch.countDown();
            return;
        }

        this.events.add(newEvent);
        // process as a batch
        if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
            executeReadIndexEvents(this.events);
            this.events.clear();
        }
    }
}

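ReadIndexEventHandler is an inner class of ReadOnlyServiceImpl. It keeps an events list as a field in order to batch events: once the list has reached 32 events (the value of raftOptions.getApplyBatch() here) or the current event is the last one in the Disruptor batch, it calls ReadOnlyServiceImpl's executeReadIndexEvents to process the events together.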

ReadOnlyServiceImpl#executeReadIndexEvents

private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
    if (events.isEmpty()) {
        return;
    }
    // build the ReadIndexRequest
    final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder() //
        .setGroupId(this.node.getGroupId()) //
        .setServerId(this.node.getServerId().toString());

    final List<ReadIndexState> states = new ArrayList<>(events.size());

    for (final ReadIndexEvent event : events) {
        rb.addEntries(ZeroByteStringHelper.wrap(event.requestContext.get()));
        states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
    }
    final ReadIndexRequest request = rb.build();

    this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
}

executeReadIndexEvents builds the ReadIndexRequest and wraps the list of ReadIndexState in a ReadIndexResponseClosure, in preparation for the following steps.

NodeImpl#handleReadIndexRequest

public void handleReadIndexRequest(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> done) {
    final long startMs = Utils.monotonicMs();
    this.readLock.lock();
    try {
        switch (this.state) {
            case STATE_LEADER:
                readLeader(request, ReadIndexResponse.newBuilder(), done);
                break;
            case STATE_FOLLOWER:
                readFollower(request, done);
                break;
            case STATE_TRANSFERRING:
                done.run(new Status(RaftError.EBUSY, "Is transferring leadership."));
                break;
            default:
                done.run(new Status(RaftError.EPERM, "Invalid state for readIndex: %s.", this.state));
                break;
        }
    } finally {
        this.readLock.unlock();
        this.metrics.recordLatency("handle-read-index", Utils.monotonicMs() - startMs);
        this.metrics.recordSize("handle-read-index-entries", request.getEntriesCount());
    }
}

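A linearizable read can be initiated on any node in the cluster. It does not have to go to the leader and may run on a follower, which greatly reduces read pressure on the leader.
When a follower performs a consistent read, it actually calls RpcService#readIndex(leaderId.getEndpoint(), newRequest, -1, closure) to send a ReadIndex request to the leader, which does the work. So I will focus on the leader's path here.

Next, NodeImpl's readLeader method is called: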
NodeImpl#readLeader

private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.Builder respBuilder,
                        final RpcResponseClosure<ReadIndexResponse> closure) {
    // 1. get the quorum (majority size) of the cluster
    final int quorum = getQuorum();
    if (quorum <= 1) {
        // Only one peer, fast path.
        // with a single node, invoke the callback directly with success
        respBuilder.setSuccess(true) //
                .setIndex(this.ballotBox.getLastCommittedIndex());
        closure.setResponse(respBuilder.build());
        closure.run(Status.OK());
        return;
    }

    final long lastCommittedIndex = this.ballotBox.getLastCommittedIndex();
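    // 2. the terms must match
    // LogManager looks up the term of BallotBox's lastCommittedIndex and compares it with the current term;
    // if they differ, this leader has not committed any entry in its own term and must reject the read-only request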
    if (this.logManager.getTerm(lastCommittedIndex) != this.currTerm) {
        // Reject read only request when this leader has not committed any log entry at its term
        closure
                .run(new Status(
                        RaftError.EAGAIN,
                        "ReadIndex request rejected because leader has not committed any log entry at its term, " +
                         "logIndex=%d, currTerm=%d.",
                        lastCommittedIndex, this.currTerm));
        return;
    }
    respBuilder.setIndex(lastCommittedIndex);

    if (request.getPeerId() != null) {
        // request from follower, check if the follower is in current conf.
        final PeerId peer = new PeerId();
        peer.parse(request.getServerId());
        // 3. a request from a follower must come from a peer in the current configuration
        if (!this.conf.contains(peer)) {
            closure
                    .run(new Status(RaftError.EPERM, "Peer %s is not in current configuration: %s.", peer,
                     this.conf));
            return;
        }
    }

    ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
    // 4. with ReadOnlyLeaseBased, check that the leader is still within its lease
    if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
        // If leader lease timeout, we must change option to ReadOnlySafe
        readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
    }

    switch (readOnlyOpt) {
        //5
        case ReadOnlySafe:
            final List<PeerId> peers = this.conf.getConf().getPeers();
            Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
            // set up the heartbeat response callback
            final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(closure,
                    respBuilder, quorum, peers.size());
            // Send heartbeat requests to followers
            // send a round of heartbeats to the followers; if more than half of the nodes
            // respond, the leader knows it is still the leader
            for (final PeerId peer : peers) {
                if (peer.equals(this.serverId)) {
                    continue;
                }
                this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
            }
            break;
        // 6. no election happens within the lease, so the leader cannot have changed;
        // return the result to the callback directly
        case ReadOnlyLeaseBased:
            // Responses to followers and local node.
            respBuilder.setSuccess(true);
            closure.setResponse(respBuilder.build());
            closure.run(Status.OK());
            break;
    }
}
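
  1. Get the quorum, that is the majority size n/2 + 1 for a cluster of n nodes. If the cluster has only one node, set success and invoke the callback straight away.
  2. Check that the term of lastCommittedIndex equals the current term. If it does not, the leader has not committed any entry in its own term and the request is rejected. Otherwise the response builder's index is set to the BallotBox's lastCommittedIndex.
  3. A request from a follower must come from a peer in the current configuration. If it does not, the callback is invoked with an error.
  4. Get the ReadOnlyOption for ReadIndex requests; its default is ReadOnlySafe. If it is set to ReadOnlyLeaseBased, isLeaderLeaseValid checks whether the leader is still within its lease, and if the lease has expired the option falls back to ReadOnlySafe.
  5. With ReadOnlySafe, Replicator#sendHeartbeat(rid, closure) sends a heartbeat request to each follower, and each heartbeat response runs the ReadIndexHeartbeatResponseClosure callback. The callback checks whether more than half of the nodes, counting the leader itself, have acknowledged. Once a majority has answered successfully, the leader knows it is still the leader and returns a success response. The read can then be served as soon as applyIndex has reached ReadIndex, which means the log up to ReadIndex has been applied and a linearizable read is safe.
  6. With ReadOnlyLeaseBased, the current leader is considered the only valid leader of the Raft group while its lease is valid, so the heartbeat round that confirms leadership is skipped and a success response is returned to the follower or local node directly. The node then still waits for the state machine until applyIndex has reached ReadIndex before serving the linearizable read.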
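
The majority check in step 5 can be sketched as a small counter. HeartbeatQuorumSketch is a hypothetical class, not the ReadIndexHeartbeatResponseClosure source; in particular the failure threshold is an assumption, chosen so that the round is rejected as soon as a majority can no longer be reached.

```java
// Hypothetical sketch of counting heartbeat acks toward a quorum on the leader.
final class HeartbeatQuorumSketch {
    enum Outcome { PENDING, CONFIRMED, REJECTED }

    private final int quorum;
    private final int failThreshold;
    private int ackSuccess;
    private int ackFailures;
    private Outcome outcome = Outcome.PENDING;

    HeartbeatQuorumSketch(int peersCount) {
        this.quorum = peersCount / 2 + 1;
        // Assumption: once this many followers fail, no majority is reachable.
        this.failThreshold = peersCount - quorum + 1;
    }

    // Record one follower's heartbeat response and return the current outcome.
    synchronized Outcome onHeartbeatResponse(boolean success) {
        if (outcome != Outcome.PENDING) {
            return outcome; // already decided; late responses are ignored
        }
        if (success) {
            ackSuccess++;
        } else {
            ackFailures++;
        }
        if (ackSuccess + 1 >= quorum) { // +1 counts the leader's own vote
            outcome = Outcome.CONFIRMED;
        } else if (ackFailures >= failThreshold) {
            outcome = Outcome.REJECTED;
        }
        return outcome;
    }
}
```

In a five-node group the quorum is 3, so two successful follower responses plus the leader itself are enough to confirm leadership without waiting for the remaining two followers.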

Whether the option is ReadOnlySafe or ReadOnlyLeaseBased, a successful response eventually invokes ReadIndexResponseClosure's run method.

ReadIndexResponseClosure#run

public void run(final Status status) {
    // fail: the status passed in is not OK
    if (!status.isOk()) {
        notifyFail(status);
        return;
    }
    final ReadIndexResponse readIndexResponse = getResponse();
    // fail: the response does not report success
    if (!readIndexResponse.getSuccess()) {
        notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down."));
        return;
    }
    // success: the ReadIndex round succeeded
    final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request,
        readIndexResponse.getIndex());
    for (final ReadIndexState state : this.states) {
        // Records current commit log index.
        // set the current committed index
        state.setIndex(readIndexResponse.getIndex());
    }

    boolean doUnlock = true;
    ReadOnlyServiceImpl.this.lock.lock();
    try {
        // check whether applyIndex has reached ReadIndex
        if (readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) {
            // Already applied, notify readIndex request.
            ReadOnlyServiceImpl.this.lock.unlock();
            doUnlock = false;
            // entries up to ReadIndex are applied, so a linearizable read can be served
            notifySuccess(readIndexStatus);
        } else {
            // Not applied, add it to pending-notify cache.
            ReadOnlyServiceImpl.this.pendingNotifyStatus
                .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
                .add(readIndexStatus);
        }
    } finally {
        if (doUnlock) {
            ReadOnlyServiceImpl.this.lock.unlock();
        }
    }
}

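The run method first checks whether it has to report failure. On success it updates the index on every wrapped ReadIndexState and then checks whether applyIndex has reached ReadIndex. If it has, every log entry up to ReadIndex (entries replicated to a majority, which can be seen as committed writes) has been applied to the state machine, so the data they carry is visible to the client and notifySuccess is called. Otherwise the status is parked in pendingNotifyStatus until the state machine catches up.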
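
The "serve now or park until applied" logic can be sketched as follows. PendingReadsSketch, submit, and onApplied are hypothetical names, and the sorted map keyed by read index is an assumption made for this sketch; the article only shows that pendingNotifyStatus is keyed by index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of parking reads until the applied index catches up.
final class PendingReadsSketch {
    private final TreeMap<Long, List<Runnable>> pending = new TreeMap<>();
    private long lastAppliedIndex;

    // Run onSafe immediately if readIndex is already applied, otherwise park it.
    void submit(long readIndex, Runnable onSafe) {
        synchronized (this) {
            if (lastAppliedIndex < readIndex) {
                pending.computeIfAbsent(readIndex, k -> new ArrayList<>()).add(onSafe);
                return;
            }
        }
        onSafe.run(); // run outside the lock
    }

    // Called when the state machine has applied entries up to appliedIndex.
    void onApplied(long appliedIndex) {
        List<Runnable> ready = new ArrayList<>();
        synchronized (this) {
            lastAppliedIndex = appliedIndex;
            // headMap is a live view: clearing it removes the entries from pending.
            Map<Long, List<Runnable>> head = pending.headMap(appliedIndex, true);
            head.values().forEach(ready::addAll);
            head.clear();
        }
        ready.forEach(Runnable::run); // notify outside the lock
    }
}
```

Keeping the parked reads sorted by index means that a single apply notification can release every read whose index is now covered.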

ReadOnlyServiceImpl#notifySuccess

private void notifySuccess(final ReadIndexStatus status) {
    final long nowMs = Utils.monotonicMs();
    final List<ReadIndexState> states = status.getStates();
    final int taskCount = states.size();
    for (int i = 0; i < taskCount; i++) {
        final ReadIndexState task = states.get(i);
        final ReadIndexClosure done = task.getDone(); // stack copy
        if (done != null) {
            this.nodeMetrics.recordLatency("read-index", nowMs - task.getStartTimeMs());
            done.setResult(task.getIndex(), task.getRequestContext().get());
            done.run(Status.OK());
        }
    }
}

On success, notifySuccess iterates over the ReadIndexState list wrapped in status and calls the run method of each one's closure.

That run method is the one we set up in multiGet:
RaftRawKVStore#multiGet

public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
    if (!readOnlySafe) {
        this.kvStore.multiGet(keys, false, closure);
        return;
    }
    // linearizable read for the KV store
    // call readIndex and wait for the callback
    this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {

        @Override
        public void run(final Status status, final long index, final byte[] reqCtx) {
            // if the status is OK, read from the local store
            if (status.isOk()) {
                RaftRawKVStore.this.kvStore.multiGet(keys, true, closure);
                return;
            }
            // readIndex failed: try submitting the read as a task to the leader's state machine KVStoreStateMachine
            RaftRawKVStore.this.readIndexExecutor.execute(() -> {
                if (isLeader()) {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}, try to applying to the state machine.",
                            status);
                    // If 'read index' read fails, try to applying to the state machine at the leader node
                    applyOperation(KVOperation.createMultiGet(keys), closure);
                } else {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}.", status);
                    // Client will retry to leader node
                    new KVClosureAdapter(closure, null).run(status);
                }
            });
        }
    });
}

This run method calls multiGet on the underlying kvStore (RocksRawKVStore), which reads the data directly from RocksDB.

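Summary

This article followed a read from the RheaKVStore client's get method all the way to the server, where RheaKVStore uses JRaft to perform a linearizable read, and explained how that read is implemented. This example should leave you with a reasonably solid understanding of linearizable reads.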