詳解RocketMQ中的Producer
上一篇部落格講解了如何安裝RocketMQ,並且也簡單的介紹了一下相關RocketMq的概念,那麼這篇部落格,來剖析一下MQ中的producer的角色,看看它是來幹什麼的?
上圖就是MQ中Producer的有關結構圖,下面來著重分析一下每個類的用途
1.MQAdmin:作為MQ應用層最底層的類,為我們提供了所有公共的方法,常用的有如下
根據key、主題名和佇列來建立Topic
void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException;
查詢訊息佇列中的偏移量
long maxOffset(final MessageQueue mq) throws MQClientException;
根據各種條件來查詢Message資訊
QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end) throws MQClientException, InterruptedException;
2.MQProducer:用來發送生產者中的訊息,包含了start和shutdown以及各種send方法,其中send方法返回值為sendResult,裡面包含著SendStatus也就是傳送的狀態。send 訊息方法,只要不拋異常,就代表傳送成功。但是傳送成功會有多個狀態,在 sendResult 裡定義。
SEND_OK
訊息傳送成功
FLUSH_DISK_TIMEOUT
訊息傳送成功,但是伺服器刷盤超時,訊息已經進入伺服器佇列,只有此時伺服器宕機,訊息才會丟失
FLUSH_SLAVE_TIMEOUT
訊息傳送成功,但是伺服器同步到 Slave 時超時,訊息已經進入伺服器佇列,只有此時伺服器宕機,消
息才會丟失
SLAVE_NOT_AVAILABLE
訊息傳送成功,但是此時 slave 不可用,訊息已經進入伺服器佇列,只有此時伺服器宕機,訊息才會丟
3.ClientConfig:Client端公共的配置資訊,例如心跳數、持久化的時間間隔等
4.DefaultMQProducer:基礎的MQProducer,有一些基本的預設設定,供我們使用。例如預設的佇列數目、預設的超時時間等
下面通過一個例項來了解一下Producer中常用的操作
[java] view plain copy print ?
- <span style="font-family:Comic Sans MS;font-size:18px;">/**
- * @FileName: Producer.java
- * @Package:com.test
- * @Description: TODO
- * @author: LUCKY
- * @date:2015年12月28日 下午2:32:22
- * @version V1.0
- */
- package com.test;
- import java.util.List;
- import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
- import com.alibaba.rocketmq.client.producer.SendCallback;
- import com.alibaba.rocketmq.client.producer.SendResult;
- import com.alibaba.rocketmq.common.message.Message;
- import com.alibaba.rocketmq.common.message.MessageQueue;
- /**
- * @ClassName: Producer
- * @Description: 模擬生產者
- * @author: LUCKY
- * @date:2015年12月28日 下午2:32:22
- */
- public class ProducerTest {
- public static void main(String[] args) throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("Producer");
- // 必須要設定nameserver地址
- producer.setNamesrvAddr("100.66.154.81:9876");
- try {
- // producer.setClientIP("**");
- //設定例項名稱
- producer.setInstanceName("dd");
- //設定重試的次數
- producer.setRetryTimesWhenSendFailed(3);
- //開啟生產者
- producer.start();
- //建立一條訊息
- Message msg = new Message("PushTopic", "push", "1",
- "內容一".getBytes());
- //傳送訊息
- SendResult result = producer.send(msg);
- //傳送,並觸發回撥函式
- producer.send(msg, new SendCallback() {
- @Override
- //成功的回撥函式
- public void onSuccess(SendResult sendResult) {
- System.out.println(sendResult.getSendStatus());
- System.out.println("成功了");
- }
- @Override
- //出現異常的回撥函式
- public void onException(Throwable e) {
- System.out.println("失敗了"+e.getMessage());
- }
- });
- //獲取某個主題的訊息佇列
- List<MessageQueue> messageQueues = producer
- .fetchPublishMessageQueues("PushTopic");
- System.out.println(messageQueues.size());
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- producer.shutdown();
- }
- }
- }
- </span>
再分享一下我老師大神的人工智慧教程吧。零基礎!通俗易懂!風趣幽默!還帶黃段子!希望你也加入到我們人工智慧的隊伍中來!https://blog.csdn.net/jiangjunshow