Talking About MQ: RocketMQ (Part 3)

RocketMQ's Master-Slave Mode

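As noted earlier, the only RocketMQ deployment mode suitable for production is multi-master, multi-slave with asynchronous replication, so the tests below cover only that setup. Messages are consumed in Pull ordered mode.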

Suppose the cluster uses a 2-master, 2-slave layout, so 4 Brokers have to be started. Their configuration files are as follows:

# broker-a, master
brokerName=broker-a
brokerId=0
listenPort=10911
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlog
brokerRole=ASYNC_MASTER

# broker-a, slave
brokerName=broker-a
brokerId=1
listenPort=10921
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async-slave
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async-slave/commitlog
brokerRole=SLAVE

# broker-b, master
brokerName=broker-b
brokerId=0
listenPort=20911
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async/commitlog
brokerRole=ASYNC_MASTER

# broker-b, slave
brokerName=broker-b
brokerId=1
listenPort=20921
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async-slave
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async-slave/commitlog
brokerRole=SLAVE

In addition, the following settings are shared by every Broker:

brokerClusterName=DefaultCluster
brokerIP1=192.168.232.23
namesrvAddr=192.168.232.23:9876
deleteWhen=04
fileReservedTime=120
flushDiskType=ASYNC_FLUSH
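Since all four Brokers run on one host, a typo in brokerId, brokerRole, or listenPort is easy to make. The following is a minimal sketch of a sanity check over the per-broker settings above. BrokerConfigCheck and its methods are hypothetical names introduced only for illustration and are not part of RocketMQ. The sketch assumes that brokerId 0 marks the master of a brokerName group and that any other id marks a slave.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper for illustration; it is not part of RocketMQ.
final class BrokerConfigCheck {

    // Parses one broker configuration written in .properties syntax.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalArgumentException("unreadable configuration", e);
        }
        return props;
    }

    // Returns a list of problems; an empty list means the set looks consistent.
    static List<String> validate(List<Properties> brokers) {
        List<String> problems = new ArrayList<>();
        Set<String> mastersSeen = new HashSet<>();
        Set<String> portsSeen = new HashSet<>();
        for (Properties b : brokers) {
            String name = b.getProperty("brokerName");
            String id = b.getProperty("brokerId");
            String label = name + "#" + id;
            boolean masterId = "0".equals(id);
            boolean slaveRole = "SLAVE".equals(b.getProperty("brokerRole"));
            // Assumption: brokerId 0 pairs with a master role, any other id with SLAVE.
            if (masterId == slaveRole) {
                problems.add(label + ": brokerId and brokerRole disagree");
            }
            if (masterId && !mastersSeen.add(name)) {
                problems.add(label + ": second master for the same brokerName");
            }
            // All brokers share one host in this setup, so ports must not collide.
            if (!portsSeen.add(b.getProperty("listenPort"))) {
                problems.add(label + ": listenPort already in use");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        List<Properties> brokers = Arrays.asList(
            parse("brokerName=broker-a\nbrokerId=0\nlistenPort=10911\nbrokerRole=ASYNC_MASTER\n"),
            parse("brokerName=broker-a\nbrokerId=1\nlistenPort=10921\nbrokerRole=SLAVE\n"),
            parse("brokerName=broker-b\nbrokerId=0\nlistenPort=20911\nbrokerRole=ASYNC_MASTER\n"),
            parse("brokerName=broker-b\nbrokerId=1\nlistenPort=20921\nbrokerRole=SLAVE\n"));
        System.out.println(validate(brokers)); // prints []
    }
}
```

The check only looks at the three keys that differ between master and slave; store paths and the shared settings are left out to keep the sketch short.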

All other settings keep their defaults. Start the NameServer and all Brokers, run the Producer once as a trial, and then check the current state of TopicTest1:

$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1
{
        "brokerDatas":[
                {
                        "brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"
                        },
                        "brokerName":"broker-b"
                },
                {
                        "brokerAddrs":{0:"192.168.232.23:10911",1:"192.168.232.23:10921"
                        },
                        "brokerName":"broker-a"
                }
        ],
        "filterServerTable":{},
        "queueDatas":[
                {
                        "brokerName":"broker-a",
                        "perm":6,
                        "readQueueNums":4,
                        "topicSynFlag":0,
                        "writeQueueNums":4
                },
                {
                        "brokerName":"broker-b",
                        "perm":6,
                        "readQueueNums":4,
                        "topicSynFlag":0,
                        "writeQueueNums":4
                }
        ]
}

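As shown, TopicTest1 is on both Brokers, and each Broker's slave is running as well. Now the master-slave failover experiment begins: start the Consumer and Producer processes, with messages consumed in Pull ordered mode. While messages are being sent and received, stop broker-a's master process with kill -9 to simulate a sudden crash. At this point, the state of TopicTest1 is as follows: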

$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1
{
        "brokerDatas":[
                {
                        "brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"
                        },
                        "brokerName":"broker-b"
                },
                {
                        "brokerAddrs":{1:"192.168.232.23:10921"
                        },
                        "brokerName":"broker-a"
                }
        ],
        "filterServerTable":{},
        "queueDatas":[
                {
                        "brokerName":"broker-a",
                        "perm":6,
                        "readQueueNums":4,
                        "topicSynFlag":0,
                        "writeQueueNums":4
                },
                {
                        "brokerName":"broker-b",
                        "perm":6,
                        "readQueueNums":4,
                        "topicSynFlag":0,
                        "writeQueueNums":4
                }
        ]
}
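The missing master can also be spotted programmatically. The sketch below copies the brokerAddrs values from the output above into plain maps and reports every broker group that has no entry for id 0. RouteCheck and brokersWithoutMaster are hypothetical names used only for illustration, and the sketch assumes that id 0 is the master's id.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; it is not part of RocketMQ.
final class RouteCheck {

    // Assumption: broker id 0 denotes the master, so a group without key 0 has lost it.
    static List<String> brokersWithoutMaster(Map<String, Map<Long, String>> addrsByBrokerName) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Map<Long, String>> entry : addrsByBrokerName.entrySet()) {
            if (!entry.getValue().containsKey(0L)) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Values copied from the topicRoute output taken after the kill -9.
        Map<Long, String> brokerB = new LinkedHashMap<>();
        brokerB.put(0L, "192.168.232.23:20911");
        brokerB.put(1L, "192.168.232.23:20921");
        Map<Long, String> brokerA = new LinkedHashMap<>();
        brokerA.put(1L, "192.168.232.23:10921");

        Map<String, Map<Long, String>> route = new LinkedHashMap<>();
        route.put("broker-b", brokerB);
        route.put("broker-a", brokerA);

        System.out.println(brokersWithoutMaster(route)); // prints [broker-a]
    }
}
```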

broker-a is now down to a single slave node. Next, restart broker-a's master node to simulate recovery, and check the state of TopicTest1 again:

$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1
{
        "brokerDatas":[
                {
                        "brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"
                        },
                        "brokerName":"broker-b"
                },
                {
                        "brokerAddrs":{0:"192.168.232.23:10911",1:"192.168.232.23:10921"
                        },
                        "brokerName":"broker-a"
                }
        ],
        "filterServerTable":{},
        "queueDatas":[
                {
                        "brokerName":"broker-a",
                        "perm":6,
                        "readQueueNums":4,
                        "topicSynFlag":0,
                        "writeQueueNums":4
                },
                {
                        "brokerName":"broker-b",
                        "perm":6,
                        "readQueueNums":4,
                        "topicSynFlag":0,
                        "writeQueueNums":4
                }
        ]
}

At this point, RocketMQ has recovered.

Now look at the Producer and Consumer logs. First, the Producer's:

......
00578SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F08, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=141]
00579SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F9F, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=141]
00580SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078D47, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=700]
00581SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078DDE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=700]
00582SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078E75, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=699]
00583SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078F0C, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=699]
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
	at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
	at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
	at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
	at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
	at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
	at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
	at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
	at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
	at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
	at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
	at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
	at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
00588SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078FA3, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=701]
00589SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007903A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=701]
00590SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000790D1, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=700]
00591SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079168, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=700]
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
	at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
	at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
	at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
	at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
	at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
	at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
	at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
	at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
	at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
	at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
	at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
	at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
00596SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000791FF, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=702]
00597SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079296, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=702]
00598SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007932D, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=701]
00599SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000793C4, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=701]
00600SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007945B, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=703]
00601SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000794F2, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=703]
00602SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079589, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=702]
00603SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079620, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=702]
......
01389SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000965BE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=900]
01390SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096655, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=899]
01391SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000966EC, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=899]
01392SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127036, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=0], queueOffset=143]
01393SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001270CD, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=1], queueOffset=141]
01394SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127164, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=142]
01395SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001271FB, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=142]
01396SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096783, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=901]
01397SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000009681A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=901]

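The log shows that after message 00583 was sent, the exception connect to <192.168.232.23:10911> failed began to appear, presumably because broker-a's master node had been killed. The eight failed sends match the gaps in the sequence numbers (00584 to 00587 and 00592 to 00595), while the sends to broker-b in between (00588 to 00591) still succeed. From message 00596 onward the Producer works normally again, because broker-b is now serving all traffic. After broker-a's master node was restarted and rejoined the cluster, broker-a resumed work from message 01392.

The experiment confirms that in RocketMQ's so-called multi-master, multi-slave mode, the slave's role is reduced to a bare minimum. When a master goes down, its slave cannot take over the master's work; it only delivers the data that has not yet been sent out, and the remaining masters absorb the load. In other words, an N-master, N-slave RocketMQ cluster has 2N machines in total, of which only N do real work, and if one master fails, only N-1 are left working. Machine utilization is too low.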
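One way an application could avoid losing the sends that hit the dead master is to fall back to a queue on another Broker when a send fails. The following is only a simulation of that idea, not the test program used here and not RocketMQ's own behavior: FailoverSendSketch, Queue, Sender, and sendWithFallback are hypothetical names, and Sender stands in for a real send call.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation for illustration; it does not use the RocketMQ client.
final class FailoverSendSketch {

    // A queue is identified by broker name and queue id, as in the SendResult lines.
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + "/" + queueId;
        }
    }

    // Stand-in for a real send; throws when the target master is unreachable.
    interface Sender {
        void send(Queue queue, String body) throws Exception;
    }

    // Tries the preferred queue first, then the following queues in order,
    // skipping every queue on a broker that has already failed for this message.
    static Queue sendWithFallback(List<Queue> queues, int preferredIndex, String body, Sender sender) {
        Set<String> failedBrokers = new HashSet<>();
        Exception last = null;
        for (int i = 0; i < queues.size(); i++) {
            Queue candidate = queues.get((preferredIndex + i) % queues.size());
            if (failedBrokers.contains(candidate.brokerName)) {
                continue;
            }
            try {
                sender.send(candidate, body);
                return candidate;
            } catch (Exception e) {
                failedBrokers.add(candidate.brokerName);
                last = e;
            }
        }
        throw new IllegalStateException("no broker master reachable", last);
    }

    public static void main(String[] args) {
        List<Queue> queues = new ArrayList<>();
        for (int id = 0; id < 4; id++) queues.add(new Queue("broker-a", id));
        for (int id = 0; id < 4; id++) queues.add(new Queue("broker-b", id));

        // Simulate the killed broker-a master: every send to it fails.
        Sender sender = (queue, body) -> {
            if ("broker-a".equals(queue.brokerName)) {
                throw new IOException("connect to <192.168.232.23:10911> failed");
            }
        };

        System.out.println(sendWithFallback(queues, 2, "RocketMQ 00584", sender)); // prints broker-b/0
    }
}
```

The trade-off is ordering: with ordered consumption, moving a message to a different queue breaks the order for its key, so a fallback like this only fits cases where availability matters more than strict order.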

Now look at the Consumer's log:

RocketMQ 00551PullResult [pullStatus=FOUND, nextBeginOffset=696, minOffset=0, maxOffset=696, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=695, sysFlag=0, bornTimestamp=1469175032446, bornHost=/192.168.234.98:51987, storeTimestamp=1469175020973, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007859C, commitLogOffset=492956, bodyCRC=943070764, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=696, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00559PullResult [pullStatus=FOUND, nextBeginOffset=697, minOffset=0, maxOffset=697, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=696, sysFlag=0, bornTimestamp=1469175032720, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021247, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000787F8, commitLogOffset=493560, bodyCRC=921540126, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=697, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00567PullResult [pullStatus=FOUND, nextBeginOffset=698, minOffset=0, maxOffset=698, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=697, sysFlag=0, bornTimestamp=1469175033005, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021533, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078A54, commitLogOffset=494164, bodyCRC=2054744282, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=698, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00575PullResult [pullStatus=FOUND, nextBeginOffset=699, minOffset=0, maxOffset=699, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=698, sysFlag=0, bornTimestamp=1469175033286, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021814, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078CB0, commitLogOffset=494768, bodyCRC=225294519, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=699, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00583PullResult [pullStatus=FOUND, nextBeginOffset=700, minOffset=0, maxOffset=700, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=699, sysFlag=0, bornTimestamp=1469175033586, bornHost=/192.168.234.98:51987, storeTimestamp=1469175022113, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078F0C, commitLogOffset=495372, bodyCRC=1670775117, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=700, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00591PullResult [pullStatus=FOUND, nextBeginOffset=701, minOffset=0, maxOffset=701, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=700, sysFlag=0, bornTimestamp=1469175037890, bornHost=/192.168.234.98:51987, storeTimestamp=1469175026418, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079168, commitLogOffset=495976, bodyCRC=344150304, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=701, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00599PullResult [pullStatus=FOUND, nextBeginOffset=702, minOffset=0, maxOffset=702, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=701, sysFlag=0, bornTimestamp=1469175042200, bornHost=/192.168.234.98:51987, storeTimestamp=1469175030734, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000793C4, commitLogOffset=496580, bodyCRC=442030354, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=702, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00603PullResult [pullStatus=FOUND, nextBeginOffset=703, minOffset=0, maxOffset=703, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=702, sysFlag=0, bornTimestamp=1469175042345, bornHost=/192.168.234.98:51987, storeTimestamp=1469175030872, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079620, commitLogOffset=497184, bodyCRC=688469276, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=703, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00607PullResult [pullStatus=FOUND, nextBeginOffset=704, minOffset=0, maxOffset=704, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=703, sysFlag=0, bornTimestamp=1469175042481, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031008, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007987C, commitLogOffset=497788, bodyCRC=778367237, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=704, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00611PullResult [pullStatus=FOUND, nextBeginOffset=705, minOffset=0, maxOffset=705, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=704, sysFlag=0, bornTimestamp=1469175042615, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031143, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079AD8, commitLogOffset=498392, bodyCRC=1578919281, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=705, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00615PullResult [pullStatus=FOUND, nextBeginOffset=706, minOffset=0, maxOffset=706, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=705, sysFlag=0, bornTimestamp=1469175042753, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031280, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079D34, commitLogOffset=498996, bodyCRC=1500619112, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=706, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00619PullResult [pullStatus=FOUND, nextBeginOffset=707, minOffset=0, maxOffset=707, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=706, sysFlag=0, bornTimestamp=1469175042887, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031414, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079F90, commitLogOffset=499600, bodyCRC=1355279683, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=707, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00623PullResult [pullStatus=FOUND, nextBeginOffset=708, minOffset=0, maxOffset=708, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=707, sysFlag=0, bornTimestamp=1469175043021, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031548, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A1EC, commitLogOffset=500204, bodyCRC=457136030, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=708, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00627PullResult [pullStatus=FOUND, nextBeginOffset=709, minOffset=0, maxOffset=709, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=708, sysFlag=0, bornTimestamp=1469175043154, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031681, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A448, commitLogOffset=500808, bodyCRC=475173767, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=709, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00631PullResult [pullStatus=FOUND, nextBeginOffset=710, minOffset=0, maxOffset=710, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=709, sysFlag=0, bornTimestamp=1469175043299, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031826, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A6A4, commitLogOffset=501412, bodyCRC=1814693875, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=710, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00635PullResult [pullStatus=FOUND, nextBeginOffset=711, minOffset=0, maxOffset=711, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=710, sysFlag=0, bornTimestamp=1469175043435, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031962, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A900, commitLogOffset=502016, bodyCRC=1799865322, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=711, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
	at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)
	at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)
	at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)
	at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)
	at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)
	at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
	at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)
	at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)
	at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)
	at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)
	at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)
	at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
	at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)
	at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)
	at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)
	at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)
	at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)
	at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)
Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=0, sysFlag=0, bornTimestamp=1468572196808, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191827, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011C60, commitLogOffset=72800, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]
Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=1, sysFlag=0, bornTimestamp=1468572196876, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191895, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011EB0, commitLogOffset=73392, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]
Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=2, sysFlag=0, bornTimestamp=1468572196903, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191928, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012100, commitLogOffset=73984, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]
RocketMQ 00001PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=3, sysFlag=0, bornTimestamp=1468572718149, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713175, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000001222B, commitLogOffset=74283, bodyCRC=1133127810, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]
RocketMQ 00005PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=4, sysFlag=0, bornTimestamp=1468572718178, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713210, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012487, commitLogOffset=74887, bodyCRC=1156050075, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]
......
[queueId=1, storeSize=151, queueOffset=22, sysFlag=0, bornTimestamp=1469170324786, bornHost=/192.168.234.98:49814, storeTimestamp=1469170313333, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D3AA, commitLogOffset=1102762, bodyCRC=1707898805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00477PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=23, sysFlag=0, bornTimestamp=1469170325237, bornHost=/192.168.234.98:49814, storeTimestamp=1469170313771, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D606, commitLogOffset=1103366, bodyCRC=1654764460, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00481PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=24, sysFlag=0, bornTimestamp=1469170325652, bornHost=/192.168.234.98:49814, storeTimestamp=1469170314163, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D862, commitLogOffset=1103970, bodyCRC=207227478, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00485PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=25, sysFlag=0, bornTimestamp=1469170326066, bornHost=/192.168.234.98:49814, storeTimestamp=1469170314595, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010DABE, commitLogOffset=1104574, bodyCRC=188206671, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
......
RocketMQ 01370PullResult [pullStatus=FOUND, nextBeginOffset=895, minOffset=0, maxOffset=895, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=894, sysFlag=0, bornTimestamp=1469175070573, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059101, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095A89, commitLogOffset=613001, bodyCRC=1094080495, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=895, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01374PullResult [pullStatus=FOUND, nextBeginOffset=896, minOffset=0, maxOffset=896, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=895, sysFlag=0, bornTimestamp=1469175070712, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059251, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095CE5, commitLogOffset=613605, bodyCRC=1180406774, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=896, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01378PullResult [pullStatus=FOUND, nextBeginOffset=897, minOffset=0, maxOffset=897, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=896, sysFlag=0, bornTimestamp=1469175070899, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059427, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095F41, commitLogOffset=614209, bodyCRC=1340989405, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=897, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01382PullResult [pullStatus=FOUND, nextBeginOffset=898, minOffset=0, maxOffset=898, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=897, sysFlag=0, bornTimestamp=1469175071054, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059582, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000009619D, commitLogOffset=614813, bodyCRC=681585164, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=898, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01386PullResult [pullStatus=FOUND, nextBeginOffset=899, minOffset=0, maxOffset=899, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=898, sysFlag=0, bornTimestamp=1469175071203, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059731, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000963F9, commitLogOffset=615417, bodyCRC=802024981, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=899, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01390PullResult [pullStatus=FOUND, nextBeginOffset=900, minOffset=0, maxOffset=900, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=899, sysFlag=0, bornTimestamp=1469175071338, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059866, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000096655, commitLogOffset=616021, bodyCRC=1605728865, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=900, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=0, sysFlag=0, bornTimestamp=1468571752640, bornHost=/192.168.234.98:56433, storeTimestamp=1468571747895, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011B38, commitLogOffset=72504, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]
Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=1, sysFlag=0, bornTimestamp=1468572196772, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191803, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011BCC, commitLogOffset=72652, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]
Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=2, sysFlag=0, bornTimestamp=1468572196865, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191886, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011E1C, commitLogOffset=73244, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]
Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=3, sysFlag=0, bornTimestamp=1468572196899, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191917, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000001206C, commitLogOffset=73836, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]
RocketMQ 00000PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=151, queueOffset=4, sysFlag=0, bornTimestamp=1468572718127, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713166, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012194, commitLogOffset=74132, bodyCRC=881661972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]
RocketMQ 00004PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=151, queueOffset=5, sysFlag=0, bornTimestamp=1468572718170, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713197, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000000123F0, commitLogOffset=74736, bodyCRC=870374413, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]
......
RocketMQ 00560PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=140, sysFlag=0, bornTimestamp=1469175032756, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021285, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126922, commitLogOffset=1206562, bodyCRC=1679588729, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00568PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=141, sysFlag=0, bornTimestamp=1469175033043, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021570, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126B7E, commitLogOffset=1207166, bodyCRC=1791489355, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00576PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=142, sysFlag=0, bornTimestamp=1469175033320, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021848, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126DDA, commitLogOffset=1207770, bodyCRC=342157581, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01392PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=143, sysFlag=0, bornTimestamp=1469175071411, bornHost=/192.168.234.98:52034, storeTimestamp=1469175059951, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000127036, commitLogOffset=1208374, bodyCRC=834345805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01400PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=144, sysFlag=0, bornTimestamp=1469175071746, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060289, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000127292, commitLogOffset=1208978, bodyCRC=188274605, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01408PullResult [pullStatus=FOUND, nextBeginOffset=211, minOffset=0, maxOffset=211, msgFoundList=1]MessageExt [queueId=0, storeSize=151, queueOffset=145, sysFlag=0, bornTimestamp=1469175072078, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060614, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000001274EE, commitLogOffset=1209582, bodyCRC=98787231, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=211, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01416PullResult [pullStatus=FOUND, nextBeginOffset=214, minOffset=0, maxOffset=214, msgFoundList=3]MessageExt [queueId=0, storeSize=151, queueOffset=146, sysFlag=0, bornTimestamp=1469175072405, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060934, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000012774A, commitLogOffset=1210186, bodyCRC=2067809241, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=214, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]

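As you can see, around the time broker-a went down, the Consumer also hit an exception: connect to <192.168.232.23:10911> failed. Ordering within each partition was still preserved, but some disorder did appear. For example, the Consumer fetched data from before the experiment (the Hello MetaQ messages), even though I had explicitly deleted this Topic beforehand. It seems that RocketMQ's so-called deletion does not actually remove the Topic's data. Once the broker-a master was restarted, everything returned to normal.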

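Pull-mode consumption in RocketMQ requires you to manage offsets and specify partitions by hand. This is not obvious when writing the calls. Only at run time do you notice that the Consumer always works through one partition at a time and moves on to the next only after finishing it, by which point the next partition may already have a large backlog. Distributing messages by hand is also tedious. Push orderly consumption may well be the better choice.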
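To make the trade-off concrete, here is a minimal sketch of the kind of manual bookkeeping Pull mode pushes onto the caller: a per-queue offset table plus a round-robin loop that takes a small batch from each queue in turn instead of draining one queue completely before touching the next. It is an in-memory simulation only. The class `RoundRobinPullSketch` and its methods are hypothetical names made up for illustration, and no RocketMQ client API is called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a pull consumer; not the RocketMQ client API.
class RoundRobinPullSketch {
    // Simulated queues: queueId -> stored messages (list index = queue offset).
    private final Map<Integer, List<String>> queues = new LinkedHashMap<>();
    // Manually managed consume offsets: queueId -> next offset to read.
    private final Map<Integer, Integer> offsets = new LinkedHashMap<>();

    void addQueue(int queueId, List<String> messages) {
        queues.put(queueId, new ArrayList<>(messages));
        offsets.put(queueId, 0);
    }

    // Pull at most batchSize messages from one queue and advance its offset.
    List<String> pull(int queueId, int batchSize) {
        List<String> stored = queues.get(queueId);
        int begin = offsets.get(queueId);
        int end = Math.min(begin + batchSize, stored.size());
        List<String> batch = new ArrayList<>(stored.subList(begin, end));
        offsets.put(queueId, end);
        return batch;
    }

    // Visit the queues in turn, one small batch each, until every queue is drained.
    List<String> consumeAll(int batchSize) {
        List<String> consumed = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (int queueId : queues.keySet()) {
                List<String> batch = pull(queueId, batchSize);
                if (!batch.isEmpty()) {
                    consumed.addAll(batch);
                    progressed = true;
                }
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        RoundRobinPullSketch sketch = new RoundRobinPullSketch();
        sketch.addQueue(0, Arrays.asList("q0-m0", "q0-m1", "q0-m2"));
        sketch.addQueue(1, Arrays.asList("q1-m0", "q1-m1"));
        // Batches alternate between the two queues rather than finishing queue 0 first.
        System.out.println(sketch.consumeAll(2));
    }
}
```

Order within each queue is still preserved, because each queue's offset only moves forward. A real consumer would also have to persist the offset table and cope with pull failures, which is exactly the work Push mode takes off your hands.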

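There were also a few odd cases. Several times during the experiments I got the error CODE: 17 DESC: topic[TopicTest1] not exist, apply first please!, when in fact all I had done was shut down the Producer. Also, the documentation says that sh mqadmin updateTopic -n 192.168.232.23:9876 -c DefaultCluster -t TopicTest1 can be used to add a Topic, but in practice it did not work.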

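One addendum: I later repeated the experiment above using Push orderly consumption, and the conclusions were much the same. Because multiple threads are involved, the log occasionally looks interleaved, but that is a minor problem and can be solved. In addition, Broker shutdowns and restarts were often accompanied by repeated redelivery of messages, though RocketMQ does not guarantee that a message is received only once anyway. In my tests, duplicate messages appeared to be somewhat less of a problem in Kafka than in RocketMQ. Push orderly consumption does not require you to specify an offset or a partition, and on the second start it automatically resumes from after the previous offset. Functionally this is closer to Kafka's Consumer, although RocketMQ uses an asynchronous model.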

RocketMQ Best Practices

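In fact, RocketMQ comes with its own document titled "RocketMQ Best Practices", which covers a number of system design issues, for example that consumers should be idempotent, that one application should map to one Topic, and so on. This advice is useful not only for RocketMQ but is also well worth borrowing for Kafka.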
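As an illustration of the "consumers should be idempotent" advice, here is a minimal sketch of a handler that ignores redelivered messages. It assumes a unique identifier per message is a usable deduplication key (the msgId shown in the log above would be one candidate, and a business key may suit better in practice). The class `IdempotentHandlerSketch` is a hypothetical name and does not depend on the RocketMQ client. The in-memory set is for demonstration only, so a real deployment would need a durable store.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical deduplicating handler; not part of the RocketMQ client API.
class IdempotentHandlerSketch {
    // Ids of messages whose side effect has already been applied (in memory only).
    private final Set<String> processedIds = new HashSet<>();
    private int applied = 0;

    // Returns true if the message was applied, false if it was a duplicate delivery.
    boolean handle(String msgId, String body) {
        if (!processedIds.add(msgId)) {
            return false; // already seen: skip the side effect
        }
        applied++; // stand-in for the real business side effect
        return true;
    }

    int appliedCount() {
        return applied;
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch handler = new IdempotentHandlerSketch();
        handler.handle("C0A8E81700002A9F0000000000011EB0", "Hello MetaQ");
        // Same id delivered again, as can happen around a Broker restart.
        handler.handle("C0A8E81700002A9F0000000000011EB0", "Hello MetaQ");
        handler.handle("C0A8E81700002A9F0000000000012100", "Hello MetaQ");
        System.out.println("applied = " + handler.appliedCount());
    }
}
```

With this shape, the redeliveries seen around a Broker restart become harmless: the second delivery of the same id is recognised and dropped before any side effect runs.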

Afterword

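Here is my personal advice on choosing between RocketMQ and Kafka. I have compared the two at several points above. In my view, Kafka is a system still under active development, and its open-source community is larger and more active than RocketMQ's. The latest Kafka versions also support synchronous replication, which gives stronger guarantees on message reliability. Kafka's partition mechanism also comes close to automatic load balancing, which is definitely a killer feature. RocketMQ does offer many convenient features, far more than Kafka, but not all of them will necessarily be needed, and most can be worked around. By comparison, Kafka's core functionality appeals to me more, and in the details of handling failure recovery it does better than RocketMQ. Of course, inside Company A, or at a company that uses Company A's cloud products, RocketMQ has more enterprise-grade features, and there I might choose RocketMQ.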