ActiveMQ的queue以及topic兩種訊息處理機制分析
Q來作為jms匯流排,並且給大家介紹了activeMQ的叢集和高可用部署方案,本期給大家再介紹下,如何根據自己的專案需求,更好地使用activeMQ的兩種訊息處理模式。
1 queue與topic的技術特點對比
Topic |
Queue |
|
概要 |
Publish Subscribe messaging 釋出訂閱訊息 |
Point-to-Point 點對點 |
有無狀態 |
topic資料預設不落地,是無狀態的。 |
Queue資料預設會在mq伺服器上以檔案形式儲存,比如Active MQ一般儲存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB儲存。 |
完整性保障 |
並不保證publisher釋出的每條資料,Subscriber都能接受到。 |
Queue保證每條資料都能被receiver接收。 |
訊息是否會丟失 |
一般來說publisher釋出訊息到某一個topic時,只有正在監聽該topic地址的sub能夠接收到訊息;如果沒有sub在監聽,該topic就丟失了。 |
Sender傳送訊息到目標Queue,receiver可以非同步接收這個Queue上的訊息。Queue上的訊息如果暫時沒有receiver來取,也不會丟失。 |
訊息釋出接收策略 |
一對多的訊息釋出接收策略,監聽同一個topic地址的多個sub都能收到publisher傳送的訊息。Sub接收完通知mq伺服器 |
一對一的訊息釋出接收策略,一個sender傳送的訊息,只能有一個receiver接收。receiver接收完後,通知mq伺服器已接收,mq伺服器對queue裡的訊息採取刪除或其他操作。 |
Topic和queue的最大區別在於topic是以廣播的形式,通知所有線上監聽的客戶端有新的訊息,沒有監聽的客戶端將收不到訊息;而queue則是以點對點的形式通知多個處於監聽狀態的客戶端中的一個。
2 topic和queue方式的訊息處理效率比較
通過增加監聽客戶端的併發數來驗證,topic的訊息推送,是否會因為監聽客戶端的併發上升而出現明顯的下降,測試環境的伺服器為ci環境的ActiveMQ,客戶端為我的本機。
從實測的結果來看,topic方式傳送的訊息,傳送和接收的效率,在一個訂閱者和100個訂閱者的前提下沒有明顯差異,但在500個訂閱者(執行緒)併發的前提下,效率差異很明顯(由於500執行緒併發的情況下,我本機的cpu佔用率已高達70-90%,所以無法確認是我本機測試造成的效能瓶頸還是topic訊息傳送方式存在效能瓶頸,造成效率下降如此明顯)。
Topic方式傳送的訊息與queue方式傳送的訊息,傳送和接收的效率,在一個訂閱者和100個訂閱者的前提下沒有明顯差異,但在500個訂閱者併發的前提下,topic方式的效率明顯低於queue。
Queue方式傳送的訊息,在一個訂閱者、100個訂閱者和500個訂閱者的前提下,傳送和接收的效率沒有明顯變化。
Topic實測資料:
傳送者傳送的訊息總數 |
所有訂閱者接收到訊息的總數 |
訊息傳送和接收平均耗時 |
|
單訂閱者 |
100 |
100 |
101ms |
100訂閱者 |
100 |
10000 |
103ms |
500訂閱者 |
100 |
50000 |
14162ms |
Queue實測資料:
傳送者傳送的訊息總數 |
所有訂閱者接收到訊息的總數 |
訊息傳送和接收平均耗時 |
|
單訂閱者 |
100 |
100 |
96ms |
100訂閱者 |
100 |
100 |
96ms |
500訂閱者 |
100 |
100 |
100ms |
3 topic方式的訊息處理示例
3.1 通過客戶端程式碼呼叫來發送一個topic的訊息:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
publicclass SendTopic {
privatestaticfinalint SEND_NUMBER = 5;
publicstaticvoid sendMessage(Session session, MessageProducer producer)
throws Exception {
for ( int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("ActiveMq傳送的訊息" + i);
//傳送訊息到目的地方
System. out.println("傳送訊息:" + "ActiveMq 傳送的訊息" + i);
producer.send(message);
}
}
publicstaticvoid main(String[] args) {
// ConnectionFactory:連線工廠,JMS用它建立連線
ConnectionFactory connectionFactory;
// Connection:JMS客戶端到JMS Provider的連線
Connection connection = null;
// Session:一個傳送或接收訊息的執行緒
Session session;
// Destination:訊息的目的地;訊息傳送給誰.
Destination destination;
// MessageProducer:訊息傳送者
MessageProducer producer;
// TextMessage message;
//構造ConnectionFactory例項物件,此處採用ActiveMq的實現jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection. DEFAULT_USER,
ActiveMQConnection. DEFAULT_PASSWORD,
"tcp://10.20.8.198:61616");
try {
//構造從工廠得到連線物件
connection = connectionFactory.createConnection();
//啟動
connection.start();
//獲取操作連線
session = connection.createSession( true, Session. AUTO_ACKNOWLEDGE);
//獲取session注意引數值FirstTopic是一個伺服器的topic(與queue訊息的傳送相比,這裡是唯一的不同)
destination = session.createTopic("FirstTopic");
//得到訊息生成者【傳送者】
producer = session.createProducer(destination);
//設定不持久化,此處學習,實際根據專案決定
producer.setDeliveryMode(DeliveryMode. PERSISTENT);
//構造訊息,此處寫死,專案就是引數,或者方法獲取
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if ( null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
3.2 啟動多個客戶端監聽來接收topic的訊息:
publicclass ReceiveTopic implements Runnable {
private StringthreadName;
ReceiveTopic(String threadName) {
this.threadName = threadName;
}
publicvoid run() {
// ConnectionFactory:連線工廠,JMS用它建立連線
ConnectionFactory connectionFactory;
// Connection:JMS客戶端到JMS Provider的連線
Connection connection = null;
// Session:一個傳送或接收訊息的執行緒
Session session;
// Destination:訊息的目的地;訊息傳送給誰.
Destination destination;
//消費者,訊息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection. DEFAULT_USER,
ActiveMQConnection. DEFAULT_PASSWORD,"tcp://10.20.8.198:61616");
try {
//構造從工廠得到連線物件
connection = connectionFactory.createConnection();
//啟動
connection.start();
//獲取操作連線,預設自動向伺服器傳送接收成功的響應
session = connection.createSession( false, Session. AUTO_ACKNOWLEDGE);
//獲取session注意引數值FirstTopic是一個伺服器的topic
destination = session.createTopic("FirstTopic");
consumer = session.createConsumer(destination);
while ( true) {
//設定接收者接收訊息的時間,為了便於測試,這裡設定為100s
TextMessage message = (TextMessage) consumer
.receive(100 * 1000);
if ( null != message) {
System. out.println("執行緒"+threadName+"收到訊息:" + message.getText());
} else {
continue;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if ( null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
publicstaticvoid main(String[] args) {
//這裡啟動3個執行緒來監聽FirstTopic的訊息,與queue的方式不一樣三個執行緒都能收到同樣的訊息
ReceiveTopic receive1= new ReceiveTopic("thread1");
ReceiveTopic receive2= new ReceiveTopic("thread2");
ReceiveTopic receive3= new ReceiveTopic("thread3");
Thread thread1= new Thread(receive1);
Thread thread2= new Thread(receive2);
Thread thread3= new Thread(receive3);
thread1.start();
thread2.start();
thread3.start();
}
}
4 queue方式的訊息處理示例
參考上一期文章:開源jms服務ActiveMQ的負載均衡+高可用部署方案探索。
相關推薦
ActiveMQ的queue以及topic兩種訊息處理機制分析
Q來作為jms匯流排,並且給大家介紹了activeMQ的叢集和高可用部署方案,本期給大家再介紹下,如何根據自己的專案需求,更好地使用activeMQ的兩種訊息處理模式。 1 queue與topic的技術特點對比 Topic Queue 概要 Publish Subscribe mes
Android應用程式鍵盤(Keyboard)訊息處理機制分析
在Android系統中,鍵盤按鍵事件是由WindowManagerService服務來管理的,然後再以訊息的形式來分發給應用程式處理,不過和普通訊息不一樣,它是由硬體中斷觸發的;在上一篇文章《Android應用程式訊息處理機制(Looper、Handler)分
ActiveMQ兩種訊息模式以及為什麼使用MQ
1.為什麼使用MQ a.高併發 在高併發分散式環境下,由於來不及同步處理,請求往往發生堵塞;通過訊息佇列,可以非同步處理請求,緩解系統的壓力; b.鬆耦合性 一個應用傳送訊息到MQ之後並不關係訊息如何或者什麼時候被傳遞,同樣的訊息的接收者也不關係訊息從哪裡來的。在不同的環
SpringBoot 整合 RabbitMQ(包含三種訊息確認機制以及消費端限流)
目錄 說明 生產端 消費端 說明 本文 SpringBoot 與 RabbitMQ 進行整合的時候,包含了三種訊息的確認模式,如果查詢詳細的確認模式設定,請閱讀:RabbitMQ的三種訊息確認
Android學習筆記(36):Android的兩種事件處理方式
post gravity cal log 基於 處理方法 hang mil 重寫 Android提供了兩種事件處理的方式:基於回調的事件處理 和 基於監聽的事件處理。 我們來說的easy理解一點: (1)基於回調的事件處理就是繼承GUI組件,並重寫該組件的
簡單的實現圖片預覽, 通過原生ajax以及 jQuery兩種方法實現圖片預覽,有更好的辦法可以留言喔................
XML HP OS image end php代碼 append sda ext 1.原生寫ajax實現圖片預覽: 結構: <input type="file"> <img src="" > JavaScri
[js]js中6種錯誤處理機制
ram java throw mage 代碼執行 class 錯誤處理機制 code catch js中6種錯誤 http://javascript.ruanyifeng.com/grammar/error.html#toc5 https://www.jianshu.co
android 非同步訊息處理機制 — AHandler
1. 引入 ALooper、AHandler、AMessage 在 android multimedia stagefright 的框架程式碼中,通篇都是這幾個類的身影,所以熟悉 android 多媒體框架的第一步必須理解這幾個類的含義。 這幾個類是為了實現非同步訊息機制而設計的
Android非同步訊息處理機制詳解及原始碼分析
PS一句:最終還是選擇CSDN來整理髮表這幾年的知識點,該文章平行遷移到CSDN。因為CSDN也支援MarkDown語法了,牛逼啊! 【工匠若水 http://blog.csdn.net/yanbober 轉載煩請註明出處,尊重分享成果】 最近相對來說比較閒,加上養病,所
Redis的兩種訊息模式
Redis的兩種訊息模式 佇列模式 釋出訂閱模式 佇列模式 佇列模式下每個消費者可以同時從多個伺服器讀取訊息,但是每個訊息只能被一個消費者讀取。 在佇列模式下其實每次插入的資料都是載入在最前面的,而先插入的資料在後面,列表中始終維持了一個佇列故稱之為佇
ActiveMQ的兩種訊息形式。
一、訊息的傳遞型別 點對點:即一個生產者和一個消費者一一對應 PTP的過程好比是兩個人打電話,這兩個人獨享這一條通訊鏈路。一方傳送訊息,另外一方接收 訊息 。在實際應用中因為有多個使用者對使用 PTP 的鏈路,它的通訊場景如下圖所示:
【演算法模板】二叉樹的三種遍歷方式,以及根據兩種遍歷方式建樹
前言:今年九月份的PAT考試就栽在這“兩種遍歷建樹”上了,剛好沒看,剛好考到。作為自己的遺憾,今日碼完,貼在這裡留個紀念,希望能給自己警醒與警鐘。 簡要概括: 1、二叉樹的三種遍歷方式分別是 先序(先根)遍歷PreOrder,中序(中根)遍歷InOrder,後序(後根
Android之訊息處理機制(二)Handler的本質-Message和Looper到底是什麼?
目錄 Android之訊息處理機制(二) 以下皆為乾貨,比較幹,需要讀者細細理解。 前面(一)已經解釋了Handler的基本機制了,下面來概括一下本質。 一、MessageQueue MessageQueue其實就
C# 訊息處理機制及自定義過濾方式
一、訊息概述 Windows 下應用程式的執行是通過訊息驅動的。訊息是整個應用程式的工作引擎,我們需要理解掌握我們使用的程式語言是如何封裝訊息的原理。1. 什麼是訊息(Message) 訊息就是通知和命令。在.NET框架類庫中的System.Windows.Forms名稱
ActiveMQ的幾種訊息持久化機制
為了避免意外宕機以後丟失資訊,需要做到重啟後可以恢復訊息佇列,訊息系統一般都會採用持久化機制。 ActiveMQ的訊息持久化機制有JDBC,AMQ,KahaDB和LevelDB四種方式,無論使用哪種持久化方式,訊息的儲存邏輯都是一致的。 就是在傳送者將訊息傳送出
Android非同步訊息處理機制:Looper、Handler、Message
1 簡介 Handler,Looper,Message這三者都與Android非同步訊息處理執行緒相關, Looper:負責建立一個MessageQueue,然後進入一個無限迴圈體不斷從該MessageQueue中讀取訊息; Handler:訊息建立者,一個或者多個
ActiveMQ(二)——訊息處理機制
一、前言 上文中,小編提到安裝ActiveMQ,但是對於ActiveMQ中訊息是用什麼樣的形式儲存的?下面小編就向大家介紹一下。 二、訊息型別 對於訊息的傳遞有兩種型別: 1.點對點的,即一個生產者和一個消費者一一對應; 2.釋出/訂閱模式,即一個生產者產生訊
淺談Android的訊息處理機制--Handler
1.為什麼有Handler? 在UI執行緒中不能進行耗時操作,例如資料讀寫、網路請求、圖片載入等,所以這些操作被放在子執行緒裡,Handler便是子執行緒和UI執行緒之間通訊的橋樑之一。 2.幹什麼用的? 進行非同步訊息處理,即上述內容。 3.Handler類裡面有什麼是必須知道
android的訊息處理機制——Looper,Handler,Message (原理圖、原始碼)
轉自:http://my.oschina.net/u/1391648/blog/282892 在開始討論android的訊息處理機制前,先來談談一些基本相關的術語。 通訊的同步(Synchronous):指向客戶端傳送請求後,必須要在服務端有迴應後客戶端才繼續傳送
Android Handler 非同步訊息處理機制的妙用 建立強大的圖片載入類
最近建立了一個群,方便大家交流,群號:55032675上一篇部落格介紹了Android非同步訊息處理機制,如果你還不瞭解,可以看:Android 非同步訊息處理機制 讓你深入理解 Looper、Handler、Message三者關係 。那篇部落格的最後,提出可以把非同步訊息處理