阿里雲MQ快速入門指導
官方文件
本文主要描述從開通 MQ 服務、建立 MQ 資源,到使用 MQ SDK 進行訊息收發的完整流程,旨在以最簡單明瞭的方式引導您快速上手 MQ,為進一步使用和熟悉 MQ 的功能提供入門。
訊息收發部分以 TCP 協議下呼叫 Java SDK 為例來演示。
MQ 快速接入流程圖:
步驟一:開通服務
請按照以下步驟開通 MQ 服務:
如果您已經開通 MQ 服務,請直接登入
步驟二:建立資源
在 MQ 訊息系統中,訊息生產者將訊息傳送到某個指定的訊息主題(Topic) ,而訊息消費者則通過訂閱該指定的 Topic 來獲取和消費訊息。
一個新的應用接入 MQ 需要先建立相關的 MQ 資源,包括:
注意:當您刪除資源,比如 Topic、訊息生產者、訊息消費者的時候,相關的資源也會在 10 分鐘內進行清理。
建立訊息主題(Topic)
訊息主題(Topic)是 MQ 裡對訊息進行的一級歸類,比如可以建立“Topic_Trade”這一主題用來識別交易類訊息。 使用MQ的第一步需要先為您的應用建立 Topic。
您可以按照以下步驟建立 Topic:
-
登入MQ 控制檯,預設進入Topic 管理頁面。
-
在頁面上方選擇相應的地域(比如公網域),然後單擊建立 Topic按鈕。
注意:
- 如果只是測試,或者需要在本地(非阿里雲 ECS 伺服器)使用 MQ 服務,請將 Topic 建立在公網環境。 生產端和消費端可以部署在本地或者部署在任意地域的 ECS 上,前提是本地伺服器或者相應的 ECS 需要能夠訪問公網。
- 如果在生產環境使用 MQ 服務,需要將應用程式部署在阿里雲 ECS 上,同時 Topic 也需要在應用程式所在的區域(即所部署的 ECS 區域)進行建立。
- Topic 不能跨域使用。 比如 Topic 建立在“華北 2”這個域,那麼訊息生產端和消費端也必須執行在“華北 2”的 ECS 上。
- 有關域的詳細介紹請參見 ECS 文件中的地域和可用區。
-
在建立 Topic對話方塊輸入 Topic 名稱及備註,單擊確定。 您建立的 Topic 將出現在 Topic 列表中。
注意:Topic 名稱必須全域性唯一。 如果名稱已經被其他使用者使用,您將無法建立相同名稱的 Topic。
訊息型別
訊息型別分為以下幾種:
- Kafka 訊息:相容 MQ 及 Kafka 協議(kafka-client-0.10 及以上版本)的訊息型別。
- 無序訊息:不保證先入先出(FIFO)的嚴格順序,包含普通訊息、定時/延時訊息、事務訊息。 建議建立不同的 Topic 來發送不同型別的訊息,例如用 Topic A 傳送普通訊息,Topic B 傳送事務訊息, Topic C 傳送延時/定時訊息。
- 全域性順序訊息:所有訊息嚴格按照 FIFO 的嚴格順序進行生產和消費。
- 分割槽順序訊息:訊息根據 sharding key 進行分割槽,提高整體併發度與使用效能。 同一個分割槽的訊息嚴格按照 FIFO 的嚴格順序進行生產和消費。
建立生產者(Producer ID)
建立好 Topic 後,要為這個 Topic 建立訊息生產端的資源,即建立 Producer ID。 一個 Topic 只能對應一個 Producer ID。
請按照以下步驟為您的 Topic 建立 Producer ID:
-
在 MQ 控制檯左側選單欄選擇Topic 管理。
-
在 Topic 列表中找到您剛剛建立的 Topic,單擊操作選項中的建立生產者。
-
在建立生產者對話方塊輸入 Producer ID,單擊確定。
注意:
- Producer ID 必須全域性唯一。 如果名稱已存在,您將無法建立相同名稱的 Producer ID。
- Topic 對應的生產端必須和這個 Topic 在同一個域,比如您在“公網”域建立了“Topic_open”,那麼和“Topic_open”對應的 Producer ID 也必須在同一個域。
- Producer ID 和 Topic 的關係是 1:N,即一個 Topic 只能繫結一個 Producer ID, 但是同一個 Producer ID 可以對應多個 Topic。
建立消費者(Consumer ID)
建立完訊息生產端後,您需要為 Topic 申請相應的訊息消費資源,即建立 Consumer ID。
請按以下步驟建立 Consumer ID:
-
在 MQ 控制檯左側選單欄選擇 Topic 管理。
-
找到您建立的 Topic,單擊右側操作選項裡的建立消費者。
-
在建立消費者對話方塊輸入 Consumer ID,單擊確定。
注意:
- Consumer ID 必須全域性唯一。 如果名稱已存在,您將無法建立相同名稱的 Consumer ID。
- Consumer ID 必須和對應的 Topic 在同一個域,比如“公網”域的“Topic_open”可繫結同在“公網”域的 Consumer ID “CID_123”,而“華北1”域內的“Topic_huabei1”則不能繫結該Consumer ID。
- Consumer ID 和 Topic 的關係是 N:N。 同一個 Consumer ID 可以訂閱多個 Topic,同一個 Topic 也可以對應多個 Consumer ID。
建立阿里雲 AccessKey 和 SecretKey
在呼叫 SDK/API 進行訊息傳送和訂閱的時候,除了需要指定建立的 Topic, Producer ID 以及 Consumer ID 以外,還需輸入您在 RAM 控制檯建立的身份驗證資訊,即 Access Key ID 和 Acess Key Secret。
關於如何建立 AccessKey 和 SecretKey, 請參閱建立AccessKey。
步驟三:獲取接入域名
在控制檯建立 Topic 資源後,您還需要通過控制檯獲取 Producer ID 的 TCP 接入域名和 Consumer ID 的 TCP 接入域名。
請按照以下步驟獲取 Producer ID 的接入域名或 Consumer ID 的接入域名:
-
在 MQ 控制檯左側選單欄,選擇生產者管理或消費者管理。
-
找到您建立的 Producer ID 或 Consumer ID,單擊右側操作列中的獲取接入點。
-
在獲取接入點提示框,單擊複製。
完成以上準備工作後,您就可以執行示例程式碼,用 MQ 進行訊息傳送和訂閱了。
步驟四:傳送訊息
您可以通過控制檯傳送訊息或者呼叫 SDK/API 傳送訊息。
-
控制檯傳送訊息:用於快速驗證 Topic 資源的可用性。
-
呼叫 SDK/API 傳送訊息:用於生產環境下使用 MQ。
通過控制檯傳送訊息
控制檯傳送訊息步驟如下:
-
在 MQ 控制檯的左側選單欄單擊生產者管理。
-
在列表中找到您剛剛建立的 Topic,單擊右側操作欄裡的傳送。
-
在傳送訊息對話方塊輸入訊息的具體內容,單擊確定。控制檯會返回訊息傳送成功通知以及相應的 Message ID。
呼叫 SDK/API 傳送訊息
在生產環境使用 MQ,建議呼叫 SDK/API 來進行訊息傳送。本文以 TCP 協議下呼叫 Java SDK 為例進行說明。如果需要使用其他協議或者開發語言,請參見相關幫助文件。
呼叫 TCP Java SDK 傳送訊息
-
通過下面兩種方式可以引入依賴(任選一種):
-
根據以下說明設定相關引數,執行示例程式碼:
說明:關於 TCP 接入點域名,請進入 MQ 控制檯的生產者管理頁面,在 PID 右側操作列單擊獲取接入點按鈕獲取。
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在 MQ 控制檯建立的 Producer ID
properties.put(PropertyKeyConst.ProducerId, "XXX");
// 鑑權用 AccessKey,在阿里雲伺服器管理控制檯建立
properties.put(PropertyKeyConst.AccessKey,"XXX");
// 鑑權用 SecretKey,在阿里雲伺服器管理控制檯建立
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 設定 TCP 接入域名,進入 MQ 控制檯的生產者管理頁面,在右側操作欄單擊獲取接入點獲取
// 此處以公有云公網地域接入點為例
properties.put(PropertyKeyConst.ONSAddr,
"http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
Producer producer = ONSFactory.createProducer(properties);
// 在傳送訊息前,必須呼叫 start 方法來啟動 Producer,只需呼叫一次即可
producer.start();
//迴圈傳送訊息
while(true){
Message msg = new Message( //
// 在控制檯建立的 Topic,即該訊息所屬的 Topic 名稱
"TopicTestMQ",
// Message Tag,
// 可理解為 Gmail 中的標籤,對訊息進行再歸類,方便 Consumer 指定過濾條件在 MQ 伺服器過濾
"TagA",
// Message Body
// 任何二進位制形式的資料, MQ 不做任何干預,
// 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
"Hello MQ".getBytes());
// 設定代表訊息的業務關鍵屬性,請儘可能全域性唯一,以方便您在無法正常收到訊息情況下,可通過 MQ 控制檯查詢訊息並補發
// 注意:不設定也不會影響訊息正常收發
msg.setKey("ORDERID_100");
// 傳送訊息,只要不拋異常就是成功
// 列印 Message ID,以便用於訊息傳送狀態查詢
SendResult sendResult = producer.send(msg);
System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId());
}
// 在應用退出前,可以銷燬 Producer 物件
// 注意:如果不銷燬也沒有問題
producer.shutdown();
}
}
檢視訊息是否傳送成功
訊息傳送後,您可以在控制檯檢視訊息傳送狀態,步驟如下:
-
在 MQ 控制檯左側選單欄中單擊訊息查詢。
-
在訊息查詢頁面,選擇按 Message ID 查詢標籤頁。
-
在搜尋框中輸入傳送訊息後返回的 Message ID,單擊搜尋查詢訊息傳送狀態。
“儲存時間”表示 MQ 服務端儲存這條訊息的時間。如果查詢到此訊息,表示訊息已經成功傳送到服務端。
注意:此步驟演示的是第一次使用 MQ 的場景,此時訂閱端從未啟動過,所以訊息狀態顯示暫無消費資料。要啟動訂閱端並進行訊息訂閱請繼續下一步操作訂閱訊息。更多訊息狀態請參見訊息查詢。
步驟五:訂閱訊息
訊息傳送成功後,需要啟動訂閱方進行訊息訂閱。本文以 TCP Java SDK 為例,介紹如何通過呼叫相關協議及開發語言的 SDK/API 來完成訊息訂閱。
呼叫 TCP Java SDK 訂閱訊息
您可以執行以下示例程式碼來啟動訂閱端,並測試訂閱訊息的功能。請按照說明正確設定相關引數。目前控制檯提供了 Java,C++, .NET的示例程式碼。
說明:關於 TCP 接入點域名,請進入 MQ 控制檯的消費者管理頁面,在 CID 右側操作列單擊獲取接入點按鈕獲取。
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在 MQ 控制檯建立的 Consumer ID
properties.put(PropertyKeyConst.ConsumerId, "XXX");
// 鑑權用 AccessKey,在阿里雲伺服器管理控制檯建立
properties.put(PropertyKeyConst.AccessKey, "XXX");
// 鑑權用 SecretKey,在阿里雲伺服器管理控制檯建立
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 設定 TCP 接入域名,進入 MQ 控制檯的消費者管理頁面,在右側操作欄單擊獲取接入點獲取
// 此處以公有云公網地域接入點為例
properties.put(PropertyKeyConst.ONSAddr,
"http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicTestMQ", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
檢視訊息訂閱是否成功
完成上述步驟後,您可以在 MQ 控制檯檢視訂閱端是否啟動成功,即訊息訂閱是否成功。目前控制檯檢視消費者狀態僅支援 TCP 客戶端,暫不支援 HTTP 以及 MQTT 客戶端。
-
在 MQ 控制檯左側選單欄單擊消費者管理。
-
找到要檢視的 Topic,單擊右側操作選項裡的消費者狀態。 如果是否線上顯示為是,則說明訂閱端已成功啟動。如果消費者狀態是否線上顯示為否,說明消費端沒有啟動或者啟動失敗。
完成以上所有步驟後,您就成功接入了 MQ 服務,可以用 MQ 進行訊息傳送和訂閱了。