分散式訊息通訊-ActiveMQ-筆記
訊息中介軟體的初步認識
- 什麼是訊息中介軟體?
-
訊息中介軟體是指利用高效可靠的訊息傳遞機制進行平臺無關的資料交流,並基於資料通訊來進行分散式系統的整合。
-
通過提供訊息傳遞和訊息排隊模型,可以在分散式架構下擴充套件程序之間的通訊。
-
-
訊息中介軟體能做什麼?
-
訊息中介軟體主要解決的就是分散式系統之間訊息傳遞的問題,它能夠遮蔽各種平臺以及協議之間的特性,實現應用程式之間的協同。
-
我們從註冊這個服務可以看到,每一個子操作都是相對獨立的,
- 同時,基於領域劃分以後,傳送啟用郵件、傳送營銷簡訊、贈送積分及紅包都屬於不同的子域。
- 所以我們可以對這些子操作進行來實現非同步化執行,類似於多執行緒並行處理的概念。
- 如何實現非同步化呢?
- 用多執行緒能實現嗎?多執行緒當然可以實現,只是,訊息的持久化、訊息的重發這些條件,多執行緒並不能滿足。
- 所以需要藉助一些開源中介軟體來解決。
- 而分散式訊息佇列就是一個非常好的解決辦法,引入分散式訊息佇列以後,架構圖就變成這樣了(下圖是非同步訊息佇列的場景)。
- 通過引入分散式佇列,就能夠大大提升程式的處理效率,並且還解決了各個模組之間的耦合問題
- 這個是分散式訊息佇列的第一個解決場景【非同步處理】
-
我們再來展開一種場景,
- 通過分散式訊息佇列來實現流量整形,
- 比如在電商平臺的秒殺場景下,流量會非常大。
- 通過訊息佇列的方式可以很好的緩解高流量的問題
- 使用者提交過來的請求,先寫入到訊息佇列。
- 訊息佇列是有長度的,如果訊息佇列長度超過指定長度,直接拋棄
- 秒殺的具體核心處理業務,
- 接收訊息佇列中訊息進行處理,這裡的訊息處理能力取決於消費端本身的吞吐量
當然,訊息中介軟體還有更多應用場景,
- 比如在弱一致性事務模型中,可以採用分散式訊息佇列的實現最大能力通知方式來實現資料的最終一致性等等
ActiveMQ 簡介
- ActiveMQ 是完全基於 JMS 規範實現的一個訊息中介軟體產品。
- 是 Apache 開源基金會研發的訊息中介軟體。
- ActiveMQ 主要應用在分散式系統架構中,幫助構建高可用、高效能、可伸縮的企業級面向訊息服務的系統
ActiveMQ 特性
- 多語言和協議編寫客戶端 語言:java/C/C++/C#/Ruby/Perl/Python/PHP
- 應 用 協 議 : openwire/stomp/REST/ws/notification/XMPP/AMQP
- 完全支援 jms1.1 和 J2ee1.4 規範
- 對 spring 的支援,ActiveMQ 可以很容易內嵌到 spring 模組中
從 JMS 規範來了解 ActiveMQ
JMS 定義
- Java 訊息服務(Java Message Service)是 java 平臺中關於面向訊息中介軟體的 API,
- 用於在兩個應用程式之間,或者分散式系統中傳送訊息,進行非同步通訊。
- JMS 是一個與具體平臺無關的 API,
- 絕大多數 MOM(Message Oriented Middleware)(面向訊息中介軟體)提供商都對 JMS 提供了支援。
- ActiveMQ 就是其中一個實現
什麼是 MOM
- MOM 是面向訊息的中介軟體,使用訊息傳送提供者來協調訊息傳送操作。
- MOM 需要提供 API 和管理工具。
- 客戶端使用 api 呼叫,把訊息傳送到由提供者管理的目的地。
- 在傳送訊息之後,客戶端會繼續執行其他工作,並且在接收方收到這個訊息確認之前,提供者一直保留該訊息。
MOM 的特點
- 訊息非同步接收,傳送者不需要等待訊息接受者響應
- 訊息可靠接收,確保訊息在中介軟體可靠儲存。
- 只有接收方收到後才刪除訊息
Java 訊息傳送服務規範最初的開發目的是為了使 Java 應用程式能夠訪問現有 MOM 系統。
- 引入該規範之後,它已被許多現有的 MOM 供應商採用並且已經憑藉自身的功能實現為非同步訊息傳送系統。
JMS 規範
- JMS 規範的目的
- 是為了使得 Java 應用程式能夠訪問現有 MOM (訊息中介軟體)系統,
- 形成一套統一的標準規範,解決不同訊息中介軟體之間的協作問題。
在建立 JMS 規範時,設計者希望能夠結合現有的訊息傳送的精髓,
- 比如說:
- 不同的訊息傳送模式或域,例如點對點訊息傳送和釋出/訂閱訊息傳送
- 提供於接收同步和非同步訊息的工具
- 對可靠訊息傳送的支援
- 常見訊息格式,例如流、文字和位元組
JMS 的體系結構
通過 JMS 規範結合 ActiveMQ 實現訊息傳送案例
- 案例總結這個案例的架構圖如下
細化 JMS 的基本功能
訊息傳遞域
- JMS 規範中定義了兩種訊息傳遞域:
- 點對點(point-topoint ) 消 息 傳 遞 域
- 發 布 / 訂 閱 消 息 傳 遞 域(publish/subscribe)
- 簡單理解就是:
- 有點類似於我們通過 qq 聊天的時候,在群裡面發訊息和給其中一個同學私聊訊息。
- 在群裡發訊息,所有群成員都能收到訊息。
- 私聊訊息只能被私聊的學員能收到訊息
- 有點類似於我們通過 qq 聊天的時候,在群裡面發訊息和給其中一個同學私聊訊息。
- 點對點訊息傳遞域
- 每個訊息只能有一個消費者
- 訊息的生產者和消費者之間沒有時間上的相關性。
- 無論消費者在生產者傳送訊息的時候是否處於執行狀態,都可以提取訊息
- 釋出訂閱訊息傳遞域
- 每個訊息可以有多個消費者
- 生產者和消費者之間有時間上的相關性。
- 訂閱一個主題的消費者只能消費自它訂閱之後釋出的訊息。
- JMS 規範允許客戶建立持久訂閱,這在一定程度上降低了時間上的相關性要求。
- 持久訂閱允許消費者消費它在未處於啟用狀態時傳送的訊息
訊息結構組成
- JMS 訊息由這幾部分組成:訊息頭、屬性、訊息體
訊息頭
- 訊息頭(Header) - 訊息頭包含訊息的識別資訊和路由資訊,訊息頭包含一些標準的屬性如:
- JMSDestination 訊息傳送的目的地,queue 或者 topic)
- JMSDeliveryMode 傳送模式。持久模式和非持久模式
- JMSPriority 訊息優先順序
- 優先順序分為 10 個級別,從 0(最低)到 9(最高). 如果不設定優先順序,預設級別是 4。
- 需要注意的是,JMS provider 並不一定保證按照優先順序的順序提交訊息
- JMSMessageID 唯一識別每個訊息的標識
屬性
- 按型別可以分為:
- 應用設定的屬性,標準屬性和訊息中介軟體定義的屬性
-
應用程式設定和新增的屬性,比如
-
Message.setStringProperty(“key”,”value”);
-
通過下面的程式碼可以獲得自定義屬性:
-
在傳送端,定義訊息屬性:
-
message.setStringProperty("Mic","Hello World");
-
-
在接收端接收資料:
-
Enumeration enumeration=message.getPropertyNames(); while(enumeration.hasMoreElements()){ String name=enumeration.nextElement().toString(); System.out.println("name:"+name+":"+messag e.getStringProperty(name)); System.out.println(); }
-
-
-
-
JMS 定義的屬性:
-
使用“JMSX”作為屬性名的字首,通過下面這段程式碼可以返回所有連線支援的 JMSX 屬性的名字
-
-
- JMS provider 特定的屬性
訊息體
- 就是我們需要傳遞的訊息內容,
- JMS API 定義了 5 中訊息體格式,
- 可以使用不同形式傳送接收資料,並可以相容現有的訊息格式,其中包括 :
TextMessage |
java.lang.String 物件,如 xml 檔案內容 |
MapMessage |
名/值對的集合,名是 String 物件,值型別可以是 Java 任何基本型別 |
BytesMessage |
位元組流 |
StreamMessage |
Java 中的輸入輸出流 |
ObjectMessage |
Java 中的可序列化物件 |
Message |
沒有訊息體,只有訊息頭和屬性。 |
- 絕大部分的時候,我們只需要基於訊息體進行構造
持久訂閱
- 持久訂閱的概念,也很容易理解,比如還是以 QQ 為例,我們把 QQ 退出了,但是下次登入的時候,仍然能收到離線的訊息。
持久訂閱就是這樣一個道理,持久訂閱有兩個特點:
- 持久訂閱者和非持久訂閱者針對的 Domain 是 Pub/Sub,而不是 P2P
- 當 Broker 傳送訊息給訂閱者時,
- 如果訂閱者處於 未啟用狀態狀態:持久訂閱者可以收到訊息,而非持久訂閱者則收不到訊息。
- 當然這種方式也有一定的影響:
- 當持久訂閱者處於 未啟用狀態時,Broker 需要為持久訂閱者儲存訊息;
- 如果持久訂閱者訂閱的訊息太多則會溢位。
消費端改動
connection=connectionFactory.createConnection();
connection.setClientID("Mic-001"); connection.start();
Session session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Topic destination=session.createTopic("myTopic");
MessageConsumer consumer=session.createDurableSubscriber(des tination,"Mic-001");
TextMessage message=(TextMessage)consumer.receive();
System.out.println(message.getText());
- 修改三處地方,然後先啟動消費端去註冊一個持久訂閱。
- 持久訂閱時,客戶端向 JMS 伺服器註冊一個自己身份的 ID,
- 當這個客戶端處於離線時,JMS Provider 會為這個 ID 儲存所有傳送到主題的訊息,
- 當客戶再次連線到 JMS Provider 時,會根據自己的 ID 得到所有當自己處於離線時傳送到主題的訊息。
- 這個身份 ID,在程式碼中的體現就是 connection 的 ClientID,
- 這個其實很好理解,你要想收到朋友傳送的 qq 訊息,前提就是你得先註冊個 QQ 號,而且還要有臺能上網的裝置,電腦或手機。
- 裝置就相當於是 clientId 是唯一的;
- qq 號相當於是訂閱者的名稱,在同一臺裝置上,不能用同一個 qq 號掛 2 個客戶端。
- 連線的 clientId 必須是唯一的,訂閱者的名稱在同一個連線內必須唯一。
- 這樣才能唯一的確定連線和訂閱者。
activeMQ 控制檯的截圖
- 設定持久訂閱以後,在控制檯能看到下圖的變化
JMS 訊息的可靠性機制
- 理論上來說,我們需要保證訊息中介軟體上的訊息,只有被消費者確認過以後才會被簽收,
- 相當於我們寄一個快遞出去,收件人沒有收到快遞,就認為這個包裹還是屬於待簽收狀態,這樣才能保證包裹能夠安全達到收件人手裡。
- 訊息中介軟體也是一樣。
- 訊息的消費通常包含 3 個階段:客戶接收訊息、客戶處理訊息、訊息被確認
- 首先,來簡單瞭解 JMS 的事務性會話和非事務性會話的概念
- JMS Session 介面提供了 commit 和 rollback 方法。
- 事務提交意味著生產的所有訊息被髮送,消費的所有訊息被確認;
- 事務回滾意味著生產的所有訊息被銷燬,消費的所有訊息被恢復並重新提交,除非它們已經過期。
- 事務性的會話總是牽涉到事務處理中,commit 或 rollback 方法一旦被呼叫,一個事務就結束了,而另一個事務被開始。
- 關閉事務性會話將回滾其中的事務