RocketMQ原始碼解讀系列——2、filtersrv原始碼
阿新 • • 發佈:2018-12-22
1. Broker 所在的機器會啟勱多個 FilterServer 過濾程序
2. Consumer 啟動之後後,會向 FilterServer 上傳一個過濾的 Java 類
3. Consumer 從 FilterServer 拉訊息,FilterServer 將請求轉發給 Broker,FilterServer 從 Broker 收到訊息後,按照
Consumer 上傳的 Java 過濾程式做過濾,過濾完成後返回給 Consumer。
filtersrv的專案結構如下:
工程結構與namesrv主體基本一直,啟動類是FileterStartup.java,核心控制類是FiltersrvController,啟動服務時同樣是呼叫的remoting工程中的NettyRemotingServer.java類,原始碼如下:
而處理各種請求訊息的執行器是DefaultRequestProcessor.java
這些基本步驟與namesrv基本一致。
Startup中的主要方法原始碼:
System.setProperty(RemotingCommand.RemotingVersionKey, Integer.toString(MQVersion.CurrentVersion)); // Socket傳送緩衝區大小 if (null == System.getProperty(NettySystemConfig.SystemPropertySocketSndbufSize)) { NettySystemConfig.SocketSndbufSize = 65535; } // Socket接收緩衝區大小 if (null == System.getProperty(NettySystemConfig.SystemPropertySocketRcvbufSize)) { NettySystemConfig.SocketRcvbufSize = 1024; } try { // 檢測包衝突 PackageConflictDetect.detectFastjson(); // 解析命令列 Options options = ServerUtil.buildCommandlineOptions(new Options()); final CommandLine commandLine = ServerUtil.parseCmdLine("mqfiltersrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } // 初始化配置檔案 final FiltersrvConfig filtersrvConfig = new FiltersrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); Properties properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, filtersrvConfig); System.out.println("load config properties file OK, " + file); in.close(); String port = properties.getProperty("listenPort"); if (port != null) { filtersrvConfig.setConnectWhichBroker(String.format("127.0.0.1:%s", port)); } } } // 強制設定為0,自動分配埠號 nettyServerConfig.setListenPort(0); nettyServerConfig.setServerAsyncSemaphoreValue(filtersrvConfig.getFsServerAsyncSemaphoreValue()); nettyServerConfig.setServerCallbackExecutorThreads(filtersrvConfig .getFsServerCallbackExecutorThreads()); nettyServerConfig.setServerWorkerThreads(filtersrvConfig.getFsServerWorkerThreads()); // 列印預設配置 if (commandLine.hasOption('p')) { MixAll.printObjectProperties(null, filtersrvConfig); MixAll.printObjectProperties(null, nettyServerConfig); System.exit(0); } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), filtersrvConfig); if (null == filtersrvConfig.getRocketmqHome()) { System.out.println("Please set the " + MixAll.ROCKETMQ_HOME_ENV + " variable in your environment to match the location of the RocketMQ installation"); System.exit(-2); } // 初始化Logback LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(filtersrvConfig.getRocketmqHome() + "/conf/logback_filtersrv.xml"); log = LoggerFactory.getLogger(LoggerName.FiltersrvLoggerName); // 初始化服務控制物件 final FiltersrvController controller = new FiltersrvController(filtersrvConfig, nettyServerConfig); boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; private AtomicInteger shutdownTimes = new AtomicInteger(0); @Override public void run() { synchronized (this) { log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet()); if (!this.hasShutdown) { this.hasShutdown = true; long begineTime = System.currentTimeMillis(); controller.shutdown(); long consumingTimeTotal = System.currentTimeMillis() - begineTime; log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal); } } } }, "ShutdownHook")); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null;
FiltersrvController中主要的屬性和方法:
/** * Filter Server配置 */ private final FiltersrvConfig filtersrvConfig; /** * 通訊層配置 */ private final NettyServerConfig nettyServerConfig; /** * 服務端通訊層物件 */ private RemotingServer remotingServer; /** * 服務端網路請求處理執行緒池 */ private ExecutorService remotingExecutor; /** * 過濾器類的快取類--在記憶體中儲存了過濾器類 */ private final FilterClassManager filterClassManager; /** * 訪問Broker的api封裝 */ private final FilterServerOuterAPI filterServerOuterAPI = new FilterServerOuterAPI(); private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer( MixAll.FILTERSRV_CONSUMER_GROUP); private volatile String brokerName = null; // 定時執行緒 private final ScheduledExecutorService scheduledExecutorService = Executors .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread")); /** * FilterServer的執行快照管理---當前版本start方法的方法體為空 */ private final FilterServerStatsManager filterServerStatsManager = new FilterServerStatsManager(); public FiltersrvController(FiltersrvConfig filtersrvConfig, NettyServerConfig nettyServerConfig) { this.filtersrvConfig = filtersrvConfig; this.nettyServerConfig = nettyServerConfig; this.filterClassManager = new FilterClassManager(this); } public boolean initialize() { // 列印伺服器配置引數 MixAll.printObjectProperties(log, this.filtersrvConfig); // 初始化通訊層 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig); // 初始化執行緒池 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); this.registerProcessor(); // 定時向Broker註冊自己 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { FiltersrvController.this.registerFilterServerToBroker(); } }, 3, 10, TimeUnit.SECONDS); // 初始化PullConsumer引數,要比預設引數小。 this.defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(this.defaultMQPullConsumer .getBrokerSuspendMaxTimeMillis() - 1000); this.defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(this.defaultMQPullConsumer .getConsumerTimeoutMillisWhenSuspend() - 1000); this.defaultMQPullConsumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr()); this.defaultMQPullConsumer.setInstanceName(String.valueOf(UtilAll.getPid())); return true; } public String localAddr() { return String.format("%s:%d", this.filtersrvConfig.getFilterServerIP(), this.remotingServer.localListenPort()); } /** * 註冊過濾服務到broker * * @author: yangcheng */ public void registerFilterServerToBroker() { try { RegisterFilterServerResponseHeader responseHeader = this.filterServerOuterAPI.registerFilterServerToBroker( this.filtersrvConfig.getConnectWhichBroker(), this.localAddr()); this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper() .setDefaultBrokerId(responseHeader.getBrokerId()); if (null == this.brokerName) { this.brokerName = responseHeader.getBrokerName(); } log.info("register filter server<{}> to broker<{}> OK, Return: {} {}", // this.localAddr(),// this.filtersrvConfig.getConnectWhichBroker(),// responseHeader.getBrokerName(),// responseHeader.getBrokerId()// ); } catch (Exception e) { log.warn("register filter server Exception", e); // 如果失敗,嘗試自殺 log.warn("access broker failed, kill oneself"); System.exit(-1); } } private void registerProcessor() { this.remotingServer .registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); } public void start() throws Exception { this.defaultMQPullConsumer.start(); this.remotingServer.start(); this.filterServerOuterAPI.start(); this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper() .setConnectBrokerByUser(true); this.filterClassManager.start(); this.filterServerStatsManager.start(); }
DefaultRequestProcessor
/**
* 引入Controller
*/
private final FiltersrvController filtersrvController;
public DefaultRequestProcessor(FiltersrvController filtersrvController) {
this.filtersrvController = filtersrvController;
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws Exception {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}",//
request.getCode(), //
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
request);
}
switch (request.getCode()) {
/**
* 向Filter Server註冊Class---Consumer發起
*/
case RequestCode.REGISTER_MESSAGE_FILTER_CLASS:
return registerMessageFilterClass(ctx, request);
/**
* 訂閱訊息
*/
case RequestCode.PULL_MESSAGE:
return pullMessageForward(ctx, request);
}
return null;
}
/**
* 向consumer傳送訊息
* @param group
* @param topic
* @param ctx
* @param response
* @param msgList
* @author: yangcheng
*/
private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx,
final RemotingCommand response, final List<MessageExt> msgList) {
if (null != msgList) {
ByteBuffer[] msgBufferList = new ByteBuffer[msgList.size()];
int bodyTotalSize = 0;
for (int i = 0; i < msgList.size(); i++) {
try {
msgBufferList[i] = messageToByteBuffer(msgList.get(i));
bodyTotalSize += msgBufferList[i].capacity();
}
catch (Exception e) {
log.error("messageToByteBuffer UnsupportedEncodingException", e);
}
}
ByteBuffer body = ByteBuffer.allocate(bodyTotalSize);
for (ByteBuffer bb : msgBufferList) {
bb.flip();
body.put(bb);
}
response.setBody(body.array());
// 統計
this.filtersrvController.getFilterServerStatsManager().incGroupGetNums(group, topic,
msgList.size());
this.filtersrvController.getFilterServerStatsManager().incGroupGetSize(group, topic,
bodyTotalSize);
}
try {
ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
log.error("FilterServer response to " + future.channel().remoteAddress() + " failed",
future.cause());
log.error(response.toString());
}
}
});
}
catch (Throwable e) {
log.error("FilterServer process request over, but response failed", e);
log.error(response.toString());
}
}
private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request)
throws Exception {
final RemotingCommand response =
RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
// 由於非同步返回,所以必須要設定
response.setOpaque(request.getOpaque());
DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
final FilterClassInfo findFilterClass =
this.filtersrvController.getFilterClassManager().findFilterClass(
requestHeader.getConsumerGroup(), requestHeader.getTopic());
if (null == findFilterClass) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Find Filter class failed, not registered");
return response;
}
if (null == findFilterClass.getMessageFilter()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Find Filter class failed, registered but no class");
return response;
}
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
// 構造從Broker拉訊息的引數
MessageQueue mq = new MessageQueue();
mq.setTopic(requestHeader.getTopic());
mq.setQueueId(requestHeader.getQueueId());
mq.setBrokerName(this.filtersrvController.getBrokerName());
long offset = requestHeader.getQueueOffset();
int maxNums = requestHeader.getMaxMsgNums();
final PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
responseHeader.setMaxOffset(pullResult.getMaxOffset());
responseHeader.setMinOffset(pullResult.getMinOffset());
responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
response.setRemark(null);
switch (pullResult.getPullStatus()) {
case FOUND:
response.setCode(ResponseCode.SUCCESS);
List<MessageExt> msgListOK = new ArrayList<MessageExt>();
try {
for (MessageExt msg : pullResult.getMsgFoundList()) {
boolean match = findFilterClass.getMessageFilter().match(msg);
if (match) {
msgListOK.add(msg);
}
}
// 有訊息返回
if (!msgListOK.isEmpty()) {
returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx,
response, msgListOK);
return;
}
// 全部都被過濾掉了
else {
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
}
}
// 只要拋異常,就終止過濾,並返回客戶端異常
catch (Throwable e) {
final String error =
String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
requestHeader.getConsumerGroup(), requestHeader.getTopic());
log.error(error, e);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx,
response, null);
return;
}
break;
case NO_MATCHED_MSG:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case NO_NEW_MSG:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_ILLEGAL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
break;
default:
break;
}
returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response,
null);
}
@Override
public void onException(Throwable e) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response,
null);
return;
}
};
pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);
return null;
}
private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final RegisterMessageFilterClassRequestHeader requestHeader =
(RegisterMessageFilterClassRequestHeader) request
.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class);
try {
boolean ok =
this.filtersrvController.getFilterClassManager().registerFilterClass(
requestHeader.getConsumerGroup(),//
requestHeader.getTopic(),//
requestHeader.getClassName(),//
requestHeader.getClassCRC(), //
request.getBody());// Body傳輸的是Java Source,必須UTF-8編碼
if (!ok) {
throw new Exception("registerFilterClass error");
}
}
catch (Exception e) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(RemotingHelper.exceptionSimpleDesc(e));
return response;
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}