1. 程式人生 > >RocketMQ原始碼解讀系列——2、filtersrv原始碼

RocketMQ原始碼解讀系列——2、filtersrv原始碼

1. Broker 所在的機器會啟動多個 FilterServer 過濾程序
2. Consumer 啟動之後,會向 FilterServer 上傳一個過濾的 Java 類
3. Consumer 從 FilterServer 拉訊息,FilterServer 將請求轉發給 Broker,FilterServer 從 Broker 收到訊息後,按照
Consumer 上傳的 Java 過濾程式做過濾,過濾完成後返回給 Consumer。
 

filtersrv的專案結構如下:

工程結構與namesrv主體基本一致,啟動類是FiltersrvStartup.java,核心控制類是FiltersrvController,啟動服務時同樣是呼叫的remoting工程中的NettyRemotingServer.java類,原始碼如下:

而處理各種請求訊息的執行器是DefaultRequestProcessor.java

這些基本步驟與namesrv基本一致。

 

Startup中的主要方法原始碼:


        // Body of the startup routine that builds and initializes the FiltersrvController.
        // NOTE(review): the enclosing method signature is not part of this excerpt; the body
        // reads an `args` array and returns either the controller or null.

        // Publish the current MQ version under the remoting version system property.
        System.setProperty(RemotingCommand.RemotingVersionKey, Integer.toString(MQVersion.CurrentVersion));

        // Socket send buffer size: only overridden when the system property is not set.
        if (null == System.getProperty(NettySystemConfig.SystemPropertySocketSndbufSize)) {
            NettySystemConfig.SocketSndbufSize = 65535;
        }

        // Socket receive buffer size: only overridden when the system property is not set.
        if (null == System.getProperty(NettySystemConfig.SystemPropertySocketRcvbufSize)) {
            NettySystemConfig.SocketRcvbufSize = 1024;
        }

        try {
            // Detect package (dependency) conflicts.
            PackageConflictDetect.detectFastjson();

            // Parse the command line.
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            final CommandLine commandLine =
                    ServerUtil.parseCmdLine("mqfiltersrv", args, buildCommandlineOptions(options),
                        new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
                return null;
            }

            // Initialize the configuration objects.
            final FiltersrvConfig filtersrvConfig = new FiltersrvConfig();
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();

            // Option 'c': load a properties file into filtersrvConfig.
            if (commandLine.hasOption('c')) {
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    Properties properties = new Properties();
                    properties.load(in);
                    MixAll.properties2Object(properties, filtersrvConfig);
                    System.out.println("load config properties file OK, " + file);
                    in.close();

                    // When the file defines listenPort, connect to the broker on
                    // 127.0.0.1 at that port.
                    String port = properties.getProperty("listenPort");
                    if (port != null) {
                        filtersrvConfig.setConnectWhichBroker(String.format("127.0.0.1:%s", port));
                    }
                }
            }

            // Force the listen port to 0 so that a port number is assigned automatically.
            nettyServerConfig.setListenPort(0);

            // Copy the filter-server specific Netty settings into the Netty server configuration.
            nettyServerConfig.setServerAsyncSemaphoreValue(filtersrvConfig.getFsServerAsyncSemaphoreValue());
            nettyServerConfig.setServerCallbackExecutorThreads(filtersrvConfig
                .getFsServerCallbackExecutorThreads());
            nettyServerConfig.setServerWorkerThreads(filtersrvConfig.getFsServerWorkerThreads());

            // Option 'p': print the configuration and exit. This runs before the
            // command-line properties below are applied to filtersrvConfig.
            if (commandLine.hasOption('p')) {
                MixAll.printObjectProperties(null, filtersrvConfig);
                MixAll.printObjectProperties(null, nettyServerConfig);
                System.exit(0);
            }

            // Apply properties given on the command line on top of the loaded configuration.
            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), filtersrvConfig);

            // The RocketMQ home directory is required; it is used to locate the logback file.
            if (null == filtersrvConfig.getRocketmqHome()) {
                System.out.println("Please set the " + MixAll.ROCKETMQ_HOME_ENV
                        + " variable in your environment to match the location of the RocketMQ installation");
                System.exit(-2);
            }

            // Initialize Logback from <rocketmqHome>/conf/logback_filtersrv.xml.
            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            configurator.doConfigure(filtersrvConfig.getRocketmqHome() + "/conf/logback_filtersrv.xml");
            log = LoggerFactory.getLogger(LoggerName.FiltersrvLoggerName);

            // Create and initialize the service controller; shut down and exit on failure.
            final FiltersrvController controller =
                    new FiltersrvController(filtersrvConfig, nettyServerConfig);
            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }

            // JVM shutdown hook: shuts the controller down once and logs how long it took.
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                private volatile boolean hasShutdown = false;
                private AtomicInteger shutdownTimes = new AtomicInteger(0);


                @Override
                public void run() {
                    synchronized (this) {
                        log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet());
                        // Guard so that controller.shutdown() runs at most once.
                        if (!this.hasShutdown) {
                            this.hasShutdown = true;
                            long begineTime = System.currentTimeMillis();
                            controller.shutdown();
                            long consumingTimeTotal = System.currentTimeMillis() - begineTime;
                            log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal);
                        }
                    }
                }
            }, "ShutdownHook"));

            return controller;
        }
        catch (Throwable e) {
            // Any failure during startup is printed and terminates the process.
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    

FiltersrvController中主要的屬性和方法:

/**
     * Filter Server configuration.
     */
    private final FiltersrvConfig filtersrvConfig;
    /**
     * Communication (Netty) layer configuration.
     */
    private final NettyServerConfig nettyServerConfig;
    /**
     * Server-side communication layer object; created in initialize().
     */
    private RemotingServer remotingServer;
    /**
     * Thread pool that processes server-side network requests; created in initialize().
     */
    private ExecutorService remotingExecutor;
    /**
     * Cache of filter classes -- keeps the registered filter classes in memory.
     */
    private final FilterClassManager filterClassManager;

    /**
     * Wrapper around the API used to access the Broker.
     */
    private final FilterServerOuterAPI filterServerOuterAPI = new FilterServerOuterAPI();

    // Pull consumer created with the filter-server consumer group.
    private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(
        MixAll.FILTERSRV_CONSUMER_GROUP);

    // Starts as null; set from the Broker's response on registration.
    private volatile String brokerName = null;

    // Single-threaded scheduler for periodic tasks.
    private final ScheduledExecutorService scheduledExecutorService = Executors
        .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread"));
    /**
     * Runtime statistics manager of the FilterServer. According to the original author's
     * note, the body of its start method is empty in the current version.
     */
    private final FilterServerStatsManager filterServerStatsManager = new FilterServerStatsManager();


    /**
     * Creates the controller from the given configuration objects.
     *
     * @param filtersrvConfig Filter Server configuration
     * @param nettyServerConfig communication layer configuration
     */
    public FiltersrvController(FiltersrvConfig filtersrvConfig, NettyServerConfig nettyServerConfig) {
        this.filtersrvConfig = filtersrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        // The filter class manager receives this controller; it is created after both
        // configuration fields have been assigned.
        this.filterClassManager = new FilterClassManager(this);
    }


    /**
     * Prepares the controller: creates the remoting server and its worker pool, registers the
     * request processor, schedules the periodic registration with the Broker and configures
     * the internal pull consumer.
     *
     * @return always {@code true}
     */
    public boolean initialize() {
        // Log the server configuration parameters.
        MixAll.printObjectProperties(log, this.filtersrvConfig);

        // Communication layer.
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig);

        // Request-processing thread pool, sized by the configured worker thread count.
        final int workerThreads = nettyServerConfig.getServerWorkerThreads();
        this.remotingExecutor =
                Executors.newFixedThreadPool(workerThreads, new ThreadFactoryImpl("RemotingExecutorThread_"));

        this.registerProcessor();

        // Register this filter server with the Broker periodically:
        // first run after 3 seconds, then every 10 seconds.
        final Runnable registerTask = new Runnable() {
            @Override
            public void run() {
                FiltersrvController.this.registerFilterServerToBroker();
            }
        };
        this.scheduledExecutorService.scheduleAtFixedRate(registerTask, 3, 10, TimeUnit.SECONDS);

        // Pull consumer timeouts must be smaller than the defaults: shorten each by one second.
        final DefaultMQPullConsumer consumer = this.defaultMQPullConsumer;
        consumer.setBrokerSuspendMaxTimeMillis(consumer.getBrokerSuspendMaxTimeMillis() - 1000);
        consumer.setConsumerTimeoutMillisWhenSuspend(consumer.getConsumerTimeoutMillisWhenSuspend() - 1000);

        // Name server address comes from the configuration; the instance name is the process id.
        consumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr());
        consumer.setInstanceName(String.valueOf(UtilAll.getPid()));

        return true;
    }


    /**
     * Builds this filter server's address as {@code ip:port}, using the configured filter
     * server IP and the port the remoting server is listening on.
     *
     * @return the local address string
     */
    public String localAddr() {
        final String host = this.filtersrvConfig.getFilterServerIP();
        final int port = this.remotingServer.localListenPort();
        return String.format("%s:%d", host, port);
    }

    /**
     * Registers this filter server with the configured Broker.
     * <p>
     * On success the Broker id from the response becomes the pull consumer's default broker
     * id, and the Broker name is remembered the first time it is received. On any exception
     * the process terminates itself with exit code -1.
     */
    public void registerFilterServerToBroker() {
        try {
            final String brokerAddr = this.filtersrvConfig.getConnectWhichBroker();
            final String selfAddr = this.localAddr();

            final RegisterFilterServerResponseHeader header =
                    this.filterServerOuterAPI.registerFilterServerToBroker(brokerAddr, selfAddr);
            this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
                .setDefaultBrokerId(header.getBrokerId());

            // Only the first successful registration records the broker name.
            if (this.brokerName == null) {
                this.brokerName = header.getBrokerName();
            }

            log.info("register filter server<{}> to broker<{}> OK, Return: {} {}",
                selfAddr, brokerAddr, header.getBrokerName(), header.getBrokerId());
        }
        catch (Exception e) {
            log.warn("register filter server Exception", e);
            // Registration failed: the filter server kills itself.
            log.warn("access broker failed, kill oneself");
            System.exit(-1);
        }
    }


    /**
     * Installs a {@code DefaultRequestProcessor} bound to this controller as the remoting
     * server's default processor, executed on the remoting executor.
     */
    private void registerProcessor() {
        final DefaultRequestProcessor processor = new DefaultRequestProcessor(this);
        this.remotingServer.registerDefaultProcessor(processor, this.remotingExecutor);
    }


    /**
     * Starts the controller's components in a fixed order: the pull consumer, the remoting
     * server, the Broker API client, the filter class manager and the statistics manager.
     *
     * @throws Exception if any of the components fails to start
     */
    public void start() throws Exception {
        this.defaultMQPullConsumer.start();
        this.remotingServer.start();
        this.filterServerOuterAPI.start();
        // After the pull consumer has started, switch its pull API wrapper to
        // "connect broker by user" mode.
        this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
            .setConnectBrokerByUser(true);
        this.filterClassManager.start();
        this.filterServerStatsManager.start();
    }

DefaultRequestProcessor

 /**
     * The controller this processor works with; it is used to reach the filter class
     * manager, the pull consumer and the statistics manager.
     */
    private final FiltersrvController filtersrvController;


    /**
     * Creates a request processor bound to the given controller.
     *
     * @param filtersrvController the owning filter server controller
     */
    public DefaultRequestProcessor(FiltersrvController filtersrvController) {
        this.filtersrvController = filtersrvController;
    }


    /**
     * Dispatches an incoming request by its request code.
     *
     * @param ctx channel context of the requesting peer
     * @param request the incoming command
     * @return the response for a filter class registration; the result of
     *         {@code pullMessageForward} for a pull request; {@code null} for any other code
     * @throws Exception if the selected handler fails
     */
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
            throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("receive request, {} {} {}",
                request.getCode(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                request);
        }

        final int code = request.getCode();

        // Register a filter class with the Filter Server -- initiated by the Consumer.
        if (code == RequestCode.REGISTER_MESSAGE_FILTER_CLASS) {
            return registerMessageFilterClass(ctx, request);
        }

        // Subscribe to (pull) messages.
        if (code == RequestCode.PULL_MESSAGE) {
            return pullMessageForward(ctx, request);
        }

        return null;
    }
 /**
     * Writes the response back to the consumer, optionally attaching the given messages as
     * the response body.
     * <p>
     * Each message is encoded with {@code messageToByteBuffer}. A message whose encoding
     * fails is logged and left out of the body; previously its empty array slot caused a
     * {@code NullPointerException} while the body was assembled. The statistics are updated
     * with the number of messages actually encoded and the total body size.
     *
     * @param group consumer group, used for statistics
     * @param topic topic, used for statistics
     * @param ctx channel context used to write the response
     * @param response the response command to send
     * @param msgList messages to put into the body, or {@code null} to send no body
     */
    private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx,
            final RemotingCommand response, final List<MessageExt> msgList) {
        if (null != msgList) {
            final ByteBuffer[] msgBufferList = new ByteBuffer[msgList.size()];
            int bodyTotalSize = 0;
            int encodedCount = 0;
            for (int i = 0; i < msgList.size(); i++) {
                try {
                    final ByteBuffer encoded = messageToByteBuffer(msgList.get(i));
                    bodyTotalSize += encoded.capacity();
                    // Store only after the size is known, so a failed slot stays null.
                    msgBufferList[i] = encoded;
                    encodedCount++;
                }
                catch (Exception e) {
                    log.error("messageToByteBuffer UnsupportedEncodingException", e);
                }
            }

            final ByteBuffer body = ByteBuffer.allocate(bodyTotalSize);
            for (ByteBuffer bb : msgBufferList) {
                // Skip messages that could not be encoded above.
                if (null == bb) {
                    continue;
                }
                bb.flip();
                body.put(bb);
            }

            response.setBody(body.array());

            // Statistics
            this.filtersrvController.getFilterServerStatsManager().incGroupGetNums(group, topic,
                encodedCount);

            this.filtersrvController.getFilterServerStatsManager().incGroupGetSize(group, topic,
                bodyTotalSize);
        }

        try {
            ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        log.error("FilterServer response to " + future.channel().remoteAddress() + " failed",
                            future.cause());
                        log.error(response.toString());
                    }
                }
            });
        }
        catch (Throwable e) {
            log.error("FilterServer process request over, but response failed", e);
            log.error(response.toString());
        }
    }


    /**
     * Forwards a consumer's pull request to the Broker and answers with only those messages
     * that the filter class registered for the consumer group and topic accepts.
     * <p>
     * The pull is asynchronous: on the normal path this method returns {@code null} and the
     * reply is written from the pull callback through {@code returnResponse}.
     *
     * @param ctx channel context of the requesting consumer
     * @param request the pull request
     * @return an error response when no usable filter class is registered, otherwise
     *         {@code null}
     * @throws Exception if the request header cannot be decoded or the pull cannot be issued
     */
    private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request)
            throws Exception {
        final RemotingCommand response =
                RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        final PullMessageResponseHeader responseHeader =
                (PullMessageResponseHeader) response.readCustomHeader();
        final PullMessageRequestHeader requestHeader =
                (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

        // The reply is sent asynchronously, so the opaque must be set explicitly.
        response.setOpaque(request.getOpaque());

        final DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
        final String consumerGroup = requestHeader.getConsumerGroup();
        final String topic = requestHeader.getTopic();
        final FilterClassInfo filterClassInfo =
                this.filtersrvController.getFilterClassManager().findFilterClass(consumerGroup, topic);

        // Reject the request right away when there is no usable filter class.
        String rejectReason = null;
        if (filterClassInfo == null) {
            rejectReason = "Find Filter class failed, not registered";
        }
        else if (filterClassInfo.getMessageFilter() == null) {
            rejectReason = "Find Filter class failed, registered but no class";
        }
        if (rejectReason != null) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(rejectReason);
            return response;
        }

        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);

        // Build the parameters for pulling from the Broker.
        final MessageQueue queue = new MessageQueue();
        queue.setTopic(topic);
        queue.setQueueId(requestHeader.getQueueId());
        queue.setBrokerName(this.filtersrvController.getBrokerName());
        final long offset = requestHeader.getQueueOffset();
        final int maxNums = requestHeader.getMaxMsgNums();

        final PullCallback pullCallback = new PullCallback() {

            @Override
            public void onSuccess(PullResult pullResult) {
                responseHeader.setMaxOffset(pullResult.getMaxOffset());
                responseHeader.setMinOffset(pullResult.getMinOffset());
                responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
                response.setRemark(null);

                switch (pullResult.getPullStatus()) {
                case FOUND:
                    response.setCode(ResponseCode.SUCCESS);

                    final List<MessageExt> matched = new ArrayList<MessageExt>();
                    try {
                        for (MessageExt candidate : pullResult.getMsgFoundList()) {
                            if (filterClassInfo.getMessageFilter().match(candidate)) {
                                matched.add(candidate);
                            }
                        }

                        if (matched.isEmpty()) {
                            // Every message was filtered out: answer below without a body.
                            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                            break;
                        }

                        // At least one message passed the filter.
                        returnResponse(consumerGroup, topic, ctx, response, matched);
                    }
                    // Any exception aborts filtering and is reported to the client.
                    catch (Throwable e) {
                        final String error =
                                String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
                                    consumerGroup, topic);
                        log.error(error, e);

                        response.setCode(ResponseCode.SYSTEM_ERROR);
                        response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
                        returnResponse(consumerGroup, topic, ctx, response, null);
                    }
                    // Both the matched path and the error path have already replied.
                    return;
                case NO_MATCHED_MSG:
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    break;
                case NO_NEW_MSG:
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                    break;
                case OFFSET_ILLEGAL:
                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                    break;
                default:
                    break;
                }

                returnResponse(consumerGroup, topic, ctx, response, null);
            }


            @Override
            public void onException(Throwable e) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
                returnResponse(consumerGroup, topic, ctx, response, null);
            }
        };

        pullConsumer.pullBlockIfNotFound(queue, null, offset, maxNums, pullCallback);

        return null;
    }


    /**
     * Registers the filter class uploaded by a consumer with the filter class manager.
     *
     * @param ctx channel context of the requesting consumer
     * @param request the registration request; its body carries the Java source, which must
     *        be UTF-8 encoded
     * @return a SUCCESS response, or a SYSTEM_ERROR response whose remark describes the
     *         failure
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final RegisterMessageFilterClassRequestHeader header =
                (RegisterMessageFilterClassRequestHeader) request
                    .decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class);

        try {
            final boolean registered =
                    this.filtersrvController.getFilterClassManager().registerFilterClass(
                        header.getConsumerGroup(),
                        header.getTopic(),
                        header.getClassName(),
                        header.getClassCRC(),
                        request.getBody());
            if (!registered) {
                throw new Exception("registerFilterClass error");
            }

            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        }
        catch (Exception e) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(RemotingHelper.exceptionSimpleDesc(e));
        }

        return response;
    }