RocketMQ source code analysis (1): client module, producer startup
Scope: from constructing a DefaultMQProducer through DefaultMQProducer.start().
I. Test code
The test program:
import java.text.SimpleDateFormat;
import java.util.Date;
// RocketMQ imports (DefaultMQProducer, Message, SendResult, ...) are omitted here;
// their package prefix depends on the client version in use.

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 1. instantiate
        producer.setNamesrvAddr("101.200.143.74:9876;123.56.70.138:9876"); // 2. set the name server addresses
        producer.start(); // 3. start the producer
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        String currentTime = df.format(new Date());
        for (int i = 0; i < 10000000; i++) {
            try {
                Thread.sleep(1000);
                Message msg = new Message("topic-1", // topic
                        currentTime.getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}
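This part covers the path from constructing a DefaultMQProducer through DefaultMQProducer.start().
II. Sequence diagram
The two most important objects are DefaultMQProducerImpl and mQClientFactory (an instance of class MQClientInstance).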
0. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName") instantiates the producer and ties DefaultMQProducer to DefaultMQProducerImpl, so that each holds a reference to the other.
This calls:
public DefaultMQProducer(final String producerGroup) {
    this(producerGroup, null);
}
which in turn calls:
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
defaultMQProducerImpl is a field of DefaultMQProducer, declared final and transient (so it is skipped by serialization). The constructor passes the DefaultMQProducer instance (this) to DefaultMQProducerImpl, which is how the two classes come to reference each other.
Inside DefaultMQProducerImpl:
Field: private final DefaultMQProducer defaultMQProducer;
The constructor called above:
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;
}
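The mutual reference described above can be illustrated with a small stand-alone sketch. The class names Facade and FacadeImpl are made up for illustration and merely stand in for DefaultMQProducer and DefaultMQProducerImpl; this is not the RocketMQ code itself.

```java
// Hypothetical stand-in for DefaultMQProducerImpl.
final class FacadeImpl {
    private final Facade owner; // back-reference to the public-facing object

    FacadeImpl(Facade owner) {
        this.owner = owner;
    }

    String describe() {
        // The impl reads configuration from its owner on demand.
        return "impl for group " + owner.getGroup();
    }
}

// Hypothetical stand-in for DefaultMQProducer.
class Facade {
    private final String group;
    // final + transient: assigned once and skipped by Java serialization.
    private final transient FacadeImpl impl;

    Facade(String group) {
        this.group = group;
        this.impl = new FacadeImpl(this); // hand "this" to the impl
    }

    String getGroup() {
        return group;
    }

    String describe() {
        return impl.describe(); // the public API delegates to the impl
    }
}

class MutualReferenceDemo {
    public static void main(String[] args) {
        System.out.println(new Facade("ProducerGroupName").describe());
    }
}
```

The user-facing class stays a thin configuration holder, while the impl class carries the logic and reads settings back through its owner reference.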
1. DefaultMQProducer.start(): DefaultMQProducer delegates to DefaultMQProducerImpl.start().
2. DefaultMQProducerImpl.start(): starts the producer, validating its configuration along the way.
1) DefaultMQProducer.start() calls this.defaultMQProducerImpl.start(); // defaultMQProducerImpl is the final, transient field of DefaultMQProducer
2) That no-argument start() calls the overload start(true):
public void start() throws MQClientException {
    this.start(true);
}
3) Walking through start(true):
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            this.checkConfig();
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // Register this producer with mQClientFactory (an instance of MQClientInstance).
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            // topicPublishInfoTable is the ConcurrentHashMap from topic to its publish info (TopicPublishInfo).
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
            if (startFactory) {
                mQClientFactory.start(); // the MQ client starts working
            }
            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "//
                + this.serviceState//
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
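The state handling in start(true) follows a common "assume failure first" pattern. Below is a minimal sketch of just that pattern. LifecycleSketch, SketchState and the failDuringStart switch are hypothetical names, and the real method does far more work between the two state changes.

```java
enum SketchState { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

class LifecycleSketch {
    private SketchState state = SketchState.CREATE_JUST;
    private final boolean failDuringStart; // hypothetical switch that simulates a startup error

    LifecycleSketch(boolean failDuringStart) {
        this.failDuringStart = failDuringStart;
    }

    SketchState getState() {
        return state;
    }

    void start() {
        switch (state) {
            case CREATE_JUST:
                // Pessimistically mark the start as failed; an exception
                // thrown below leaves the object in START_FAILED.
                state = SketchState.START_FAILED;
                if (failDuringStart) {
                    throw new IllegalStateException("simulated startup failure");
                }
                state = SketchState.RUNNING; // reached only if every step succeeded
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                // Any second call to start() is rejected.
                throw new IllegalStateException("service state not OK: " + state);
            default:
                break;
        }
    }
}

class LifecycleDemo {
    public static void main(String[] args) {
        LifecycleSketch service = new LifecycleSketch(false);
        service.start();
        System.out.println(service.getState());
    }
}
```

One difference worth noting: in the real method, a failed registerProducer() resets the state to CREATE_JUST before throwing, so that particular failure can be retried with another group name.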
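The field is initialized as private ServiceState serviceState = ServiceState.CREATE_JUST, so the switch enters the CREATE_JUST branch, where the state is immediately set to ServiceState.START_FAILED. It only becomes RUNNING at the end of the branch, so an exception thrown midway leaves the producer in START_FAILED.
Next comes this.checkConfig():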
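Validators.checkGroup(this.defaultMQProducer.getProducerGroup()); first validates the ProducerGroup name: it must not be blank, it must match the allowed-character regular expression, and it must not exceed 255 characters.
The name also must not be the built-in default producer group name "DEFAULT_PRODUCER". The other built-in name, "CLIENT_INNER_PRODUCER", is reserved for the client's internal producer; as start(true) shows, that producer goes through the same checkConfig() and only skips changeInstanceNameToPID().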
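The three group-name rules (not blank, matches a pattern, at most 255 characters) can be sketched as follows. GroupNameCheck is a hypothetical class, and the regular expression is an assumption about what the Validators class uses; check the Validators source of your RocketMQ version for the exact pattern.

```java
import java.util.regex.Pattern;

class GroupNameCheck {
    // Assumed pattern: letters, digits, '%', '|', '_' and '-'.
    private static final Pattern VALID = Pattern.compile("^[%|a-zA-Z0-9_-]+$");
    private static final int MAX_LENGTH = 255;

    static void checkGroup(String group) {
        if (group == null || group.trim().isEmpty()) {
            throw new IllegalArgumentException("the specified group is blank");
        }
        if (!VALID.matcher(group).matches()) {
            throw new IllegalArgumentException("the specified group contains illegal characters: " + group);
        }
        if (group.length() > MAX_LENGTH) {
            throw new IllegalArgumentException("the specified group is longer than " + MAX_LENGTH);
        }
    }

    // Convenience wrapper that turns the exception into a boolean.
    static boolean isValid(String group) {
        try {
            checkGroup(group);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}

class GroupNameCheckDemo {
    public static void main(String[] args) {
        System.out.println(GroupNameCheck.isValid("ProducerGroupName"));
        System.out.println(GroupNameCheck.isValid("bad group!"));
    }
}
```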
this.defaultMQProducer.changeInstanceNameToPID():
Changes the instance name of the current producer client to the PID.
ClientConfig is the parent class of DefaultMQProducer, and changeInstanceNameToPID() is a method of ClientConfig.
It obtains the JVM process id and uses it as the producer's instanceName.
String clientId = clientConfig.buildMQClientId();
Assigns the client an id of the form: client IP@JVM process id.
The MQClientInstance is then looked up in factoryTable by clientId:
MQClientInstance instance = this.factoryTable.get(clientId);
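The two steps above (taking the JVM PID as the instance name, then joining it with the client IP) can be sketched as follows. This is a simplified illustration: ClientIdSketch, currentPid and buildClientId are made-up names, the IP address in main is a placeholder, and the real buildMQClientId() may append further fields depending on the version.

```java
import java.lang.management.ManagementFactory;

class ClientIdSketch {
    // Reads the PID from the runtime name, which conventionally looks
    // like "12345@hostname". Returns -1 if the name has another shape.
    static int currentPid() {
        String name = ManagementFactory.getRuntimeMXBean().getName();
        try {
            return Integer.parseInt(name.substring(0, name.indexOf('@')));
        } catch (Exception e) {
            return -1;
        }
    }

    // Joins client IP and instance name into "ip@instanceName".
    static String buildClientId(String clientIp, String instanceName) {
        return clientIp + "@" + instanceName;
    }
}

class ClientIdDemo {
    public static void main(String[] args) {
        String instanceName = String.valueOf(ClientIdSketch.currentPid());
        System.out.println(ClientIdSketch.buildClientId("192.168.0.10", instanceName));
    }
}
```

Because the id is derived from IP and PID, every producer created in the same JVM with default settings ends up with the same clientId.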
3.1 MQClientManager.getInstance() returns the MQClientManager instance:
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
3.2 MQClientManager.getAndCreateMQClientInstance creates mQClientFactory, an instance of class MQClientInstance.
This is where an MQClientInstance is instantiated.
Method body:
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
        } else {
            // TODO log
        }
    }
    return instance;
}
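The get, create, putIfAbsent sequence in this method is a standard way to build a per-key singleton on a ConcurrentHashMap without locking. A minimal sketch follows, with a hypothetical InstanceRegistry class and a plain Object standing in for MQClientInstance.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class InstanceRegistry {
    private final ConcurrentMap<String, Object> table = new ConcurrentHashMap<>();

    Object getOrCreate(String clientId) {
        Object instance = table.get(clientId);
        if (instance == null) {
            instance = new Object(); // stand-in for new MQClientInstance(...)
            // putIfAbsent is atomic: if another thread registered an
            // instance first, it returns that one and ours is discarded.
            Object prev = table.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
            }
        }
        return instance;
    }
}

class InstanceRegistryDemo {
    public static void main(String[] args) {
        InstanceRegistry registry = new InstanceRegistry();
        Object a = registry.getOrCreate("192.168.0.10@4321");
        Object b = registry.getOrCreate("192.168.0.10@4321");
        System.out.println(a == b); // same clientId, same shared instance
    }
}
```

The consequence for RocketMQ is that clients resolving to the same clientId share one MQClientInstance.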
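Within that method, the instantiation itself is this statement:
instance = new MQClientInstance(clientConfig.cloneClientConfig(),
        this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
The MQClientInstance constructor (it initializes all the components that MQClientInstance holds):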
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    this.clientConfig = clientConfig; // the cloned clientConfig is stored on the MQClientInstance
    this.instanceIndex = instanceIndex; // the index of this instance
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
    // communication service between the client and the name server
    if (this.clientConfig.getNamesrvAddr() != null) {
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }
    this.clientId = clientId;
    this.mQAdminImpl = new MQAdminImpl(this); // admin service
    this.pullMessageService = new PullMessageService(this); // pull service
    this.rebalanceService = new RebalanceService(this); // rebalance (load-balancing) service
    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);
    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
    log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}", //
        this.instanceIndex, //
        this.clientId, //
        this.clientConfig, //
        MQVersion.getVersionDesc(MQVersion.CurrentVersion), RemotingCommand.getSerializeTypeConfigInThisServer());
}
Updating the name server addresses:
Class MQClientAPIImpl:
public void updateNameServerAddressList(final String addrs) {
    List<String> lst = new ArrayList<String>();
    String[] addrArray = addrs.split(";");
    if (addrArray != null) {
        for (String addr : addrArray) {
            lst.add(addr);
        }
        // update the name server address list held by the remoting client
        this.remotingClient.updateNameServerAddressList(lst);
    }
}
3.3 mQClientFactory.registerProducer():
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
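4.1 mQClientFactory.start():
mQClientFactory starts all of its tasks.
4.2 this.mQClientAPIImpl.start();
Starts the communication service between the client and the name server.
4.3 this.startScheduledTask();
Starts all the scheduled tasks.
4.4 this.pullMessageService.start();
Starts the pull service.
4.5 this.rebalanceService.start();
Starts the rebalance (load-balancing) service.
4.6 this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
Starts the push (send) service, that is, the client's internal producer created in the MQClientInstance constructor. Passing false keeps it from calling mQClientFactory.start() again.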
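The role of the startFactory flag in step 4.6 can be seen in a reduced sketch. FactorySketch and ProducerSketch are hypothetical stand-ins for MQClientInstance and DefaultMQProducerImpl, and the boolean started flag is an assumption that replaces the real service-state switch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MQClientInstance.
class FactorySketch {
    final List<String> events = new ArrayList<>();
    private boolean started; // assumption: a plain flag instead of the real state machine
    private final ProducerSketch innerProducer = new ProducerSketch("CLIENT_INNER_PRODUCER", this);

    void start() {
        if (started) {
            return;
        }
        started = true;
        events.add("factory services started");
        // false: the internal producer must not start the factory again.
        innerProducer.start(false);
    }
}

// Hypothetical stand-in for DefaultMQProducerImpl.
class ProducerSketch {
    private final String group;
    private final FactorySketch factory;

    ProducerSketch(String group, FactorySketch factory) {
        this.group = group;
        this.factory = factory;
    }

    void start(boolean startFactory) {
        if (startFactory) {
            factory.start();
        }
        factory.events.add(group + " started");
    }
}

class StartOrderDemo {
    public static void main(String[] args) {
        FactorySketch factory = new FactorySketch();
        new ProducerSketch("ProducerGroupName", factory).start(true);
        for (String event : factory.events) {
            System.out.println(event);
        }
    }
}
```

Running main prints the factory line first, then the internal producer, then the user's producer, which mirrors the order in which start(true) reaches RUNNING.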