
RocketMQ source code analysis (1): the client module, producer startup

Scope: from constructing a DefaultMQProducer to calling DefaultMQProducer.start().
I. Test code
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {

    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 1. instantiate
    producer.setNamesrvAddr("101.200.143.74:9876;123.56.70.138:9876"); // 2. set the name server addresses
    producer.start(); // 3. start the producer
    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
    String currentTime=df.format(new Date());

    for (int i = 0; i < 10000000; i++)
        try {
                Thread.sleep(1000);
                Message msg = new Message("topic-1",// topic
                        currentTime.getBytes(RemotingHelper.DEFAULT_CHARSET));// body
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);

        } catch (Exception e) {
            e.printStackTrace();
        }

    producer.shutdown();
}

}
This section covers the path from constructing the DefaultMQProducer to DefaultMQProducer.start().

II. Sequence diagram
The two most important objects are DefaultMQProducerImpl and mQClientFactory (an instance of class MQClientInstance).

0.
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName") instantiates the producer and links DefaultMQProducer with DefaultMQProducerImpl: each holds a reference to the other.
This calls:
public DefaultMQProducer(final String producerGroup) {
    this(producerGroup, null);
}
which delegates to:
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
defaultMQProducerImpl is a field of DefaultMQProducer, declared final and transient (so it is not serialized). The constructor passes the DefaultMQProducer instance itself to DefaultMQProducerImpl, which gives the two classes references to each other.
Inside DefaultMQProducerImpl:
Field: private final DefaultMQProducer defaultMQProducer;

The constructor called above:
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;
}
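The mutual reference between the two classes can be illustrated without RocketMQ on the classpath. The following is only a minimal sketch: Facade and FacadeImpl are hypothetical stand-ins for DefaultMQProducer and DefaultMQProducerImpl, and making Facade Serializable is an assumption added here so that the transient modifier has a visible purpose.

```java
import java.io.Serializable;

// Hypothetical stand-ins for DefaultMQProducer / DefaultMQProducerImpl.
public class MutualReferenceSketch {

    static class Facade implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String group;
        // final: assigned exactly once in the constructor.
        // transient: skipped by Java serialization (FacadeImpl is not Serializable).
        private final transient FacadeImpl impl;

        Facade(String group) {
            this.group = group;
            this.impl = new FacadeImpl(this); // hand "this" to the implementation object
        }

        String getGroup() {
            return group;
        }

        FacadeImpl getImpl() {
            return impl;
        }
    }

    static class FacadeImpl {
        private final Facade facade; // back-reference to the owning facade

        FacadeImpl(Facade facade) {
            this.facade = facade;
        }

        Facade getFacade() {
            return facade;
        }
    }

    public static void main(String[] args) {
        Facade facade = new Facade("ProducerGroupName");
        // Following the references in both directions leads back to the same object.
        System.out.println(facade.getImpl().getFacade() == facade); // prints "true"
    }
}
```

The facade stays a thin, user-facing configuration object, while the implementation object can read that configuration back through its reference.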

1. defaultMQProducer.start(): DefaultMQProducer delegates to DefaultMQProducerImpl.start().

2. DefaultMQProducerImpl.start(): starts the producer and validates its configuration.
1) DefaultMQProducer calls this.defaultMQProducerImpl.start(); (defaultMQProducerImpl is the final, transient field of DefaultMQProducer).

2) That method calls the overload start(true):
public void start() throws MQClientException {
    this.start(true);
}
3) Walking through start(boolean startFactory), called here with true:

   public void start(final boolean startFactory) throws MQClientException {
     switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            this.checkConfig();
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            // mQClientFactory (an MQClientInstance) registers this producer under its group name.
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
            }

            // ConcurrentHashMap that maps each topic to its publish info (TopicPublishInfo).
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            if (startFactory) {
                mQClientFactory.start(); // the MQ client instance starts working
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "//
                    + this.serviceState//
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
        default:
            break;
    }

    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
} 

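The field is initialized as private ServiceState serviceState = ServiceState.CREATE_JUST, so the first call to start() enters the CREATE_JUST branch of the switch. The state is set to ServiceState.START_FAILED right away and only becomes RUNNING after every step has succeeded, so an exception partway through leaves the producer marked as failed (a duplicate group name resets it to CREATE_JUST instead).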
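The state handling described above can be reduced to a small state machine. This is a sketch under assumptions, not RocketMQ code: the Service class and its failDuringInit flag are hypothetical, and IllegalStateException stands in for MQClientException.

```java
public class StartStateSketch {

    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    static class Service {
        private ServiceState state = ServiceState.CREATE_JUST;
        // Hypothetical switch used only to simulate a failing initialization step.
        private final boolean failDuringInit;

        Service(boolean failDuringInit) {
            this.failDuringInit = failDuringInit;
        }

        void start() {
            switch (state) {
                case CREATE_JUST:
                    // Mark the start as failed first; an exception below leaves it that way.
                    state = ServiceState.START_FAILED;
                    if (failDuringInit) {
                        throw new IllegalStateException("initialization failed");
                    }
                    // Every step succeeded, so the service is now running.
                    state = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new IllegalStateException("service state not OK, maybe started once: " + state);
                default:
                    break;
            }
        }

        ServiceState getState() {
            return state;
        }
    }

    public static void main(String[] args) {
        Service ok = new Service(false);
        ok.start();
        System.out.println(ok.getState()); // prints "RUNNING"

        Service broken = new Service(true);
        try {
            broken.start();
        } catch (IllegalStateException e) {
            System.out.println(broken.getState()); // prints "START_FAILED"
        }
    }
}
```

A second call to start() on either object throws, which matches the "maybe started once" message in the listing above.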
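Next comes this.checkConfig():
Validators.checkGroup(this.defaultMQProducer.getProducerGroup()); validates the producer group name first: it must be non-blank, match the allowed-character regular expression, and be at most 255 characters long.
The group name must also not be "DEFAULT_PRODUCER", which is reserved. "CLIENT_INNER_PRODUCER" is likewise a built-in group name, used by the client's own internal producer; as the listing above shows, start() skips changeInstanceNameToPID() for that group.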
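The checks listed above can be sketched as a standalone validator. The regular expression below is an assumption about what "allowed characters" means (letters, digits, and the characters % | _ -), and IllegalArgumentException stands in for MQClientException; only the 255-character limit and the reserved name come from the text.

```java
import java.util.regex.Pattern;

public class GroupNameCheckSketch {

    // Assumed character whitelist; the real pattern may differ between versions.
    private static final Pattern VALID = Pattern.compile("^[%|a-zA-Z0-9_-]+$");
    private static final int MAX_LENGTH = 255;
    private static final String RESERVED_GROUP = "DEFAULT_PRODUCER";

    static void checkGroup(String group) {
        if (group == null || group.trim().isEmpty()) {
            throw new IllegalArgumentException("the specified group is blank");
        }
        if (!VALID.matcher(group).matches()) {
            throw new IllegalArgumentException("the specified group contains illegal characters: " + group);
        }
        if (group.length() > MAX_LENGTH) {
            throw new IllegalArgumentException("the specified group is longer than " + MAX_LENGTH);
        }
        if (RESERVED_GROUP.equals(group)) {
            throw new IllegalArgumentException("producerGroup can not equal " + RESERVED_GROUP);
        }
    }

    // Convenience wrapper that turns the exception into a boolean.
    static boolean isValid(String group) {
        try {
            checkGroup(group);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("ProducerGroupName")); // prints "true"
        System.out.println(isValid("bad group"));         // prints "false" (space is not allowed)
        System.out.println(isValid("DEFAULT_PRODUCER"));  // prints "false" (reserved)
    }
}
```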
this.defaultMQProducer.changeInstanceNameToPID():
Changes this producer's client instance name to the process id.
ClientConfig is the parent class of DefaultMQProducer, and changeInstanceNameToPID() is a ClientConfig method.
It takes the JVM process id and uses it as the producer's instanceName.
String clientId = clientConfig.buildMQClientId();
Assigns the client an id of the form: client IP@JVM process id.
The MQClientInstance is then looked up in factoryTable by that clientId:
MQClientInstance instance = this.factoryTable.get(clientId);
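The naming rule above can be sketched in plain Java. Everything here is illustrative: the helper names are hypothetical, the "DEFAULT" placeholder for an unset instance name is an assumption, and reading the pid from the RuntimeMXBean name relies on the common "pid@hostname" convention rather than a guaranteed format.

```java
import java.lang.management.ManagementFactory;

public class ClientIdSketch {

    // Reads the JVM process id from the RuntimeMXBean name ("pid@hostname" on common JVMs).
    static String currentPid() {
        String name = ManagementFactory.getRuntimeMXBean().getName();
        int at = name.indexOf('@');
        return at > 0 ? name.substring(0, at) : name;
    }

    // Assumption: only an instance name still set to the "DEFAULT" placeholder is replaced by the pid.
    static String instanceNameOrPid(String instanceName) {
        return "DEFAULT".equals(instanceName) ? currentPid() : instanceName;
    }

    // Client id of the form "clientIP@instanceName".
    static String buildClientId(String clientIp, String instanceName) {
        return clientIp + "@" + instanceName;
    }

    public static void main(String[] args) {
        String instanceName = instanceNameOrPid("DEFAULT");
        // 192.0.2.10 is a documentation-only address used as a placeholder.
        System.out.println(buildClientId("192.0.2.10", instanceName));
    }
}
```

Because the id depends only on the IP and the instance name, two producers created in the same JVM with default settings end up with the same clientId, which is why the lookup in factoryTable can return an existing instance.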
3.1 MQClientManager.getInstance() obtains the MQClientManager instance:
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

3.2 MQClientManager.getAndCreateMQClientInstance creates mQClientFactory, an instance of class MQClientInstance:
This is where an MQClientInstance is instantiated.
Method body:
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
        } else {
            // TODO log
        }
    }
    return instance;
}
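The get-then-putIfAbsent idiom in this method can be tried in isolation. The sketch below is a generic illustration with a hypothetical Instance class; it only relies on java.util.concurrent and does not reproduce MQClientManager itself.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class GetOrCreateSketch {

    // Hypothetical stand-in for MQClientInstance.
    static class Instance {
        final int index;
        final String clientId;

        Instance(int index, String clientId) {
            this.index = index;
            this.clientId = clientId;
        }
    }

    private final ConcurrentMap<String, Instance> table = new ConcurrentHashMap<>();
    private final AtomicInteger indexGenerator = new AtomicInteger();

    Instance getAndCreate(String clientId) {
        Instance instance = table.get(clientId);
        if (instance == null) {
            instance = new Instance(indexGenerator.getAndIncrement(), clientId);
            // putIfAbsent returns the existing value if another thread inserted first.
            Instance prev = table.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev; // lost the race: discard ours and use the winner's
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        GetOrCreateSketch manager = new GetOrCreateSketch();
        Instance a = manager.getAndCreate("192.0.2.10@1234");
        Instance b = manager.getAndCreate("192.0.2.10@1234");
        System.out.println(a == b); // prints "true"
    }
}
```

Under contention an extra Instance may be constructed and thrown away, but every caller still receives the same object per clientId without taking a lock.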

instance = new MQClientInstance(clientConfig.cloneClientConfig(),
    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
The MQClientInstance constructor (it initializes all of the components that MQClientInstance holds):
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    this.clientConfig = clientConfig; // the cloned clientConfig is stored on the MQClientInstance
    this.instanceIndex = instanceIndex; // index of this instance
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
    // communication service between the client and the name server
    if (this.clientConfig.getNamesrvAddr() != null) {
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }

    this.clientId = clientId;

    this.mQAdminImpl = new MQAdminImpl(this); // admin service

    this.pullMessageService = new PullMessageService(this); // pull service

    this.rebalanceService = new RebalanceService(this); // rebalance (load balancing) service

    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);

    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}", //
        this.instanceIndex, //
        this.clientId, //
        this.clientConfig, //
        MQVersion.getVersionDesc(MQVersion.CurrentVersion), RemotingCommand.getSerializeTypeConfigInThisServer());

}
Updating the name server addresses, through the updateNameServerAddressList method called on mQClientAPIImpl above:
public void updateNameServerAddressList(final String addrs) {
    List<String> lst = new ArrayList<String>();
    String[] addrArray = addrs.split(";");
    if (addrArray != null) {
        for (String addr : addrArray) {
            lst.add(addr);
        }
        this.remotingClient.updateNameServerAddressList(lst); // update the NameServerAddressList of the client used for communication
    }
}
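The address parsing can be exercised on its own with the string from the test code. This is a sketch, not the library method: the parse helper is hypothetical, and trimming entries and skipping empty ones are additions made here for robustness.

```java
import java.util.ArrayList;
import java.util.List;

public class NamesrvAddrSketch {

    // Splits a ";"-separated address string into a list of "host:port" entries.
    static List<String> parse(String addrs) {
        List<String> result = new ArrayList<>();
        for (String addr : addrs.split(";")) {
            String trimmed = addr.trim(); // addition: tolerate spaces around entries
            if (!trimmed.isEmpty()) {     // addition: skip empty entries
                result.add(trimmed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> list = parse("101.200.143.74:9876;123.56.70.138:9876");
        System.out.println(list.size()); // prints "2"
        System.out.println(list.get(0)); // prints "101.200.143.74:9876"
    }
}
```

Note that String.split never returns null, so a null check on its result is purely defensive.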
3.3 mQClientFactory.registerProducer():

   boolean registerOK =  mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

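4.1 mQClientFactory.start():

 mQClientFactory starts all of its tasks.

4.2 this.mQClientAPIImpl.start();
Starts the communication service between the client and the name server.
4.3 this.startScheduledTask();
Starts all scheduled tasks.
4.4 this.pullMessageService.start();
Starts the pull service.
4.5 this.rebalanceService.start();
Starts the rebalance service.
4.6 this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
Starts the client's internal producer (the CLIENT_INNER_PRODUCER instance created in the MQClientInstance constructor). Passing false for startFactory keeps it from calling mQClientFactory.start() again.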
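The startup order in 4.2 to 4.6, and the reason the internal producer is started with false, can be modelled with two small classes. This is a simplified sketch: Factory and Producer are hypothetical stand-ins, each service is reduced to a recorded step name, and locking and state checks are left out.

```java
import java.util.ArrayList;
import java.util.List;

public class StartOrderSketch {

    // Hypothetical stand-in for MQClientInstance.
    static class Factory {
        final List<String> steps = new ArrayList<>();
        // Internal producer owned by the factory itself.
        final Producer innerProducer = new Producer("CLIENT_INNER_PRODUCER", this);

        void start() {
            steps.add("remoting");           // 4.2 communication service
            steps.add("scheduledTasks");     // 4.3 scheduled tasks
            steps.add("pullMessageService"); // 4.4 pull service
            steps.add("rebalanceService");   // 4.5 rebalance service
            // 4.6: false means "do not start the factory again",
            // otherwise start() would recurse into itself without end.
            innerProducer.start(false);
        }
    }

    // Hypothetical stand-in for DefaultMQProducerImpl.
    static class Producer {
        final String group;
        final Factory factory;

        Producer(String group, Factory factory) {
            this.group = group;
            this.factory = factory;
        }

        void start(boolean startFactory) {
            factory.steps.add("producer:" + group);
            if (startFactory) {
                factory.start();
            }
        }
    }

    public static void main(String[] args) {
        Factory factory = new Factory();
        new Producer("ProducerGroupName", factory).start(true);
        for (String step : factory.steps) {
            System.out.println(step);
        }
    }
}
```

The user-created producer passes true and triggers the factory exactly once; the factory's own producer passes false, so the call chain ends there.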