Rocket MQ傳送訊息的三種方式初析
前言
MQ 傳送訊息有三種實現方式:可靠同步傳送、可靠非同步傳送、單向(Oneway)傳送。基於版本4.2.0+。注意:順序訊息只支援可靠同步傳送。
可靠同步傳送
原理:同步傳送是指訊息傳送方發出資料後,會在收到接收方發回響應之後才發下一個數據包的通訊方式。
場景:此種方式應用場景非常廣泛,例如重要通知郵件、報名簡訊通知、營銷簡訊系統等。
可靠非同步傳送
原理:非同步傳送是指傳送方發出資料後,不等接收方發回響應,接著傳送下個數據包的通訊方式。 MQ 的非同步傳送,需要使用者實現非同步傳送回撥介面(SendCallback)。訊息傳送方在傳送了一條訊息後,不需要等待伺服器響應即可返回,進行第二條訊息傳送。傳送方通過回撥介面接收伺服器響應,並對響應結果進行處理。
場景:非同步傳送一般用於鏈路耗時較長,對 RT 響應時間較為敏感的業務場景,例如使用者視訊上傳後通知啟動轉碼服務,轉碼完成後通知推送轉碼結果等。
單向(Oneway)傳送
原理:單向(Oneway)傳送特點為傳送方只負責傳送訊息,不等待伺服器迴應且沒有回撥函式觸發,即只發送請求不等待應答。 此方式傳送訊息的過程耗時非常短,一般在微秒級別。
場景:適用於某些耗時非常短,但對可靠性要求並不高的場景,例如日誌收集。
三者的特點和主要區別。
傳送方式 | 傳送 TPS | 傳送結果反饋 | 可靠性 |
---|---|---|---|
同步傳送 | 快 | 有 | 不丟失 |
非同步傳送 | 快 | 有 | 不丟失 |
單向傳送 | 最快 | 無 | 可能丟失 |
code 實戰
同步傳送
public class ProducerTest { public static void main(String[] args) { Properties properties = new Properties(); Producer producer = ONSFactory.createProducer(properties); // 在傳送訊息前,必須呼叫 start 方法來啟動 Producer,只需呼叫一次即可 producer.start(); //迴圈傳送訊息 for (int i = 0; i < 100; i++){ Message msg = new Message( // // Message 所屬的 Topic "TopicTestMQ", // Message Tag 可理解為 Gmail 中的標籤,對訊息進行再歸類,方便 Consumer 指定過濾條件在 MQ 伺服器過濾 "TagA", // Message Body 可以是任何二進位制形式的資料, MQ 不做任何干預, // 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式 "Hello MQ".getBytes()); // 設定代表訊息的業務關鍵屬性,請儘可能全域性唯一。 // 以方便您在無法正常收到訊息情況下,可通過阿里雲伺服器管理控制檯查詢訊息並補發 // 注意:不設定也不會影響訊息正常收發 msg.setKey("ORDERID_" + i); try { SendResult sendResult = producer.send(msg); // 同步傳送訊息,只要不拋異常就是成功 if (sendResult != null) { System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()); } } catch (Exception e) { // 訊息傳送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); } } // 在應用退出前,銷燬 Producer 物件 // 注意:如果不銷燬也沒有問題 producer.shutdown(); } }
非同步傳送
public static void main(String[] args) {
Properties properties = new Properties();
Producer producer = ONSFactory.createProducer(properties);
// 在傳送訊息前,必須呼叫 start 方法來啟動 Producer,只需呼叫一次即可。
producer.start();
Message msg = new Message(
// Message 所屬的 Topic
"TopicTestMQ",
// Message Tag,可理解為 Gmail 中的標籤,對訊息進行再歸類,方便 Consumer 指定過濾條件在 MQ 伺服器過濾
"TagA",
// Message Body,任何二進位制形式的資料,MQ 不做任何干預,需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
"Hello MQ".getBytes());
// 設定代表訊息的業務關鍵屬性,請儘可能全域性唯一。 以方便您在無法正常收到訊息情況下,可通過 MQ 控制檯查詢訊息並補發。
// 注意:不設定也不會影響訊息正常收發
msg.setKey("ORDERID_100");
// 非同步傳送訊息, 傳送結果通過 callback 返回給客戶端。
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
// 消費傳送成功
System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
// 訊息傳送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理
System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
}
});
// 在 callback 返回之前即可取得 msgId。
System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());
// 在應用退出前,銷燬 Producer 物件。 注意:如果不銷燬也沒有問題
producer.shutdown();
}
單向(Oneway)傳送
public static void main(String[] args) {
Properties properties = new Properties();
Producer producer = ONSFactory.createProducer(properties);
// 在傳送訊息前,必須呼叫 start 方法來啟動 Producer,只需呼叫一次即可。
producer.start();
//迴圈傳送訊息
for (int i = 0; i < 100; i++){
Message msg = new Message(
// Message 所屬的 Topic
"TopicTestMQ",
// Message Tag,
// 可理解為 Gmail 中的標籤,對訊息進行再歸類,方便 Consumer 指定過濾條件在 MQ 伺服器過濾
"TagA",
// Message Body
// 任何二進位制形式的資料,MQ 不做任何干預,需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
"Hello MQ".getBytes());
// 設定代表訊息的業務關鍵屬性,請儘可能全域性唯一。
// 以方便您在無法正常收到訊息情況下,可通過阿里雲伺服器管理控制檯查詢訊息並補發。
// 注意:不設定也不會影響訊息正常收發
msg.setKey("ORDERID_" + i);
// 由於在 oneway 方式傳送訊息時沒有請求應答處理,一旦出現訊息傳送失敗,則會因為沒有重試而導致資料丟失。若資料不可丟,建議選用可靠同步或可靠非同步傳送方式。
producer.sendOneway(msg);
}
// 在應用退出前,銷燬 Producer 物件
// 注意:如果不銷燬也沒有問題
producer.shutdown();
}
相關推薦
Rocket MQ傳送訊息的三種方式初析
前言 MQ 傳送訊息有三種實現方式:可靠同步傳送、可靠非同步傳送、單向(Oneway)傳送。基於版本4.2.0+。注意:順序訊息只支援可靠同步傳送。 可靠同步傳送 原理:同步傳送是指訊息傳送方發出
spring整合apache activemq實現訊息傳送的三種方式程式碼配置例項
我們專案中傳送事件告警要用到訊息佇列,所以學習了下activemq,整理如下: activemq的介紹就不用說了,官網上大家可以詳細的看到。 1.下載並安裝activemq:地址http://activemq.apache.org/activemq-590-rel
php 模擬http傳送請求三種方式(curl,stream流的方式,)
一,curl cURL 是一個用來傳輸資料的工具,支援多種協議,如在 Linux 下用 curl 命令列可以傳送各種 HTTP 請求。PHP 的 cURL 是一個底層的庫,它能根據不同協議跟各種伺服器通訊,HTTP 協議是其中一種。 post請求 public
MQ傳送普通訊息(三種方式)
MQ 傳送普通訊息有三種實現方式:可靠同步傳送、可靠非同步傳送、單向(Oneway)傳送。本文介紹了每種實現的原理、使用場景以及三種實現的異同,同時提供了程式碼示例以供參考。 可靠
RocketMQ(6)---傳送普通訊息(三種方式)
傳送普通訊息(三種方式) RocketMQ 傳送普通訊息有三種實現方式:可靠同步傳送、可靠非同步傳送、單向(Oneway)傳送。 注意 :順序訊息只支援可靠同步傳送。 GitHub地址: https://github.com/yudiandemingzi/SpringBootBlog 一、概念 1、可靠同步
Python 傳送 email 的三種方式
Python傳送email的三種方式,分別為使用登入郵件伺服器、使用smtp服務、呼叫sendmail命令來發送三種方法 本文原文自米撲部落格:Python 傳送 email 的三種方式 Python傳送email比較簡單,可以通過登入郵件服務來發送,linux下也可以使用呼叫sendmail命令來發送,
向伺服器傳送請求的三種方式 and 轉發和重定向的區別
1.三種方式: 1.html超連結 2.form表單 3.AJAX技術 例子:不通過超連結和表單訪問伺服器,還有其他方式:可以通過Ajax技術訪問伺服器 js程式碼: <script type="text/javascript"
向伺服器傳送請求的三種方式
表單提交 兩種提交方式:get、post <form action="get.php" method="get"> 暱稱:<input type="text" na
django-Ajax傳送POST請求(csrf跨站請求的三種方式),檔案的上傳
<h3>Ajax上傳檔案</h3> <p><input type="text" name="username" id="username" placeholder="username"></p> <p><input type="
kafka的訊息傳送的三種模式
1. At most once模式 消費者讀取訊息,更新訊息的offset,然後處理訊息。這種方式的風險是在更新訊息的offset之後,處理訊息結果的輸出之前消費者掛掉,消費者再啟動的時候,從新的offset開始消費訊息,導致處理訊息丟失 2. At least
Ajax之post請求跨站請求csrf_token傳送處理de三種方式
方式一: $.post({ url: '/get_result/', data: { value0: $('#v1').val(),
Android自定義View初體驗,實現圓形TextView的三種方式
自定義view對我來說一直是比較恐懼的,但是萬事開頭難,今天總結一下自己實現圓形TextView的三種方式。 首先來說一下自定義view的三種方式: 一,自繪控制元件: 自繪控制元件就是說介面展示的內容就是我們在ondraw()方法中繪製出來的,繼承Vie
Java三種方式實現傳送xml引數的WebService介面呼叫
專案開發中與第三方系統資料對接遇到的問題,僅用作記錄。 1.使用cxf呼叫(聯調時沒有收到響應資訊) JaxWsDynamicClientFactory clientFactory = JaxWsDynamicClientFactory.newInstance(); lo
程式設計實現簡訊傳送三種方式
2014年09月05日⁄ 綜合⁄ 共 3138字 ⁄ 字號 小 中 大 ⁄ 評論關閉 方案一: 利用sina webservice傳送簡訊 通過程式設計實現簡訊息的傳送是一件比較繁瑣的事情,目前,解決方法是通過計算機和手機的連線,使用手機程式語言編寫相關的簡訊程式來實硬體
調用類的三種方式
set div sharp csharp true ren light setname clas 1.T t; Teacher teach ;//T t = new T t(); teach.SetName("lizl"); teach.Say(); 2 *t T
Java多線程實現的三種方式
get() warning 三種方式 方式 緩存 運行 了解 ren ava Java多線程實現方式主要有三種:繼承Thread類、實現Runnable接口、使用ExecutorService、Callable、Future實現有返回結果的多線程。其中前兩種方式線程執行完後
java數組擴增的三種方式
實現 arrays 數組復制 當我 自身 ++ new 復制 log java數組聲明的時候必須聲明其長度,但當我們想對數組進行擴增的時候該怎麽辦呢? 下面三種方式都可以進行擴增,最後一種也最為方便。 1 /** 2 * 手動循環擴增
java寫入文件的三種方式比較
all mem exc 操作 測試文件 nts sys output println 1.FileOutputStream方式 2.BufferedOutputStream方式 3.FileWriter方式 經過多次測試,發現不緩存的FileOutputStream會比較慢
JAVA實現Base64編碼的三種方式
ack ons static nts bstr clas [] ram trace 摘要: Javabase64編碼的三種方式 有如下三種方式: 方式一:commons-codec.jar Java代碼 1. String base64String="whuang12
SpringMVC返回json數據的三種方式
class error log under itl gmv nbsp sin pri SpringMVC返回json數據的三種方式:http://blog.csdn.net/shan9liang/article/details/42181345 上述第三種方法:可能會出