5. SOFAJRaft原始碼分析— RheaKV中如何存放資料?
概述
上一篇講了RheaKV是如何進行初始化的,因為RheaKV主要是用來做KV儲存的,RheaKV讀寫的是相當的複雜,一起寫會篇幅太長,所以這一篇主要來講一下RheaKV中如何存放資料。
我們這裡使用一個客戶端的例子來開始本次的講解:
public static void main(final String[] args) throws Exception { final Client client = new Client(); client.init(); //get(client.getRheaKVStore()); RheaKVStore rheaKVStore = client.getRheaKVStore(); final byte[] key = writeUtf8("hello"); final byte[] value = writeUtf8("world"); rheaKVStore.bPut(key, value); client.shutdown(); }
我們從這個main方法中啟動我們的例項,呼叫rheaKVStore.bPut(key, value)方法將資料放入到RheaKV中。
public class Client { private final RheaKVStore rheaKVStore = new DefaultRheaKVStore(); public void init() { final List<RegionRouteTableOptions> regionRouteTableOptionsList = MultiRegionRouteTableOptionsConfigured .newConfigured() // .withInitialServerList(-1L /* default id */, Configs.ALL_NODE_ADDRESSES) // .config(); final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured() // .withFake(true) // .withRegionRouteTableOptionsList(regionRouteTableOptionsList) // .config(); final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured() // .withClusterName(Configs.CLUSTER_NAME) // .withPlacementDriverOptions(pdOpts) // .config(); System.out.println(opts); rheaKVStore.init(opts); } public void shutdown() { this.rheaKVStore.shutdown(); } public RheaKVStore getRheaKVStore() { return rheaKVStore; } } public class Configs { public static String ALL_NODE_ADDRESSES = "127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183"; public static String CLUSTER_NAME = "rhea_example"; }
Client在呼叫init方法初始化rheaKVStore的時候和我們上一節中講的server例子很像,區別是少了StoreEngineOptions的設定和多配置了一個regionRouteTableOptionsList例項。
bPut存入資料
我們這裡存入資料會呼叫DefaultRheaKVStore的bPut方法:
DefaultRheaKVStore#bPut
public Boolean bPut(final byte[] key, final byte[] value) { return FutureHelper.get(put(key, value), this.futureTimeoutMillis); }
bPut方法裡面主要的存放資料的操作在put方法裡面做的,put方法會返回一個CompletableFuture給FutureHelper的get方法呼叫,並且在bPut方法裡面會放入一個超時時間,在init方法中初始化的,預設是5秒。
接下來我們進入到put方法中:
DefaultRheaKVStore#put
public CompletableFuture<Boolean> put(final byte[] key, final byte[] value) {
Requires.requireNonNull(key, "key");
Requires.requireNonNull(value, "value");
//是否嘗試進行批量的put
return put(key, value, new CompletableFuture<>(), true);
}
這裡會呼叫put的過載的方法,第三個引數是表示傳入一個空的回撥函式,第四個引數表示採用Batch 批量儲存
DefaultRheaKVStore#put
private CompletableFuture<Boolean> put(final byte[] key, final byte[] value,
final CompletableFuture<Boolean> future, final boolean tryBatching) {
//校驗一下是否已經init初始化了
checkState();
if (tryBatching) {
//putBatching例項在init方法中被初始化
final PutBatching putBatching = this.putBatching;
if (putBatching != null && putBatching.apply(new KVEntry(key, value), future)) {
//由於我們傳入的是一個空的例項,所以這裡直接返回
return future;
}
}
//直接存入資料
internalPut(key, value, future, this.failoverRetries, null);
return future;
}
checkState方法會去校驗started這個屬性有沒有被設定,如果呼叫過DefaultRheaKVStore的init方法進行初始化過,那麼會設定started為ture。
這裡還會呼叫init方法裡面初始化過的putBatching例項,我們下面看看putBatching例項做了什麼。
putBatching批量存入資料
putBatching在init例項初始化的時候會傳入一個PutBatchingHandler作為處理器:
this.putBatching = new PutBatching(KVEvent::new, "put_batching",
new PutBatchingHandler("put"));
我們下面看看PutBatching的構造方法:
public PutBatching(EventFactory<KVEvent> factory, String name, PutBatchingHandler handler) {
super(factory, batchingOpts.getBufSize(), name, handler);
}
這裡由於PutBatching繼承了Batching這個抽象類,所以在例項化的時候直接呼叫父類的構造器例項化:
public Batching(EventFactory<T> factory, int bufSize, String name, EventHandler<T> handler) {
this.name = name;
this.disruptor = new Disruptor<>(factory, bufSize, new NamedThreadFactory(name, true));
this.disruptor.handleEventsWith(handler);
this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(name));
this.ringBuffer = this.disruptor.start();
}
在Batching構造器裡面會初始化一個Disruptor例項,並將我們傳入的PutBatchingHandler處理器作為Disruptor的處理器,所有傳入PutBatching的資料都會經過PutBatchingHandler來處理。
我們下面看看PutBatchingHandler是怎麼處理資料的:
PutBatchingHandler#onEvent
public void onEvent(final KVEvent event, final long sequence, final boolean endOfBatch) throws Exception {
//1.把傳入的時間加入到集合中
this.events.add(event);
//加上key和value的長度
this.cachedBytes += event.kvEntry.length();
final int size = this.events.size();
//BatchSize等於100 ,並且maxWriteBytes位元組數32768
//2. 如果不是最後一個event,也沒有這麼多數量的資料,那麼就不傳送
if (!endOfBatch && size < batchingOpts.getBatchSize() && this.cachedBytes < batchingOpts.getMaxWriteBytes()) {
return;
}
//3.如果傳入的size為1,那麼就重新呼叫put方法放入到Batching裡面
if (size == 1) {
//重置events和cachedBytes
reset();
final KVEntry kv = event.kvEntry;
try {
put(kv.getKey(), kv.getValue(), event.future, false);
} catch (final Throwable t) {
exceptionally(t, event.future);
}
// 4.如果size不為1,那麼把資料遍歷到集合裡面批量處理
} else {
//初始化一個長度為size的list
final List<KVEntry> entries = Lists.newArrayListWithCapacity(size);
final CompletableFuture<Boolean>[] futures = new CompletableFuture[size];
for (int i = 0; i < size; i++) {
final KVEvent e = this.events.get(i);
entries.add(e.kvEntry);
//使用CompletableFuture構建非同步應用
futures[i] = e.future;
}
//遍歷完events資料到entries之後,重置
reset();
try {
//當put方法完成後執行whenComplete中的內容
put(entries).whenComplete((result, throwable) -> {
//如果沒有丟擲異常,那麼通知所有future已經執行完畢了
if (throwable == null) {
for (int i = 0; i < futures.length; i++) {
futures[i].complete(result);
}
return;
}
exceptionally(throwable, futures);
});
} catch (final Throwable t) {
exceptionally(t, futures);
}
}
}
- 進入這個方法的時候會把這個event加入到events集合中,然後把彙總長度和events的size
- 由於所有的event都是發往Disruptor,然後分發到PutBatchingHandler進行處理,所以可以通過endOfBatch引數判斷這個分發過來的event是不是最後一個,如果不是最後一個,並且總共的event數量沒有超過預設的100,cachedBytes沒有超過32768,那麼就直接返回,等湊夠了批次再處理
- 走到這個判斷,說明只有一條資料過來,那麼就重新呼叫put方法,設定tryBatching為false,那麼會直接走internalPut方法
- 如果size不等於1,那麼就會把所有的event都加入到集合裡面,然後呼叫put方法批量處理,當處理完之後呼叫whenComplete方法對返回的結果進行一場或回撥處理
往RheaKV中批量put設值
下面我來講一下PutBatchingHandler#onEvent中的put(entries)這個方法是怎麼處理批量資料的,這個方法會呼叫到DefaultRheaKVStore的put方法。
DefaultRheaKVStore#put
public CompletableFuture<Boolean> put(final List<KVEntry> entries) {
//檢查狀態
checkState();
Requires.requireNonNull(entries, "entries");
Requires.requireTrue(!entries.isEmpty(), "entries empty");
//存放資料
final FutureGroup<Boolean> futureGroup = internalPut(entries, this.failoverRetries, null);
//處理返回狀態
return FutureHelper.joinBooleans(futureGroup);
}
該方法會呼叫internalPut進行設值操作。
DefaultRheaKVStore#internalPut
private FutureGroup<Boolean> internalPut(final List<KVEntry> entries, final int retriesLeft,
final Throwable lastCause) {
//組裝Region和KVEntry的對映關係
final Map<Region, List<KVEntry>> regionMap = this.pdClient
.findRegionsByKvEntries(entries, ApiExceptionHelper.isInvalidEpoch(lastCause));
final List<CompletableFuture<Boolean>> futures = Lists.newArrayListWithCapacity(regionMap.size());
final Errors lastError = lastCause == null ? null : Errors.forException(lastCause);
for (final Map.Entry<Region, List<KVEntry>> entry : regionMap.entrySet()) {
final Region region = entry.getKey();
final List<KVEntry> subEntries = entry.getValue();
//設定重試回撥函式,並將重試次數減一
final RetryCallable<Boolean> retryCallable = retryCause -> internalPut(subEntries, retriesLeft - 1,
retryCause);
final BoolFailoverFuture future = new BoolFailoverFuture(retriesLeft, retryCallable);
//把資料存放到region中
internalRegionPut(region, subEntries, future, retriesLeft, lastError);
futures.add(future);
}
return new FutureGroup<>(futures);
}
因為一個Store裡面會有很多的Region,所以這個方法首先會去組裝Region和KVEntry的關係,確定這個KVEntry是屬於哪個Region的。
然後設定好回撥函式後呼叫internalRegionPut方法將subEntries存入到Region中。
組裝Region和KVEntry的對映關係
我們下面看看是怎麼組裝的:
pdClient是FakePlacementDriverClient的例項,繼承了AbstractPlacementDriverClient,所以呼叫的是父類的findRegionsByKvEntries方法
AbstractPlacementDriverClient#findRegionsByKvEntries
public Map<Region, List<KVEntry>> findRegionsByKvEntries(final List<KVEntry> kvEntries, final boolean forceRefresh) {
if (forceRefresh) {
refreshRouteTable();
}
//regionRouteTable裡面存了region的路由資訊
return this.regionRouteTable.findRegionsByKvEntries(kvEntries);
}
因為我們這裡是用的FakePlacementDriverClient,所以refreshRouteTable返回的是一個空方法,所以往下走是呼叫RegionRouteTable的findRegionsByKvEntries的方法
RegionRouteTable#findRegionsByKvEntries
public Map<Region, List<KVEntry>> findRegionsByKvEntries(final List<KVEntry> kvEntries) {
Requires.requireNonNull(kvEntries, "kvEntries");
//例項化一個map
final Map<Region, List<KVEntry>> regionMap = Maps.newHashMap();
final StampedLock stampedLock = this.stampedLock;
final long stamp = stampedLock.readLock();
try {
for (final KVEntry kvEntry : kvEntries) {
//根據kvEntry的key去找和region的startKey最接近的region
final Region region = findRegionByKeyWithoutLock(kvEntry.getKey());
//設定region和KVEntry的對映關係
regionMap.computeIfAbsent(region, k -> Lists.newArrayList()).add(kvEntry);
}
return regionMap;
} finally {
stampedLock.unlockRead(stamp);
}
}
private Region findRegionByKeyWithoutLock(final byte[] key) {
// return the greatest key less than or equal to the given key
//rangeTable裡面存的是region的startKey,value是regionId
// 這裡返回小於等於key的第一個元素
final Map.Entry<byte[], Long> entry = this.rangeTable.floorEntry(key);
if (entry == null) {
reportFail(key);
throw reject(key, "fail to find region by key");
}
//regionTable裡面存的regionId,value是region
return this.regionTable.get(entry.getValue());
}
findRegionsByKvEntries方法會遍歷所有的KVEntry集合,然後呼叫findRegionByKeyWithoutLock去rangeTable裡面找合適的region,由於rangeTable是一個treemap,所以呼叫了floorEntry返回的是小於等於key的第一個region。
然後將region放入到regionMap裡,key是regionMap,value是一個KVEntry集合。
regionRouteTable裡面的資料是在DefaultRheaKVStore初始化的時候傳入的,不記得的同學我給出了初始化路由表的過程:
DefaultRheaKVStore#init->FakePlacementDriverClient#init->
AbstractPlacementDriverClient#init->AbstractPlacementDriverClient#initRouteTableByRegion->regionRouteTable#addOrUpdateRegion
資料存放到相應的region中
我們接著DefaultRheaKVStore的internalPut的方法往下看到internalRegionPut方法,這個方法是真正儲存資料的地方:
DefaultRheaKVStore#internalRegionPut
private void internalRegionPut(final Region region, final List<KVEntry> subEntries,
final CompletableFuture<Boolean> future, final int retriesLeft,
final Errors lastCause) {
//獲取regionEngine
final RegionEngine regionEngine = getRegionEngine(region.getId(), true);
//重試函式,會回調當前的方法
final RetryRunner retryRunner = retryCause -> internalRegionPut(region, subEntries, future,
retriesLeft - 1, retryCause);
final FailoverClosure<Boolean> closure = new FailoverClosureImpl<>(future, false, retriesLeft,
retryRunner);
if (regionEngine != null) {
if (ensureOnValidEpoch(region, regionEngine, closure)) {
//獲取MetricsRawKVStore
final RawKVStore rawKVStore = getRawKVStore(regionEngine);
//在init方法中根據useParallelKVExecutor屬性決定是不是空
if (this.kvDispatcher == null) {
//呼叫RockDB的api進行插入
rawKVStore.put(subEntries, closure);
} else {
//把put操作分發到kvDispatcher中非同步執行
this.kvDispatcher.execute(() -> rawKVStore.put(subEntries, closure));
}
}
} else {
//如果當前節點不是leader,那麼則返回的regionEngine為null
//那麼發起rpc呼叫到leader節點中
final BatchPutRequest request = new BatchPutRequest();
request.setKvEntries(subEntries);
request.setRegionId(region.getId());
request.setRegionEpoch(region.getRegionEpoch());
this.rheaKVRpcService.callAsyncWithRpc(request, closure, lastCause);
}
}
這個方法首先呼叫getRegionEngine獲取regionEngine,因為我們這裡是client節點,沒有初始化RegionEngine,所以這裡獲取的為空,會直接通過rpc請求傳送,然後交由KVCommandProcessor進行處理。
如果當前的節點是server,並且該RegionEngine是leader,那麼會呼叫rawKVStore然後呼叫put方法插入到RockDB中。
我們最後再看看rheaKVRpcService傳送的rpc請求是怎麼被處理的。
向服務端傳送BatchPutRequest請求插入資料
向服務端傳送put請求是通過呼叫DefaultRheaKVRpcService的callAsyncWithRpc方法發起的:
DefaultRheaKVRpcService#callAsyncWithRpc
public <V> CompletableFuture<V> callAsyncWithRpc(final BaseRequest request, final FailoverClosure<V> closure,
final Errors lastCause) {
return callAsyncWithRpc(request, closure, lastCause, true);
}
public <V> CompletableFuture<V> callAsyncWithRpc(final BaseRequest request, final FailoverClosure<V> closure,
final Errors lastCause, final boolean requireLeader) {
final boolean forceRefresh = ErrorsHelper.isInvalidPeer(lastCause);
//獲取leader的endpoint
final Endpoint endpoint = getRpcEndpoint(request.getRegionId(), forceRefresh, this.rpcTimeoutMillis,
requireLeader);
//發起rpc呼叫
internalCallAsyncWithRpc(endpoint, request, closure);
return closure.future();
}
在這個方法裡會呼叫getRpcEndpoint方法來獲取region所對應server的endpoint,然後對這個節點呼叫rpc請求。呼叫rpc請求都是sofa的bolt框架進行呼叫的,所以下面我們重點看怎麼獲取endpoint
DefaultRheaKVRpcService#getRpcEndpoint
public Endpoint getRpcEndpoint(final long regionId, final boolean forceRefresh, final long timeoutMillis,
final boolean requireLeader) {
if (requireLeader) {
//獲取leader
return getLeader(regionId, forceRefresh, timeoutMillis);
} else {
//輪詢獲取一個不是自己的節點
return getLuckyPeer(regionId, forceRefresh, timeoutMillis);
}
}
這裡有兩個分支,一個是獲取leader節點,一個是輪詢獲取節點。由於這兩個方法挺有意思的,所以我們下面兩個方法都講一下
根據regionId獲取leader節點
根據regionId獲取leader節點是由getLeader方法觸發的,在我們呼叫DefaultRheaKVStore的init方法例項化DefaultRheaKVRpcService的時候會重寫getLeader方法:
DefaultRheaKVStore#init
this.rheaKVRpcService = new DefaultRheaKVRpcService(this.pdClient, selfEndpoint) {
@Override
public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
final Endpoint leader = getLeaderByRegionEngine(regionId);
if (leader != null) {
return leader;
}
return super.getLeader(regionId, forceRefresh, timeoutMillis);
}
};
重寫的getLeader方法會呼叫getLeaderByRegionEngine方法區根據regionId找Endpoint,如果找不到,那麼會呼叫父類的getLeader方法。
DefaultRheaKVStore#getLeaderByRegionEngine
private Endpoint getLeaderByRegionEngine(final long regionId) {
final RegionEngine regionEngine = getRegionEngine(regionId);
if (regionEngine != null) {
final PeerId leader = regionEngine.getLeaderId();
if (leader != null) {
final String raftGroupId = JRaftHelper.getJRaftGroupId(this.pdClient.getClusterName(), regionId);
RouteTable.getInstance().updateLeader(raftGroupId, leader);
return leader.getEndpoint();
}
}
return null;
}
這個方法這裡會獲取RegionEngine,但是我們這裡是client節點,是沒有初始化RegionEngine的,所以這裡就會返回null,接著返回到上一級中呼叫父類的getLeader方法。
DefaultRheaKVRpcService#getLeader
public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
return this.pdClient.getLeader(regionId, forceRefresh, timeoutMillis);
}
這裡會呼叫pdClient的getLeader方法,這裡我們傳入的pdClient是FakePlacementDriverClient,它繼承了AbstractPlacementDriverClient,所以會呼叫到父類的getLeader方法中。
AbstractPlacementDriverClient#getLeader
public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
//這裡會根據clusterName和regionId拼接出raftGroupId
final String raftGroupId = JRaftHelper.getJRaftGroupId(this.clusterName, regionId);
//去路由表裡找這個叢集的leader
PeerId leader = getLeader(raftGroupId, forceRefresh, timeoutMillis);
if (leader == null && !forceRefresh) {
// Could not found leader from cache, try again and force refresh cache
// 如果第一次沒有找到,那麼執行強制重新整理的方法再找一次
leader = getLeader(raftGroupId, true, timeoutMillis);
}
if (leader == null) {
throw new RouteTableException("no leader in group: " + raftGroupId);
}
return leader.getEndpoint();
}
這個方法裡面會根據clusterName和regionId拼接raftGroupId,如果傳入的clusterName為demo,regionId為1,那麼拼接出來的raftGroupId就是:demo--1
。
然後會去呼叫getLeader獲取leader的PeerId,第一次呼叫這個方法傳入的forceRefresh為false,表示不用重新整理,如果返回的為null,那麼會執行強制重新整理再去找一次。
AbstractPlacementDriverClient#getLeader
protected PeerId getLeader(final String raftGroupId, final boolean forceRefresh, final long timeoutMillis) {
final RouteTable routeTable = RouteTable.getInstance();
//是否要強制重新整理路由表
if (forceRefresh) {
final long deadline = System.currentTimeMillis() + timeoutMillis;
final StringBuilder error = new StringBuilder();
// A newly launched raft group may not have been successful in the election,
// or in the 'leader-transfer' state, it needs to be re-tried
Throwable lastCause = null;
for (;;) {
try {
//重新整理節點路由表
final Status st = routeTable.refreshLeader(this.cliClientService, raftGroupId, 2000);
if (st.isOk()) {
break;
}
error.append(st.toString());
} catch (final InterruptedException e) {
ThrowUtil.throwException(e);
} catch (final Throwable t) {
lastCause = t;
error.append(t.getMessage());
}
//如果還沒有到截止時間,那麼sleep10毫秒之後再重新整理
if (System.currentTimeMillis() < deadline) {
LOG.debug("Fail to find leader, retry again, {}.", error);
error.append(", ");
try {
Thread.sleep(10);
} catch (final InterruptedException e) {
ThrowUtil.throwException(e);
}
// 到了截止時間,那麼丟擲異常
} else {
throw lastCause != null ? new RouteTableException(error.toString(), lastCause)
: new RouteTableException(error.toString());
}
}
}
//返回路由表裡面的leader
return routeTable.selectLeader(raftGroupId);
}
如果要執行強制重新整理,那麼會計算一下超時時間,然後呼叫死迴圈,在迴圈體裡面會去重新整理路由表,如果沒有重新整理成功也沒有超時,那麼會sleep10毫秒重新再刷。
RouteTable#refreshLeader
public Status refreshLeader(final CliClientService cliClientService, final String groupId, final int timeoutMs)
throws InterruptedException,
TimeoutException {
Requires.requireTrue(!StringUtils.isBlank(groupId), "Blank group id");
Requires.requireTrue(timeoutMs > 0, "Invalid timeout: " + timeoutMs);
//根據叢集的id去獲取叢集的配置資訊,裡面包括叢集的ip和埠號
final Configuration conf = getConfiguration(groupId);
if (conf == null) {
return new Status(RaftError.ENOENT,
"Group %s is not registered in RouteTable, forgot to call updateConfiguration?", groupId);
}
final Status st = Status.OK();
final CliRequests.GetLeaderRequest.Builder rb = CliRequests.GetLeaderRequest.newBuilder();
rb.setGroupId(groupId);
//傳送獲取leader節點的請求
final CliRequests.GetLeaderRequest request = rb.build();
TimeoutException timeoutException = null;
for (final PeerId peer : conf) {
//如果連線不上,先設定狀態為error,然後continue
if (!cliClientService.connect(peer.getEndpoint())) {
if (st.isOk()) {
st.setError(-1, "Fail to init channel to %s", peer);
} else {
final String savedMsg = st.getErrorMsg();
st.setError(-1, "%s, Fail to init channel to %s", savedMsg, peer);
}
continue;
}
//向這個節點發送獲取leader的GetLeaderRequest請求
final Future<Message> result = cliClientService.getLeader(peer.getEndpoint(), request, null);
try {
final Message msg = result.get(timeoutMs, TimeUnit.MILLISECONDS);
//異常情況的處理
if (msg instanceof RpcRequests.ErrorResponse) {
if (st.isOk()) {
st.setError(-1, ((RpcRequests.ErrorResponse) msg).getErrorMsg());
} else {
final String savedMsg = st.getErrorMsg();
st.setError(-1, "%s, %s", savedMsg, ((RpcRequests.ErrorResponse) msg).getErrorMsg());
}
} else {
final CliRequests.GetLeaderResponse response = (CliRequests.GetLeaderResponse) msg;
//重置leader
updateLeader(groupId, response.getLeaderId());
return Status.OK();
}
} catch (final TimeoutException e) {
timeoutException = e;
} catch (final ExecutionException e) {
if (st.isOk()) {
st.setError(-1, e.getMessage());
} else {
final String savedMsg = st.getErrorMsg();
st.setError(-1, "%s, %s", savedMsg, e.getMessage());
}
}
}
if (timeoutException != null) {
throw timeoutException;
}
return st;
}
大家不要一開始就被這樣的長的方法給迷惑住了,這個方法實際上非常的簡單:
- 根據groupId獲取叢集節點的配置資訊,其中包括了其他節點的ip和埠號
- 遍歷conf裡面的叢集節點
- 嘗試連線被遍歷的節點,如果連線不上直接continue換到下一個節點
- 向這個節點發送GetLeaderRequest請求,如果在超時時間內可以返回正常的響應,那麼就呼叫updateLeader更新leader資訊
updateLeader方法相當節點,裡面就是更新一下路由表的leader屬性,我們這裡看看server是怎麼處理GetLeaderRequest請求的
GetLeaderRequest由GetLeaderRequestProcessor處理器來進行處理。
GetLeaderRequestProcessor#processRequest
public Message processRequest(GetLeaderRequest request, RpcRequestClosure done) {
List<Node> nodes = new ArrayList<>();
String groupId = getGroupId(request);
//如果請求是指定某個PeerId
//那麼則則去叢集裡找到指定Peer所對應的node
if (request.hasPeerId()) {
String peerIdStr = getPeerId(request);
PeerId peer = new PeerId();
if (peer.parse(peerIdStr)) {
Status st = new Status();
nodes.add(getNode(groupId, peer, st));
if (!st.isOk()) {
return RpcResponseFactory.newResponse(st);
}
} else {
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Fail to parse peer id %", peerIdStr);
}
} else {
//獲取叢集所有的節點
nodes = NodeManager.getInstance().getNodesByGroupId(groupId);
}
if (nodes == null || nodes.isEmpty()) {
return RpcResponseFactory.newResponse(RaftError.ENOENT, "No nodes in group %s", groupId);
}
//遍歷叢集node,獲取leaderId
for (Node node : nodes) {
PeerId leader = node.getLeaderId();
if (leader != null && !leader.isEmpty()) {
return GetLeaderResponse.newBuilder().setLeaderId(leader.toString()).build();
}
}
return RpcResponseFactory.newResponse(RaftError.EAGAIN, "Unknown leader");
}
這裡由於我們穿過來的request並沒有攜帶PeerId,所以不會去獲取指定的peer對應node節點的leaderId,而是會去找到叢集groupId對應的所有節點,然後遍歷節點找到對應的leaderId。
getLuckyPeer輪詢獲取一個節點
在上面我們講完了getLeader是怎麼實現的,下面我們講一下getLuckyPeer這個方法裡面是怎麼操作的。
public Endpoint getLuckyPeer(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
return this.pdClient.getLuckyPeer(regionId, forceRefresh, timeoutMillis, this.selfEndpoint);
}
這裡和getLeader方法一樣會呼叫到AbstractPlacementDriverClient的getLuckyPeer方法中
AbstractPlacementDriverClient#getLuckyPeer
public Endpoint getLuckyPeer(final long regionId, final boolean forceRefresh, final long timeoutMillis,
final Endpoint unExpect) {
final String raftGroupId = JRaftHelper.getJRaftGroupId(this.clusterName, regionId);
final RouteTable routeTable = RouteTable.getInstance();
//是否要強制重新整理一下最新的叢集節點資訊
if (forceRefresh) {
final long deadline = System.currentTimeMillis() + timeoutMillis;
final StringBuilder error = new StringBuilder();
// A newly launched raft group may not have been successful in the election,
// or in the 'leader-transfer' state, it needs to be re-tried
for (;;) {
try {
final Status st = routeTable.refreshConfiguration(this.cliClientService, raftGroupId, 5000);
if (st.isOk()) {
break;
}
error.append(st.toString());
} catch (final InterruptedException e) {
ThrowUtil.throwException(e);
} catch (final TimeoutException e) {
error.append(e.getMessage());
}
if (System.currentTimeMillis() < deadline) {
LOG.debug("Fail to get peers, retry again, {}.", error);
error.append(", ");
try {
Thread.sleep(5);
} catch (final InterruptedException e) {
ThrowUtil.throwException(e);
}
} else {
throw new RouteTableException(error.toString());
}
}
}
final Configuration configs = routeTable.getConfiguration(raftGroupId);
if (configs == null) {
throw new RouteTableException("empty configs in group: " + raftGroupId);
}
final List<PeerId> peerList = configs.getPeers();
if (peerList == null || peerList.isEmpty()) {
throw new RouteTableException("empty peers in group: " + raftGroupId);
}
//如果這個叢集裡只有一個節點了,那麼直接返回就好了
final int size = peerList.size();
if (size == 1) {
return peerList.get(0).getEndpoint();
}
//獲取負載均衡器,這裡用的是輪詢策略
final RoundRobinLoadBalancer balancer = RoundRobinLoadBalancer.getInstance(regionId);
for (int i = 0; i < size; i++) {
final PeerId candidate = balancer.select(peerList);
final Endpoint luckyOne = candidate.getEndpoint();
if (!luckyOne.equals(unExpect)) {
return luckyOne;
}
}
throw new RouteTableException("have no choice in group(peers): " + raftGroupId);
}
這個方法裡面也有一個是否要強制重新整理的判斷,和getLeader方法一樣,不再贅述。然後會判斷一下叢集裡面如果不止一個有效節點,那麼會呼叫輪詢策略來選取節點,這個輪詢的操作十分簡單,就是一個全域性的index每次呼叫加一,然後和傳入的peerList集合的size取模。
到這裡DefaultRheaKVRpcService的callAsyncWithRpc方法就差不多講解完畢了,然後會向server端發起請求,在KVCommandProcessor處理BatchPutRequest請求。
Server端處理BatchPutRequest請求
BatchPutRequest的請求在KVCommandProcessor中被處理。
KVCommandProcessor#handleRequest
public void handleRequest(final BizContext bizCtx, final AsyncContext asyncCtx, final T request) {
Requires.requireNonNull(request, "request");
final RequestProcessClosure<BaseRequest, BaseResponse<?>> closure = new RequestProcessClosure<>(request,
bizCtx, asyncCtx);
//根據傳入的RegionId去找到對應的RegionKVService
//每個 RegionKVService 對應一個 Region,只處理本身 Region 範疇內的請求
final RegionKVService regionKVService = this.storeEngine.getRegionKVService(request.getRegionId());
if (regionKVService == null) {
//如果不存在則返回空
final NoRegionFoundResponse noRegion = new NoRegionFoundResponse();
noRegion.setRegionId(request.getRegionId());
noRegion.setError(Errors.NO_REGION_FOUND);
noRegion.setValue(false);
closure.sendResponse(noRegion);
return;
}
switch (request.magic()) {
case BaseRequest.PUT:
regionKVService.handlePutRequest((PutRequest) request, closure);
break;
case BaseRequest.BATCH_PUT:
regionKVService.handleBatchPutRequest((BatchPutRequest) request, closure);
break;
.....
default:
throw new RheaRuntimeException("Unsupported request type: " + request.getClass().getName());
}
}
handleRequest首先會根據RegionId去找RegionKVService,RegionKVService在初始化RegionEngine的時候會註冊到regionKVServiceTable中。
然後根據請求的型別判斷request是什麼請求。這裡我們省略其他請求,只看BATCH_PUT是怎麼做的。
在往下講程式碼之前,我先來給個流程呼叫指指路:
BATCH_PUT對應會呼叫到DefaultRegionKVService的handleBatchPutRequest方法中 。
DefaultRegionKVService#handleBatchPutRequest
public void handlePutRequest(final PutRequest request,
final RequestProcessClosure<BaseRequest, BaseResponse<?>> closure) {
//設定一個響應response
final PutResponse response = new PutResponse();
response.setRegionId(getRegionId());
response.setRegionEpoch(getRegionEpoch());
try {
KVParameterRequires.requireSameEpoch(request, getRegionEpoch());
final byte[] key = KVParameterRequires.requireNonNull(request.getKey(), "put.key");
final byte[] value = KVParameterRequires.requireNonNull(request.getValue(), "put.value");
//這個例項是MetricsRawKVStore
this.rawKVStore.put(key, value, new BaseKVStoreClosure() {
//設定回撥函式
@Override
public void run(final Status status) {
if (status.isOk()) {
response.setValue((Boolean) getData());
} else {
setFailure(request, response, status, getError());
}
closure.sendResponse(response);
}
});
} catch (final Throwable t) {
LOG.error("Failed to handle: {}, {}.", request, StackTraceUtil.stackTrace(t));
response.setError(Errors.forException(t));
closure.sendResponse(response);
}
}
handlePutRequest方法十分地簡單,通過獲取key和value之後呼叫MetricsRawKVStore的put方法,傳入key和value並設定回撥函式。
MetricsRawKVStore#put
public void put(final byte[] key, final byte[] value, final KVStoreClosure closure) {
final KVStoreClosure c = metricsAdapter(closure, PUT, 1, value.length);
//rawKVStore是RaftRawKVStore的例項
this.rawKVStore.put(key, value, c);
}
put方法會繼續呼叫RaftRawKVStore的put方法。
RaftRawKVStore#put
public void put(final byte[] key, final byte[] value, final KVStoreClosure closure) {
applyOperation(KVOperation.createPut(key, value), closure);
}
Put方法會呼叫KVOperation的靜態方法建立一個型別為put的KVOperation例項,然後呼叫applyOperation方法。
RaftRawKVStore#applyOperation
private void applyOperation(final KVOperation op, final KVStoreClosure closure) {
//這裡必須保證 Leader 節點操作申請任務
if (!isLeader()) {
closure.setError(Errors.NOT_LEADER);
closure.run(new Status(RaftError.EPERM, "Not leader"));
return;
}
final Task task = new Task();
//封裝資料
task.setData(ByteBuffer.wrap(Serializers.getDefault().writeObject(op)));
//封裝回調方法
task.setDone(new KVClosureAdapter(closure, op));
//呼叫NodeImpl的apply方法
this.node.apply(task);
}
applyOperation方法裡面會校驗是不是leader,如果不是leader那麼就不能執行任務申請的操作。然後例項化一個Task例項,設定資料和回撥Adapter後呼叫NodeImple的apply釋出任務。
NodeImpl#apply
public void apply(final Task task) {
//檢查Node是不是被關閉了
if (this.shutdownLatch != null) {
Utils.runClosureInThread(task.getDone(), new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
throw new IllegalStateException("Node is shutting down");
}
//校驗不能為空
Requires.requireNonNull(task, "Null task");
//將task裡面的資料放入到LogEntry中
final LogEntry entry = new LogEntry();
entry.setData(task.getData());
//重試次數
int retryTimes = 0;
try {
//例項化一個Disruptor事件
final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
event.reset();
event.done = task.getDone();
event.entry = entry;
event.expectedTerm = task.getExpectedTerm();
};
while (true) {
//釋出事件後交給LogEntryAndClosureHandler事件處理器處理
if (this.applyQueue.tryPublishEvent(translator)) {
break;
} else {
retryTimes++;
//最多重試3次
if (retryTimes > MAX_APPLY_RETRY_TIMES) {
//不成功則進行回撥,通知處理狀態
Utils.runClosureInThread(task.getDone(),
new Status(RaftError.EBUSY, "Node is busy, has too many tasks."));
LOG.warn("Node {} applyQueue is overload.", getNodeId());
this.metrics.recordTimes("apply-task-overload-times", 1);
return;
}
ThreadHelper.onSpinWait();
}
}
} catch (final Exception e) {
Utils.runClosureInThread(task.getDone(), new Status(RaftError.EPERM, "Node is down."));
}
}
在apply方法裡面會將資料封裝到LogEntry例項中,然後將LogEntry打包成一個Disruptor事件釋出到applyQueue佇列裡面去。applyQueue佇列在NodeImpl的init方法裡面初始化,並設定處理器為LogEntryAndClosureHandler。
LogEntryAndClosureHandler#onEvent
private final List<LogEntryAndClosure> tasks = new ArrayList<>(NodeImpl.this.raftOptions.getApplyBatch());
@Override
public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch)
throws Exception {
//如果接收到了要關閉的請求
if (event.shutdownLatch != null) {
//tasks佇列裡面的任務又不為空,那麼先處理佇列裡面的資料
if (!this.tasks.isEmpty()) {
//處理tasks
executeApplyingTasks(this.tasks);
}
final int num = GLOBAL_NUM_NODES.decrementAndGet();
LOG.info("The number of active nodes decrement to {}.", num);
event.shutdownLatch.countDown();
return;
}
//將新的event加入到tasks中
this.tasks.add(event);
//因為設定了32為一個批次,所以如果tasks裡面的任務達到了32或者已經是最後一個event,
// 那麼就執行tasks集合裡面的資料
if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
executeApplyingTasks(this.tasks);
this.tasks.clear();
}
}
onEvent方法會校驗收到的事件是否是請求關閉佇列,如果是的話,那麼會先把tasks集合裡面的資料執行完畢再返回。如果是正常的事件,那麼校驗一下tasks集合裡面的個數是不是已經到達了32個,或者是不是已經是最後一個事件了,那麼會執行executeApplyingTasks進行批量處理資料。
NodeImpl#executeApplyingTasks
private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
this.writeLock.lock();
try {
final int size = tasks.size();
//如果當前節點不是leader,那麼就不往下進行
if (this.state != State.STATE_LEADER) {
final Status st = new Status();
if (this.state != State.STATE_TRANSFERRING) {
st.setError(RaftError.EPERM, "Is not leader.");
} else {
st.setError(RaftError.EBUSY, "Is transferring leadership.");
}
LOG.debug("Node {} can't apply, status={}.", getNodeId(), st);
//處理所有的LogEntryAndClosure,傳送回撥響應
for (int i = 0; i < size; i++) {
Utils.runClosureInThread(tasks.get(i).done, st);
}
return;
}
final List<LogEntry> entries = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
final LogEntryAndClosure task = tasks.get(i);
//如果任其不對,那麼直接呼叫回撥函式傳送Error
if (task.expectedTerm != -1 && task.expectedTerm != this.currTerm) {
LOG.debug("Node {} can't apply task whose expectedTerm={} doesn't match currTerm={}.", getNodeId(),
task.expectedTerm, this.currTerm);
if (task.done != null) {
final Status st = new Status(RaftError.EPERM, "expected_term=%d doesn't match current_term=%d",
task.expectedTerm, this.currTerm);
Utils.runClosureInThread(task.done, st);
}
continue;
}
//儲存應用上下文
if (!this.ballotBox.appendPendingTask(this.conf.getConf(),
this.conf.isStable() ? null : this.conf.getOldConf(), task.done)) {
Utils.runClosureInThread(task.done, new Status(RaftError.EINTERNAL, "Fail to append task."));
continue;
}
// set task entry info before adding to list.
task.entry.getId().setTerm(this.currTerm);
//設定entry的型別為ENTRY_TYPE_DATA
task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA);
entries.add(task.entry);
}
//批量提交申請任務日誌寫入 RocksDB
this.logManager.appendEntries(entries, new LeaderStableClosure(entries));
// update conf.first
this.conf = this.logManager.checkAndSetConfiguration(this.conf);
} finally {
this.writeLock.unlock();
}
}
executeApplyingTasks中會校驗當前的節點是不是leader,因為Raft 副本節點 Node 執行申請任務檢查當前狀態是否為 STATE_LEADER,必須保證 Leader 節點操作申請任務。
迴圈遍歷節點服務事件判斷任務的預估任期是否等於當前節點任期,Leader 沒有發生變更的階段內提交的日誌擁有相同的 Term 編號,節點 Node 任期滿足預期則 Raft 協議投票箱 BallotBox 呼叫 appendPendingTask(conf, oldConf, done) 日誌複製之前儲存應用上下文,即基於當前節點配置以及原始配置建立選票 Ballot 新增到選票雙向佇列 pendingMetaQueue。
然後日誌管理器 LogManager 呼叫底層日誌儲存 LogStorage#appendEntries(entries) 批量提交申請任務日誌寫入 RocksDB。
接下來通過 Node#apply(task) 提交的申請任務最終將會複製應用到所有 Raft 節點上的狀態機,RheaKV 狀態機通過繼承 StateMachineAdapter 狀態機介面卡的 KVStoreStateMachine 表示。
Raft 狀態機 KVStoreStateMachine 呼叫 onApply(iterator) 方法按照提交順序應用任務列表到狀態機。
KVStoreStateMachine 狀態機迭代狀態輸出列表積攢鍵值狀態列表批量申請 RocksRawKVStore 呼叫 batch(kvStates) 方法執行相應鍵值操作儲存到 RocksDB。
總結
這一篇是相當的長流程也是非常的複雜,裡面的各個地方程式碼寫的都非常的縝密。我們主要介紹了putBatching皮處理器是怎麼使用Disruptor批量的處理資料,從而做到提升整體的吞吐量。還講解了在發起請求的時候是如何獲取server端的endpoint的。然後還了解了BatchPutRequest請求是怎麼被server處理的,以及在程式碼中怎麼體現通過Batch + 全非同步機制大幅度提升吞吐的