RocketMQ Source Code Study: Communication and Protocol

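Clone the latest source from GitHub; its structure is as follows:
Figure: source code structure
The communication and protocol code covered in this article lives in the remoting module. The remoting module is responsible for network communication, and every other module that needs network communication depends on it. In this module RocketMQ defines its basic communication protocol and combines it with Netty, which makes data exchange between endpoints uniform and efficient.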

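Basic class diagram

First, the class relationships in this module:
Figure: class diagram of the communication module
Each class is explained below: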

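  • RemotingService is the top-level interface
  • RemotingClient extends RemotingService and defines the client-side interface
  • RemotingServer extends RemotingService and defines the server-side interface
  • NettyRemotingAbstract is an abstract class that uses Netty as the communication framework; it holds much of the shared processing logic and data structures
  • NettyRemotingClient extends NettyRemotingAbstract and implements RemotingClient; it is the client side of the communication
  • NettyRemotingServer extends NettyRemotingAbstract and implements RemotingServer; it is the server side of the communication
  • NettyEvent, NettyEncoder, NettyDecoder, RemotingCommand, and others are classes used by the communication framework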

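Communication protocol

RocketMQ's communication protocol is as follows:
Figure: communication protocol
A complete message consists of four parts:

  1. The total message length, which occupies four bytes
  2. The serialization type and the header length, which together occupy four bytes: the first byte is the serialization type and the remaining three bytes are the header length
  3. The header data
  4. The body data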
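The four-part layout above can be sketched with a plain ByteBuffer. This is a minimal illustration, not RocketMQ's actual encoder: the class and method names (FrameSketch, encode, decode) are hypothetical, and it assumes big-endian integers and that the total-length field counts everything after itself (the 4-byte type/length word, the header, and the body).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the four-part frame layout; not RocketMQ's real codec.
final class FrameSketch {

    /** Packs the serialization type into the high byte and the header length into the low 3 bytes. */
    static int markProtocolType(int headerLength, byte serializeType) {
        return ((serializeType & 0xFF) << 24) | (headerLength & 0x00FFFFFF);
    }

    static byte serializeTypeOf(int packed) {
        return (byte) ((packed >>> 24) & 0xFF);
    }

    static int headerLengthOf(int packed) {
        return packed & 0x00FFFFFF;
    }

    /** Writes: total length (4) | type + header length (4) | header | body. */
    static ByteBuffer encode(byte serializeType, byte[] header, byte[] body) {
        // Assumption: the total length excludes its own 4 bytes.
        int totalLength = 4 + header.length + body.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + totalLength);
        buf.putInt(totalLength);
        buf.putInt(markProtocolType(header.length, serializeType));
        buf.put(header);
        buf.put(body);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    /** Reads one frame and returns {header, body}. */
    static byte[][] decode(ByteBuffer buf) {
        int totalLength = buf.getInt();
        int packed = buf.getInt();
        byte[] header = new byte[headerLengthOf(packed)];
        buf.get(header);
        byte[] body = new byte[totalLength - 4 - header.length];
        buf.get(body);
        return new byte[][] {header, body};
    }

    public static void main(String[] args) {
        byte[] header = "{\"code\":1}".getBytes(StandardCharsets.UTF_8);
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        byte[][] parts = decode(encode((byte) 0, header, body));
        System.out.println(new String(parts[1], StandardCharsets.UTF_8));
    }
}
```

Because three bytes carry the header length, a header in this layout can be at most 16,777,215 bytes long.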

Detailed source analysis: the communication flow

RemotingService

public interface RemotingService {
    void start();
    void shutdown();
    void registerRPCHook(RPCHook rpcHook);
}

RemotingService defines three basic methods. registerRPCHook() registers hooks that run before and after each communication.

RemotingClient and RemotingServer

public interface RemotingClient extends RemotingService {
	
    void updateNameServerAddressList(final List<String> addrs);
	
    List<String> getNameServerAddressList();
    // synchronous call
    RemotingCommand invokeSync(final String addr, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingConnectException,
        RemotingSendRequestException, RemotingTimeoutException;
    // asynchronous call
    void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    // one-way call
    void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
        RemotingTimeoutException, RemotingSendRequestException;
	
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);
	
    void setCallbackExecutor(final ExecutorService callbackExecutor);

    ExecutorService getCallbackExecutor();

    boolean isChannelWritable(final String addr);
}

public interface RemotingServer extends RemotingService {

    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    int localListenPort();

    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
    // synchronous call
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;
    // asynchronous call
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    // one-way call
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;

}

The three most important methods are the synchronous call, the asynchronous call, and the one-way call.

NettyRemotingAbstract: selected core code

First, the fields and constructor defined by NettyRemotingAbstract:

public abstract class NettyRemotingAbstract {
    protected final Semaphore semaphoreOneway; // semaphore limiting concurrent one-way calls
    protected final Semaphore semaphoreAsync; // semaphore limiting concurrent asynchronous calls
    protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256); // in-flight requests; opaque is the unique request id
    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = // processor and thread pool registered for each requestCode
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
    protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor; // default processor

    public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
        this.semaphoreOneway = new Semaphore(permitsOneway, true);
        this.semaphoreAsync = new Semaphore(permitsAsync, true);
    }

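The fields are shown above. The two semaphores limit the concurrency of one-way and asynchronous calls. responseTable holds the in-flight requests; an entry is removed once its request completes (see the scanResponseTable() method below). processorTable maps each requestCode to a Pair of processor and thread pool; when a request arrives, the Pair is looked up by requestCode and used to handle it. The constructor initializes the two semaphores. Semaphore is built on AQS, which is not covered in detail here; see the separate AQS source-code study article.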
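To make the interplay between a semaphore and the in-flight table concrete, here is a minimal sketch. It is not the RocketMQ code: InFlightLimiterSketch and its methods are hypothetical names, and the pending entries are plain strings rather than ResponseFuture objects.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a fair semaphore bounds how many requests may be in flight.
final class InFlightLimiterSketch {
    private final Semaphore permits;
    private final ConcurrentMap<Integer, String> pending = new ConcurrentHashMap<>();

    InFlightLimiterSketch(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight, true); // fair, like the constructor above
    }

    /** Tries to register a request; returns false if no permit frees up within the timeout. */
    boolean begin(int opaque, String description, long timeoutMillis) {
        try {
            if (!permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        pending.put(opaque, description);
        return true;
    }

    /** Removes a finished request and returns its permit; only the first call per id succeeds. */
    boolean complete(int opaque) {
        if (pending.remove(opaque) != null) {
            permits.release();
            return true;
        }
        return false;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        InFlightLimiterSketch limiter = new InFlightLimiterSketch(2);
        System.out.println(limiter.begin(1, "first", 10));
        System.out.println(limiter.pendingCount());
    }
}
```

Releasing the permit only when the table entry is actually removed keeps a duplicate completion from handing back more permits than were acquired.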

Next is the message-handling logic. processMessageReceived() is the entry point and dispatches on the command type (request or response).

   public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
       final RemotingCommand cmd = msg;
       if (cmd != null) {
           switch (cmd.getType()) {
               case REQUEST_COMMAND:
                   processRequestCommand(ctx, cmd);
                   break;
               case RESPONSE_COMMAND:
                   processResponseCommand(ctx, cmd);
                   break;
               default:
                   break;
           }
       }
   }

First, the request path: the processor and thread pool looked up in processorTable handle the request, and the rpcHook is invoked before and after processing.

    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); // look up the processor by request code
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();
        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                        if (rpcHook != null) { // hook before the request is processed
                            rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        }
                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                        if (rpcHook != null) { // hook after the response is produced
                            rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                        }
                        if (!cmd.isOnewayRPC()) { // not a one-way request: write the result back
                            if (response != null) {
                                response.setOpaque(opaque);
                                response.markResponseType();
                                try {
                                    ctx.writeAndFlush(response);
                                } catch (Throwable e) {
                                    // omitted
                                }
                            } else {
                            }
                        }
                    } catch (Throwable e) {
                        // omitted
                    }
                }
            };
            if (pair.getObject1().rejectRequest()) { // the processor rejects the request
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }
            try { // submit the task to the thread pool
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                // omitted
            }
        } else {
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }
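The lookup-with-fallback in processRequestCommand() can be reduced to a small sketch. This is an illustration under simplifying assumptions, not the real implementation: DispatchSketch and its members are hypothetical, processors are modeled as Function<String, String>, and a plain Executor stands in for the per-processor thread pool.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical sketch: request code -> (processor, executor), with a default fallback.
final class DispatchSketch {

    /** Stand-in for Pair<NettyRequestProcessor, ExecutorService>. */
    static final class Entry {
        final Function<String, String> processor;
        final Executor executor;

        Entry(Function<String, String> processor, Executor executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> processorTable = new HashMap<>();
    private Entry defaultProcessor; // may stay null if none is registered

    void registerProcessor(int requestCode, Function<String, String> processor, Executor executor) {
        processorTable.put(requestCode, new Entry(processor, executor));
    }

    void registerDefaultProcessor(Function<String, String> processor, Executor executor) {
        defaultProcessor = new Entry(processor, executor);
    }

    /** Runs the matching processor on its executor, or reports an unsupported code. */
    CompletableFuture<String> dispatch(int requestCode, String payload) {
        Entry matched = processorTable.get(requestCode);
        Entry entry = matched != null ? matched : defaultProcessor;
        if (entry == null) {
            return CompletableFuture.completedFuture("request type " + requestCode + " not supported");
        }
        return CompletableFuture.supplyAsync(() -> entry.processor.apply(payload), entry.executor);
    }

    public static void main(String[] args) {
        DispatchSketch dispatcher = new DispatchSketch();
        Executor sameThread = Runnable::run; // a real server would pass a thread pool here
        dispatcher.registerProcessor(10, payload -> "handled " + payload, sameThread);
        System.out.println(dispatcher.dispatch(10, "ping").join());
    }
}
```

Pairing each processor with its own executor lets slow request types be isolated in a separate pool so they do not starve the others.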

The response path: the matching request is looked up by opaque and removed from responseTable.

    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) { // remove this request from the in-flight table
            responseFuture.setResponseCommand(cmd);
            responseTable.remove(opaque);
            if (responseFuture.getInvokeCallback() != null) { // asynchronous call: run the callback
                executeInvokeCallback(responseFuture);
            } else {
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }
    // run the callback, preferably on the callback executor
    private void executeInvokeCallback(final ResponseFuture responseFuture) {
        boolean runInThisThread = false;
        ExecutorService executor = this.getCallbackExecutor();
        if (executor != null) {
            try {
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            responseFuture.executeInvokeCallback();
                        } catch (Throwable e) {
                            log.warn("execute callback in executor exception, and callback throw", e);
                        } finally {
                            responseFuture.release();
                        }
                    }
                });
            } catch (Exception e) {
                runInThisThread = true;
                log.warn("execute callback in executor exception, maybe executor busy", e);
            }
        } else {
            runInThisThread = true;
        }
        if (runInThisThread) {
            try {
                responseFuture.executeInvokeCallback();
            } catch (Throwable e) {
                log.warn("executeInvokeCallback Exception", e);
            } finally {
                responseFuture.release();
            }
        }
    }
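The fallback in executeInvokeCallback() (prefer the callback executor, otherwise run on the current thread) can be isolated as follows. The class and method names are hypothetical, and the sketch only catches RejectedExecutionException, whereas the code above catches any Exception.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical sketch of "run on the executor if possible, else on the caller's thread".
final class CallbackRunnerSketch {

    /** Returns true if the executor accepted the callback, false if it ran on the calling thread. */
    static boolean runCallback(Executor executor, Runnable callback) {
        if (executor != null) {
            try {
                executor.execute(callback);
                return true;
            } catch (RejectedExecutionException e) {
                // The executor is saturated or shut down; fall through to the caller's thread.
            }
        }
        callback.run();
        return false;
    }

    public static void main(String[] args) {
        boolean handedOff = runCallback(null, () -> System.out.println("callback ran"));
        System.out.println(handedOff);
    }
}
```

Running the callback on the calling thread guarantees it is never dropped, at the cost of briefly occupying that thread.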

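The synchronous request implementation. Internally it still relies on Netty's asynchronous write; the calling thread then blocks on the ResponseFuture until the response arrives or the timeout expires.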

    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });

            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }
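The "synchronous call on top of an asynchronous send" pattern in invokeSyncImpl() can be sketched with a CountDownLatch. This is a simplified illustration, not the RocketMQ classes: SyncOverAsyncSketch, PendingResponse, and onResponse are hypothetical names, responses are plain strings, and a timeout is reported by returning null instead of throwing.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: block the caller until another thread delivers the matching response.
final class SyncOverAsyncSketch {

    static final class PendingResponse {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String response;

        void putResponse(String value) {
            this.response = value;
            latch.countDown(); // wake the waiting caller
        }

        String waitResponse(long timeoutMillis) throws InterruptedException {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return response; // still null if the wait timed out
        }
    }

    private final ConcurrentMap<Integer, PendingResponse> responseTable = new ConcurrentHashMap<>();

    /** Called from the I/O side when a response with this request id arrives. */
    boolean onResponse(int opaque, String response) {
        PendingResponse pending = responseTable.remove(opaque);
        if (pending == null) {
            return false; // no caller is waiting (unknown id or already timed out)
        }
        pending.putResponse(response);
        return true;
    }

    /** Registers the request, triggers the asynchronous send, then waits for the response. */
    String invokeSync(int opaque, Runnable send, long timeoutMillis) {
        PendingResponse pending = new PendingResponse();
        responseTable.put(opaque, pending);
        try {
            send.run();
            return pending.waitResponse(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            responseTable.remove(opaque); // always clean up, as the finally block above does
        }
    }

    public static void main(String[] args) {
        final SyncOverAsyncSketch remoting = new SyncOverAsyncSketch();
        String reply = remoting.invokeSync(1,
            () -> new Thread(() -> remoting.onResponse(1, "pong")).start(), 2000);
        System.out.println(reply);
    }
}
```

Registering the pending entry before sending matters: if the send happened first, a very fast response could arrive before anything was waiting for it.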

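The asynchronous request implementation. A permit must first be acquired from semaphoreAsync before the request is sent.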

    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        long beginStartTime = System.currentTimeMillis();
        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout");
            }

            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
            this.responseTable.put(opaque, responseFuture);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            responseFuture.setSendRequestOK(true);
                            return;
                        }
                        requestFail(opaque);
                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                responseFuture.release();
                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel)<