分散式訊息通訊ActiveMQ
訊息中介軟體
訊息中介軟體是指利用高效可靠的訊息傳遞機制進行平臺無關的資料交流,並且基於資料通訊來進行分散式系統的整合。通過提供訊息傳遞和訊息排隊模型,可以在分散式架構下擴充套件程序之間的通訊。
訊息中介軟體能做什麼
訊息中介軟體主要解決分散式系統之間訊息的傳遞問題 ,能夠遮蔽各種平臺以及協議之間的特性,實現應用之間的協同。
示例:
電商平臺中的註冊功能,使用者註冊不單是向資料庫insert,可能還需要贈送積分,傳送郵件,傳送簡訊等系列操作。
假如:每個操作都耗時1s,那麼註冊過程就需要耗時4s才能響應給使用者。從註冊這個服務可以看出,每個子操作都是獨立的,同時,基於領域劃分以後,它們都屬於不同的子域。所以我們可以對這些子操作實現非同步化操作。類似多執行緒並行處理。
如何實現非同步化?用多執行緒能實現嗎?多執行緒當然可以實現,只是,訊息的持久化、訊息的重發這些條件,多執行緒 並不能滿足.所以需要藉助一些開源的訊息中介軟體來解決。 而分散式訊息佇列就是一個很好的解決辦法。通過引入分散式佇列,大大提升程式的處理效率,並且還解決了各個模組之間的耦合問題。
分散式訊息佇列解決的場景:
引入訊息中介軟體後(非同步處理),電商平臺中的註冊架構圖變為
電商中的秒殺:
使用者提交過來的請求,先寫入訊息佇列。訊息佇列是有長度的,如果訊息佇列超過指定長度,直接拋棄。
秒殺的 具體核心處理業務,接收訊息佇列中訊息進行處理。這裡的訊息處理能力取決於消費端本身的吞吐量。
解耦、非同步化、流量整形、資料的最終一致性(最大化的重試完成資料一致性)
ActiveMQ 簡介
ActiveMQ
ActiveMQ 是完全基於JMS 規範實現的一個訊息中介軟體產品,是Apache 開源基金會研發的訊息中介軟體。ActiveMQ 主要應用在分散式系統架構中,幫助構建高可用、高效能、可伸縮的企業級面向服務的系統。
ActiveMQ 特性
-
多語言和協議編寫客戶端
-
語言:Java、C、C++、C#、Ruby、Perl、Python、PHP
-
協議:openwire、stomp、REST、ws、notification、xmpp、AMQP
-
-
完全支援JMS1.1和J2EE1.4規範
-
對Spring的支援,ActiveMQ可以很容易的嵌入到spring模組中
ActiveMQ 下載安裝啟動
下載地址
http://activemq.apache.org/activemq-5158-release.html
解壓
tar -zxvf apache-activemq-5.15.8-bin.tar.gz
啟動服務
-
cd apache-activemq-5.15.8/bin
sh activemq start
-
啟動並帶指定日誌檔案 sh activemq start > /tmp/activemqlog
關閉服務
-
sh activemq stop
監控地址
http://192.168.15.134:8161/admin/ admin admin
ActiveMQ 的埠61616
-
預設為61616
-
檢查是否成功啟動ActiveMQ
-
netstat -an|grep 61616
-
JMS 基本概念和模型
JMS的定義
JMS(Java Message Service) :面向訊息中介軟體的API
MOM(Message Oriented Middleware):面向訊息中介軟體
Java 訊息服務是Java平臺中關於面向訊息中介軟體的API,用於兩個程式 之間,或者分散式系統中傳送訊息,進行非同步通訊。
JMS 是一個與具體平臺無關的API,絕大多數MOM 提供商都對JMS提供了支援。ActiveMQ就是其中的一個實現。
MOM
MOM 是面向訊息的中介軟體,使用訊息傳送提供者來協調訊息傳送操作。 MOM 需要提供API和管理工具。客戶端使用API呼叫,把訊息傳送到由提供者管理的目的地。在傳送訊息後,客戶端會繼續執行其他工作,並且在接收方收到這個訊息確認之前,提供者一直保留該訊息。
MOM 的特點
-
訊息非同步接收,傳送者不需要等待訊息接受者響應
-
訊息可靠接收,確保訊息中介軟體可靠儲存。只有接收方收到訊息後才刪除訊息
開源JMS提供商
JbossMQ(jboss4)、Jboss messaging(jboss5)、joram、ubermq、mantamq、openjms ...
JMS 規範
JMS 規範的目的是為了使得Java 應用程式能夠訪問現有MOM(訊息中介軟體)系統,形成一套統一的標準規範,解決不同訊息中介軟體之間的協作問題。
-
不同訊息的傳遞域,點對點訊息傳送和釋出/訂閱訊息傳送
-
提供接收同步和非同步訊息的工具
-
對可靠訊息傳送的支援
-
常見訊息格式,例如流、文字和位元組
JMS 的體系結構
JMS 的基本功能
JMS 的基本功能是用於和麵向訊息中介軟體相互通訊的應用程式的介面
訊息傳遞域
-
p2p(point-2-point) 點對點訊息傳遞域
-
每個訊息只能有一個消費者(離線儲存)
-
類似QQ聊天的私聊
-
-
生產者和消費者之間沒有時間上的相關性,無論消費者在生產者傳送訊息的時候是否處於執行狀態,都可以提取訊息
-
如果session關閉時,有一些訊息已經被收到,但是沒有被簽收,消費者下一次連線到相同對列時,這些訊息仍然會被接收
-
如果使用者在receive 方法中設定了訊息的選擇條件(訊息過濾)
-
如果是持久化訊息,訊息會被持久化儲存,直到訊息被簽收
-
-
釋出訂閱(publish/subscribe)訊息傳遞域
-
每個訊息有多個消費者
-
類似QQ群聊
-
-
生產者和消費者有時間上的相關性
-
訂閱一個主題的消費者只能消費自它訂閱之後釋出的訊息。
-
JMS 規範允許客戶建立持久訂閱,一定程度上降低了時間的相關性要求
-
持久訂閱允許消費者消費它在未處於啟用狀態時傳送的訊息
-
-
持久化訂閱和非持久化訂閱
-
在非持久化訂閱的前提下,不能恢復或者重新指派一個未簽收的訊息;
-
如果所有訊息必須要簽收,則使用持久訂閱
-
訊息的組成
訊息頭(Header)
訊息頭包含訊息的識別資訊和路由資訊
訊息頭包含一些標準的屬性:
-
JMSDestination
-
訊息傳送的目的地,queue或者topic
-
-
JMSDeliveryMode
-
傳送模式,持久化模式和非持久模式
-
-
JMSPrority
-
訊息優先順序(優先順序分為10個級別,從0最低-9最高)
-
如果不設定優先順序,預設級別4,需要注意的是,JMS Provider 並不一定保證按照優先順序的順序提交
-
-
JMSMessageID
-
唯一識別每個訊息的標識
-
訊息體
就是我們需要傳遞的訊息的內容
JMS API定義了5種訊息體格式:
-
TextMessage
-
java.lang.String 物件,如xml檔案內容
-
-
MapMessage
-
名/值對的集合,名是String 物件,值可以是Java 任何基本型別
-
-
BytesMessage
-
位元組流
-
-
StreamMessage
-
Java 中的輸入輸出流
-
-
ObjectMessage
-
Java 中的可序列化物件
-
-
Message
-
沒有訊息體,只有訊息頭和屬性
-
訊息的屬性
按型別分為:
-
應用設定的屬性
-
Message.setStringProperty(key,value);
-
-
標準屬性
-
使用“JMSX” 作為屬性名的字首
-
-
訊息中介軟體定義的屬性
-
JMS Provider 特定的屬性
-
JMS 的可靠機制
訊息的確認方式
訊息的處理階段:
-
客戶端接收訊息
-
客戶端處理訊息
-
訊息被確認
會話存在兩種機制:
-
事務性會話
-
createSession(boolean transacted, int acknowledgeMode)
-
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
-
-
session.commit() //訊息被確認 事務提交意味著生產的所有訊息被髮送,消費的所有訊息被確認
-
session.rollback(); //重新處理 訊息沒有被提交,沒有被處理,消費端的所有訊息被恢復,並且重新被提交, 表示一個事務結束, 另一個事務會開始。事務回滾意味著生產的所有訊息被銷燬,消費的所有訊息 被恢復並重新提交,除非它們已經過期
-
通過session.commit() //完成事務的簽收
-
-
非事務性會話
-
transacted 設定為FALSE
-
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
-
客戶端簽收模型
-
Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
-
那麼需要手動簽收
-
textMessage.acknowledge();
-
客戶端延遲確認,訊息可能重複消費
-
Session session = connection.createSession(Boolean.FALSE, DUPS_OK_ACKNOWLEDGE);
-
-
事務性的自動確認
非事務性的自動確認和手動確認
訊息的持久化儲存
持久化(儲存在資料庫或磁碟)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
對於持久訊息,訊息提供者會使用儲存-轉發機制,先將訊息儲存到穩定的介質中,等訊息傳送成功後再刪除。如果JMS Provider 宕機,那麼這些未送達的訊息則不會丟失,JMS Provider 恢復正常後,會重新讀取這些訊息,並傳送給對應的消費者。
非持久化(儲存在記憶體中)
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
對於非持久化訊息,JMS Provider 不會將它存到檔案、資料庫等穩定介質中。也就是說非持久訊息,儲存在記憶體中,如果JMS Provider 宕機,那麼非持久化訊息會丟失。
持久訂閱
-
持久訂閱者和非持久訂閱者針對的Domain 是Pub/Sub,而不是P2P
-
當Broker 傳送訊息給訂閱者時,如果訂閱者處於未啟用狀態,持久訂閱者可以收到訊息,而非持久訂閱者則收不到訊息。
-
當持久訂閱者處於未啟用狀態時,Broker 需要為持久訂閱者儲存訊息,如果持久訂閱者訂閱的訊息太多則會溢位。
-
持久訂閱時,客戶端向JMS 伺服器註冊一個自己身份的ID, 當這個客戶端處於離線時,JMS Provider 會為這個ID 儲存所有傳送到主題的訊息,當客戶再次連線到 JMS Provider時,會根據自己的ID得到所有當自己處於離線時傳送到主題的訊息。
-
持久訂閱的方式(消費端)
-
connection.setClientID("test");
-
Topic destination=session.createTopic("myTopic");
-
MessageConsumer consumer=session.createDurableSubscriber(destination,"test");
案例架構圖
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.8</version> </dependency>
生產端
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class JMSQueueProducer { public static void main(String args[]) { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.15.134:61616"); Connection connection = null; try { connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //建立目的地 Destination destination = session.createQueue("myQueue"); //建立傳送者 MessageProducer producer = session.createProducer(destination); //持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage textMessage = session.createTextMessage("Hello,World"); producer.send(textMessage); session.commit(); session.close(); } catch (JMSException e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
消費端
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import javax.xml.soap.Text; public class JMSQueueConsumer { public static void main(String args[]) { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.15.134:61616"); Connection connection = null; try { connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); //建立目的地 Destination destination = session.createQueue("myQueue"); //建立接收者 MessageConsumer consumer = session.createConsumer(destination); //接收訊息 阻塞方式監聽訊息 TextMessage textMessage =(TextMessage) consumer.receive(); System.out.println(textMessage.getText()); session.commit(); //表示訊息被自動確認 session.close(); } catch (JMSException e) { e.printStackTrace(); }finally { if(connection!=null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }