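RocketMQ Source Code Study: Communication and Protocol
After cloning the latest source from GitHub, the project structure is as follows:
The communication and protocol code covered in this article lives in the remoting module. The remoting module is responsible for network communication, and every other module that needs to talk over the network depends on it. In this module RocketMQ defines its basic wire protocol and, building on Netty, makes data exchange between endpoints uniform and efficient.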
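Basic Class Diagram
First, the class relationship diagram for this module:
Each class is explained in turn:
- RemotingService is the top-level interface
- RemotingClient extends RemotingService and defines the client-side interface
- RemotingServer extends RemotingService and defines the server-side interface
- NettyRemotingAbstract is an abstract class that uses Netty as the communication framework; it holds much of the shared processing logic and data structures
- NettyRemotingClient extends NettyRemotingAbstract and implements the RemotingClient interface, acting as the client side of the communication
- NettyRemotingServer extends NettyRemotingAbstract and implements the RemotingServer interface, acting as the server side of the communication
- NettyEvent, NettyEncoder, NettyDecoder, RemotingCommand, and others are classes used by the communication framework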
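Communication Protocol
RocketMQ's communication protocol is as follows:
A message on the wire consists of four parts:
- Total message length, occupying four bytes
- Serialization type and header length, occupying four bytes in total: the first byte is the serialization type and the remaining three bytes are the header length
- Header data
- Body data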
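The four-part layout above can be sketched with a plain ByteBuffer. This is a minimal illustration, not RocketMQ's own encoder: the class FrameCodecSketch and its methods are hypothetical names, and the sketch assumes that the total-length field counts everything that follows it (the 4-byte type/header-length word, the header, and the body) and that integers are written big-endian.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative only: packs and unpacks the four-part frame described above. */
final class FrameCodecSketch {

    /** Decoded view of one frame. */
    static final class Frame {
        final byte serializeType;
        final byte[] header;
        final byte[] body;

        Frame(byte serializeType, byte[] header, byte[] body) {
            this.serializeType = serializeType;
            this.header = header;
            this.body = body;
        }
    }

    static ByteBuffer encode(byte serializeType, byte[] header, byte[] body) {
        if (header.length > 0xFFFFFF) {
            throw new IllegalArgumentException("header length does not fit in three bytes");
        }
        // Assumption: the length field counts everything that follows it.
        int length = 4 + header.length + body.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + length);
        buf.putInt(length);
        // High byte: serialization type; low three bytes: header length.
        buf.putInt(((serializeType & 0xFF) << 24) | header.length);
        buf.put(header);
        buf.put(body);
        buf.flip();
        return buf;
    }

    static Frame decode(ByteBuffer buf) {
        int length = buf.getInt();
        int mark = buf.getInt();
        byte serializeType = (byte) (mark >>> 24);
        int headerLength = mark & 0xFFFFFF;
        byte[] header = new byte[headerLength];
        buf.get(header);
        // Whatever remains after the mark word and the header is the body.
        byte[] body = new byte[length - 4 - headerLength];
        buf.get(body);
        return new Frame(serializeType, header, body);
    }

    public static void main(String[] args) {
        byte[] header = "{\"code\":10}".getBytes(StandardCharsets.UTF_8);
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        Frame frame = decode(encode((byte) 1, header, body));
        System.out.println(frame.serializeType + " " + new String(frame.body, StandardCharsets.UTF_8));
    }
}
```

Packing the serialization type and the header length into one 32-bit word keeps the fixed prefix at eight bytes while still leaving room for headers of up to 16 MB.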
Detailed Source Analysis: Communication Flow
RemotingService
public interface RemotingService {
    void start();
    void shutdown();
    void registerRPCHook(RPCHook rpcHook);
}
RemotingService defines three basic methods. registerRPCHook() registers a hook that runs extra processing before and after each communication.
RemotingClient and RemotingServer
public interface RemotingClient extends RemotingService {
    void updateNameServerAddressList(final List<String> addrs);
    List<String> getNameServerAddressList();
    // Synchronous invocation
    RemotingCommand invokeSync(final String addr, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingConnectException,
        RemotingSendRequestException, RemotingTimeoutException;
    // Asynchronous invocation
    void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    // One-way invocation
    void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
        RemotingTimeoutException, RemotingSendRequestException;
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);
    void setCallbackExecutor(final ExecutorService callbackExecutor);
    ExecutorService getCallbackExecutor();
    boolean isChannelWritable(final String addr);
}

public interface RemotingServer extends RemotingService {
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
    int localListenPort();
    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
    // Synchronous invocation
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;
    // Asynchronous invocation
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    // One-way invocation
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;
}
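The three most important methods are the synchronous, asynchronous, and one-way invocations. The client addresses the peer by address string, while the server addresses it by an already established Channel.
NettyRemotingAbstract: Selected Core Code
First, the fields and constructor defined by NettyRemotingAbstract: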
public abstract class NettyRemotingAbstract {
    protected final Semaphore semaphoreOneway; // semaphore that limits concurrent one-way calls
    protected final Semaphore semaphoreAsync; // semaphore that limits concurrent asynchronous calls
    // In-flight requests; opaque is the unique request id
    protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256);
    // Processor and thread pool registered for each requestCode
    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
    protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor; // default processor

    public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
        this.semaphoreOneway = new Semaphore(permitsOneway, true);
        this.semaphoreAsync = new Semaphore(permitsAsync, true);
    }
    // ... remaining members omitted
}
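Those are the fields. The two semaphores cap the number of concurrent one-way and asynchronous calls. responseTable tracks in-flight requests; an entry is removed once its request completes (see also the scanResponseTable() method below). processorTable maps each requestCode to a Pair of processor and thread pool; when a request arrives, the Pair is looked up by requestCode and used to handle it. The constructor initializes the two semaphores. Semaphore is built on AQS (AbstractQueuedSynchronizer) through an internal synchronizer class rather than extending it directly; the details are out of scope here, see the AQS source study.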
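To make the role of the semaphores concrete, here is a minimal sketch of permit-based throttling using java.util.concurrent.Semaphore. The class PermitLimiterSketch and its Permit type are hypothetical names introduced for illustration; the release-at-most-once guard is an assumption about how a permit can be protected against double release, not a copy of RocketMQ's code.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative only: caps the number of in-flight calls with a fair semaphore. */
final class PermitLimiterSketch {
    private final Semaphore permits;

    PermitLimiterSketch(int maxInFlight) {
        // 'true' requests FIFO fairness among waiting threads.
        this.permits = new Semaphore(maxInFlight, true);
    }

    /** A permit that gives its slot back at most once, however often release() is called. */
    final class Permit {
        private final AtomicBoolean released = new AtomicBoolean(false);

        void release() {
            if (released.compareAndSet(false, true)) {
                permits.release();
            }
        }
    }

    /** Returns a permit, or null if none became free within the timeout or the wait was interrupted. */
    Permit tryAcquire(long timeoutMillis) {
        try {
            return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS) ? new Permit() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        PermitLimiterSketch limiter = new PermitLimiterSketch(1);
        Permit first = limiter.tryAcquire(10);
        System.out.println("second acquired: " + (limiter.tryAcquire(10) != null));
        first.release();
        first.release(); // ignored: the slot was already returned
        System.out.println("available: " + limiter.available());
    }
}
```

Guarding the release matters because a permit may be returned from several code paths (success, send failure, timeout); without the guard a double release would silently raise the concurrency limit.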
Next comes the message-handling logic. processMessageReceived() is the entry point and dispatches on the command type (request or response):
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}
First, the request path: the processor and thread pool looked up in processorTable handle the request, and the rpcHook is invoked before and after processing.
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    // Look up the processor registered for this request code
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();
    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                    if (rpcHook != null) { // hook before the request is processed
                        rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    }
                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    if (rpcHook != null) { // hook after the request is processed
                        rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                    }
                    if (!cmd.isOnewayRPC()) { // not a one-way request: write the result back
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                // omitted
                            }
                        } else {
                            // the processor produced no response; nothing is written back
                        }
                    }
                } catch (Throwable e) {
                    // omitted
                }
            }
        };
        if (pair.getObject1().rejectRequest()) { // the processor rejects the request
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }
        try { // submit the task to the thread pool
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            // omitted
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}
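The lookup at the top of processRequestCommand() (exact match on the request code, then the default processor, then a "not supported" reply) can be isolated into a small sketch. DispatchSketch is a hypothetical class; processors are modelled as plain Function<String, String> values and the per-processor thread pool is left out on purpose, so this shows only the table lookup and fallback order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Illustrative only: request-code dispatch with a default fallback. */
final class DispatchSketch {
    private final Map<Integer, Function<String, String>> processorTable = new HashMap<>();
    private Function<String, String> defaultProcessor; // may stay null

    void registerProcessor(int requestCode, Function<String, String> processor) {
        processorTable.put(requestCode, processor);
    }

    void registerDefaultProcessor(Function<String, String> processor) {
        this.defaultProcessor = processor;
    }

    String dispatch(int requestCode, String payload) {
        Function<String, String> matched = processorTable.get(requestCode);
        // Fall back to the default processor when no exact match is registered.
        Function<String, String> processor = matched != null ? matched : defaultProcessor;
        if (processor == null) {
            return " request type " + requestCode + " not supported";
        }
        return processor.apply(payload);
    }

    public static void main(String[] args) {
        DispatchSketch dispatcher = new DispatchSketch();
        dispatcher.registerProcessor(10, String::toUpperCase);
        System.out.println(dispatcher.dispatch(10, "ping"));
        System.out.println(dispatcher.dispatch(99, "ping"));
    }
}
```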
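Next, the response path: the ResponseFuture matching the response's opaque is looked up and removed from responseTable, then either the callback is executed (asynchronous call) or the waiting thread is woken up (synchronous call).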
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) { // remove this request from the in-flight table
        responseFuture.setResponseCommand(cmd);
        responseTable.remove(opaque);
        if (responseFuture.getInvokeCallback() != null) { // asynchronous call: run the callback
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}

// Run the callback in the callback executor, or in the current thread if that is not possible
private void executeInvokeCallback(final ResponseFuture responseFuture) {
    boolean runInThisThread = false;
    ExecutorService executor = this.getCallbackExecutor();
    if (executor != null) {
        try {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        responseFuture.executeInvokeCallback();
                    } catch (Throwable e) {
                        log.warn("execute callback in executor exception, and callback throw", e);
                    } finally {
                        responseFuture.release();
                    }
                }
            });
        } catch (Exception e) {
            runInThisThread = true;
            log.warn("execute callback in executor exception, maybe executor busy", e);
        }
    } else {
        runInThisThread = true;
    }
    if (runInThisThread) {
        try {
            responseFuture.executeInvokeCallback();
        } catch (Throwable e) {
            log.warn("executeInvokeCallback Exception", e);
        } finally {
            responseFuture.release();
        }
    }
}
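The fallback pattern in executeInvokeCallback() (prefer the callback executor, run on the current thread when there is no executor or it refuses the task) can be reduced to a few lines. CallbackRunnerSketch is a hypothetical helper written for illustration; it leaves out the semaphore release that the original performs in its finally blocks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

/** Illustrative only: run a callback on an executor, falling back to the caller thread. */
final class CallbackRunnerSketch {

    /** Returns true if the callback had to run on the calling thread. */
    static boolean runCallback(ExecutorService executor, Runnable callback) {
        boolean runInThisThread = false;
        if (executor != null) {
            try {
                executor.submit(callback);
            } catch (RejectedExecutionException e) {
                // The executor is saturated or shut down: do the work ourselves.
                runInThisThread = true;
            }
        } else {
            runInThisThread = true;
        }
        if (runInThisThread) {
            callback.run();
        }
        return runInThisThread;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown(); // a shut-down pool rejects new tasks
        boolean inline = runCallback(pool, () -> System.out.println("callback ran"));
        System.out.println("ran on caller thread: " + inline);
    }
}
```

The trade-off is that a slow callback executed inline blocks the thread that delivered the response, which is why a dedicated callback executor is preferred when one is available.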
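The synchronous call implementation. Internally it still uses Netty's asynchronous write: the request is sent with a listener attached, and the calling thread blocks on the ResponseFuture until the response arrives or the timeout elapses.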
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();
    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }
        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}
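The way a blocking call is layered on top of an asynchronous send can be shown without Netty. The sketch below is a simplified model, assuming a CountDownLatch as the waiting primitive: SyncOverAsyncSketch, PendingResponse, and onResponse() are hypothetical names, responses are plain strings, and the "network" is simulated by a Runnable that answers from another thread.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Illustrative only: a blocking call built on an asynchronous send and an opaque-keyed table. */
final class SyncOverAsyncSketch {

    /** Holds the eventual response and lets one thread wait for it. */
    static final class PendingResponse {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String response;

        void putResponse(String value) {
            this.response = value;
            latch.countDown(); // wake the waiting caller
        }

        String waitResponse(long timeoutMillis) throws InterruptedException {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return response; // still null if the wait timed out
        }
    }

    private final ConcurrentMap<Integer, PendingResponse> responseTable = new ConcurrentHashMap<>();

    /** Called by the (hypothetical) I/O thread when a response carrying this opaque arrives. */
    void onResponse(int opaque, String response) {
        PendingResponse pending = responseTable.remove(opaque);
        if (pending != null) {
            pending.putResponse(response);
        }
    }

    /** Registers the pending entry, triggers the send, then blocks; returns null on timeout or interrupt. */
    String invokeSync(int opaque, Runnable send, long timeoutMillis) {
        PendingResponse pending = new PendingResponse();
        responseTable.put(opaque, pending); // register before sending so a fast reply is not lost
        try {
            send.run();
            return pending.waitResponse(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            responseTable.remove(opaque); // never leave a stale entry behind
        }
    }

    int inFlight() {
        return responseTable.size();
    }

    public static void main(String[] args) {
        SyncOverAsyncSketch client = new SyncOverAsyncSketch();
        // Simulate a remote peer that answers from another thread.
        String reply = client.invokeSync(7,
            () -> new Thread(() -> client.onResponse(7, "pong")).start(), 1000);
        System.out.println(reply);
    }
}
```

As in invokeSyncImpl(), the entry is put into the table before the send is issued and removed in a finally block, so neither an early response nor a timeout leaves the table inconsistent.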
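The asynchronous call implementation. A permit must first be acquired from semaphoreAsync; the time spent waiting for the permit is deducted from the remaining timeout.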
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
    final InvokeCallback invokeCallback)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    final int opaque = request.getOpaque();
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout");
        }
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        this.responseTable.put(opaque, responseFuture);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    requestFail(opaque);
                    log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel)<