activeMq-JMS消息可靠性機制-4
消息接收確認
JMS消息只有在被確認之後,才認為已經被成功地消費了。
消息的成功消費通常包含三個階段:客戶接收消息、客戶處理消息和消息被確認。
//參數1:是否啟用事務(false表示不開啟事務) 參數2:接收模式(一般設置為自動接收)
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
在事務性會話中,當一個事務被提交的時候(session.commit() ),確認自動發生。
在非事務性會話中,消息何時被確認取決於創建會話時的應答模式(acknowledgement mode)。
該參數有以下四個可選值:
Session.AUTO_ACKNOWLEDGE:
當客戶成功的從receive方法返回的時候,或者從
MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的消息。
Session.CLIENT_ACKNOWLEDGE:
客戶通過調用消息的acknowledge方法確認消
息。需要註意的是,在這種模式中,確認是在會話層上進行,確認一個被消費的消息
將自動確認所有已被會話消費的消息。例如,如果一個消息消費者消費了10 個消
息,然後確認第5 個消息,那麽所有10 個消息都被確認。
開發者需要需要關註幾個方法:
1) message.acknowledge(),
2) ActiveMQMessageConsumer.acknowledege(),
3) ActiveMQSession.acknowledge();
其1)和3)是等效的,將當前session中所有consumer中尚未ACK的消息都一起確認,
2)只會對當前consumer中那些尚未確認的消息進行確認。
通常會在基於Group(消息分組)情況下會使用CLIENT_ACKNOWLEDGE,
我們將在一個group的消息序列接受完畢之後確認消息(組);不過當你認為消息很重要,
只有當消息被正確處理之後才能確認時,也可以使用此模式 。
Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue");
MessageConsumer consumer = session.createConsumer(queue);
int i = 0;
while(i < 3){
TextMessage msg = (TextMessage)consumer.receive();
if(i == 2){
// 如果不確認簽收,消息一直存在,當再次啟動客戶端會再次接收到消息
msg.acknowledge();// 確認簽收
}
i++;
}
Session.SESSION_TRANSACTED:
用session.commit()進行簽收 ,要麽全部正常確認,要麽全部redelivery。
這種嚴謹性,通常在基於GROUP(消息分組)或者其他場景下特別適合。
Session.DUPS_ACKNOWLEDGE:
該選擇只是會話遲鈍的確認消息的提交。如果JMS
provider失敗,那麽可能會導致一些重復的消息。如果是重復的消息,那麽JMS
provider 必須把消息頭的JMSRedelivered字段設置為true
消息持久性
JMS 支持以下兩種消息提交模式:
PERSISTENT:只是JMS provider持久保存消息,以保證消息不會因為JMS provider的失敗而丟失
NON_PERSISTENT:不要求JMS provider持久保存消息
// producer.setDeliveryMode(DeliveryMode.PERSISTENT);將消息傳遞特性置為持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 非持久化
消息優先級
可以使用消息優先級來指示JMS provider首先提交緊急的消息。優先級分
10個級別,從0(最低)到9(最高)。如果不指定優先級,默認級別是4。需要
註意的是,JMS provider並不一定保證按照優先級的順序提交消息
producer.setPriority(int i)
消息過期
可以設置消息在一定時間後過期,默認是永不過期
producer.setTimeToLive(Long aliveTime);
註意timeToLive屬性只會在DisableMessageTimestamp=false(禁用消息時間戳)的情況下才有意義。
消息的臨時目的地
可以通過會話上的createTemporaryQueue 方法和createTemporaryTopic
方法來創建臨時目的地。它們的存在時間只限於創建它們的連接所保持的時間。
只有創建該臨時目的地的連接上的消息消費者才能夠從臨時目的地中提取消息
持久訂閱
首先消息生產者必須使用PERSISTENT提交消息。客戶可以通過會話上的
createDurableSubscriber方法來創建一個持久訂閱,該方法的第一個參數必須
是一個topic。第二個參數是訂閱的名稱。
JMS provider會存儲發布到持久訂閱對應的topic上的消息。如果最初創建
持久訂閱的客戶或者任何其它客戶,使用相同的連接工廠和連接的客戶ID,相同
的主題和相同的訂閱名,再次調用會話上的createDurableSubscriber方法,那
麽該持久訂閱就會被激活。JMS provider會向客戶發送客戶處於非激活狀態時所
發布的消息。
持久訂閱在某個時刻只能有一個激活的訂閱者。持久訂閱在創建之後會一
直保留,直到應用程序調用會話上的unsubscribe方法。
本地事務
在一個JMS客戶端,可以使用本地事務來組合消息的發送和接收。JMS
Session接口提供了commit和rollback方法。事務提交意味著生產的所有消息被
發送,消費的所有消息被確認;事務回滾意味著生產的所有消息被銷毀,消費的
所有消息被恢復並重新提交,除非它們已經過期。
事務性的會話總是牽涉到事務處理中,commit或rollback方法一旦被調
用,一個事務就結束了,而另一個事務被開始。關閉事務性會話將回滾其中的事務。
需要註意的是,如果使用請求/回復機制,即發送一個消息,同時希望在同
一個事務中等待接收該消息的回復,那麽程序將被掛起,因為知道事務提交,發
送操作才會真正執行。
需要註意的還有一個,消息的生產和消費不能包含在同一個事務中。
JMS的PTP模型
JMS PTP(Point-to-Point)模型定義了客戶端如何向隊列發送消息,從隊列接收
消息,以及瀏覽隊列中的消息。
PTP模型是基於隊列的,生產者發消息到隊列,消費者從隊列接收消息,隊
列的存在使得消息的異步傳輸成為可能。和郵件系統中的郵箱一樣,隊列可以包
含各種消息,JMS Provider 提供工具管理隊列的創建、刪除。
PTP的一些特點:
1:如果在Session 關閉時,有一些消息已經被收到,但還沒有被簽收
(acknowledged),那麽,當消費者下次連接到相同的隊列時,這些消息還會被再
次接收
2:如果用戶在receive 方法中設定了消息選擇條件,那麽不符合條件的消息會留在
隊列中,不會被接收到
3:隊列可以長久地保存消息直到消費者收到消息。消費者不需要因為擔心消息會丟
失而時刻和隊列保持激活的連接狀態,充分體現了異步傳輸模式的優勢
JMS的Pub/Sub模型
JMS Pub/Sub 模型定義了如何向一個內容節點發布和訂閱消息,這些節點被稱作topic
主題可以被認為是消息的傳輸中介,發布者(publisher)發布消息到主題,訂閱者
(subscribe) 從主題訂閱消息。主題使得消息訂閱者和消息發布者保持互相獨立,不需要
接觸即可保證消息的傳送。
Pub/Sub的一些特點:
1:消息訂閱分為非持久訂閱和持久訂閱
非持久訂閱只有當客戶端處於激活狀態,也就是和JMS Provider保持連接狀態才能
收到發送到某個主題的消息,而當客戶端處於離線狀態,這個時間段發到主題的消息將會
丟失,永遠不會收到。
持久訂閱時,客戶端向JMS 註冊一個識別自己身份的ID,當這個客戶端處於離線
時,JMS Provider會為這個ID 保存所有發送到主題的消息,當客戶再次連接到JMS
Provider時,會根據自己的ID 得到所有當自己處於離線時發送到主題的消息。
2:如果用戶在receive 方法中設定了消息選擇條件,那麽不符合條件的消息不會被接收
3:非持久訂閱狀態下,不能恢復或重新派送一個未簽收的消息。只有持久訂閱才能恢復或重
新派送一個未簽收的消息。
4:當所有的消息必須被接收,則用持久訂閱。當丟失消息能夠被容忍,則用非持久訂閱
非持久的Topic消息示例
/*對於非持久的Topic消息的發送 基本跟前面發送隊列信息是一樣的,只是把創建Destination的地方,由創 建隊列替換成創建Topic*/ Destination destination = session.createTopic("MyTopic"); /*對於非持久的Topic消息的接收 1:必須要接收方在線,然後客戶端再發送信息,接收方才能接收到消息 2:同樣把創建Destination的地方,由創建隊列替換成創建Topic*/ Destination destination = session.createTopic("MyTopic"); /*3:由於不知道客戶端發送多少信息,因此改成while循環的方式了*/ Message message = consumer.receive(); while(message!=null) { TextMessage txtMsg = (TextMessage)message; System.out.println("收到消 息:" + txtMsg.getText()); message = consumer.receive(1000L); }View Code
持久的Topic消息示例
消息的發送
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.106:61616"); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic destination = session.createTopic("MyTopic"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); for(int i=0; i<2; i++) { TextMessage message = session.createTextMessage("messagedd--"+i); Thread.sleep(1000); //通過消息生產者發出消息 producer.send(message); } session.commit(); session.close(); connection.close(); 1:要用持久化訂閱,發送消息者要用 DeliveryMode.PERSISTENT 模式發現,在連接之前設定 2:一定要設置完成後,再start 這個 connectionView Code
消息的接收
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://192.168.1.106:61616"); Connection connection = cf.createConnection(); connection.setClientID("cc1"); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic destination = session.createTopic("MyTopic"); TopicSubscriber ts = session.createDurableSubscriber(destination, "T1"); connection.start(); Message message = ts.receive(); while(message!=null) { TextMessage txtMsg = (TextMessage)message; session.commit(); System.out.println("收到消 息:" + txtMsg.getText()); message = ts.receive(1000L); } session.close(); connection.close(); 1:需要在連接上設置消費者id,用來識別消費者 2:需要創建TopicSubscriber來訂閱 3:要設置好了過後再start 這個 connection 4:一定要先運行一次,等於向消息服務中間件註冊這個消費者,然後再運行客戶端發送信息,這個時候, 無論消費者是否在線,都會接收到,不在線的話,下次連接的時候,會把沒有收過的消息都接收下來。View Code
參考:
Producer特性詳解:http://shift-alt-ctrl.iteye.com/blog/2034440
activeMq-JMS消息可靠性機制-4