JMS訊息傳送機制
JMS不僅需要高效的儲存訊息,還需要確保訊息能夠無誤的被傳輸.這就需要JMS提供一種"保證傳送"機制和事務.如果拋開JMS的規範,那麼它的技術實現本身就是網路IO + 檔案儲存;其中網路IO的困境就在"資料傳輸安全保證""網路失效"等方面,"檔案儲存"即要求資料需要被高效的存取.其中"檔案儲存"層面已經有較多的第三方儲存工具和解決方案,比如檔案型別資料庫/記憶體資料庫等.如果訊息系統需要分散式且整體架構良好,那麼上述問題,就更加棘手.
一.保證傳送
保證傳送,用來描述JMS Client和JMS Provider之間的訊息互動是"可靠的",這種"可靠"並非絕對意義上的百分百資料不丟失或者資料不重複,所謂"保證傳送",就是在良好的API設計和相對良好的執行環境中,訊息最終會被接收和消費;但是如果API設計不良或者執行環境極其糟糕,比如事務使用不正確,或者確認機制操作不當,或者網路環境糟糕以至於無法連續的通訊等,都將會導致訊息的傳送和接收,出現"意料之外"的事情:佇列訊息被多次傳送,訊息被重複接收等.
一條訊息的"來龍去脈"可以簡單的通過上圖表示
1: 訊息生產者中send會阻塞操作(網路IO阻塞),如果網路異常,將導致訊息傳送失敗,此時生產者需要在編碼設計上做好異常檢測或者重發機制;其中網路異常主要體現在"網路中斷"或者JMS Provider失效.
2: JMS Provider接收到訊息之後,將會儲存在記憶體中(或者DB中),此後並立即向此Producer傳送"ACK"訊號,如果JMS Provider在儲存上遇到無法解決的異常(比如磁碟故障,嵌入式DB異常),那麼"ACK"響應將會攜帶異常資訊(message);如果"ACK"傳送中網路IO異常導致無法傳送,將會導致"此訊息"被移除,此時send方法也將從阻塞中返回並丟擲異常.因為網路IO異常,通常TCP連結的雙端都能及時的感知到.
ACK傳送正常,指的是TCP連線上"ACK"資料已經通過網路傳送給了Client,這個過程中沒有發生異常,此後也意味著當前JMS訊息被持久儲存,producer的send方法也正確返回.此後如果Client端在處理ACK資訊過程中出現問題(幾乎不可能),那麼JMS Provider也將認為此訊息已經被正確傳送.需要提醒的是,在Client端:訊息的send和ACK在IO流上不是一個連續的過程,在不同的確認時機中,ACK資訊是可以是同步的(send方法阻塞,知道收到ACK,比如持久化型別的訊息),有些時非同步獲取的(比如事務中的訊息,或者非持久化訊息),不過JMS中的ACK MODE都producer並沒有任何意義..
3: 訊息消費者receive訊息,這個操作在Queue中是consumer主動"拉取"(監聽),在Topic中是Provider推送;當消費者接收到訊息之後,可以立即確認(AUTO),然後再去執行與訊息有關的業務;也可以"延遲"確認等.
4: 當訊息被消費者接收到之後,JMS Provider將訊息新增到此消費者的"待確認訊息"列表,如果JMS Provider在此消費者的IO通道中阻塞返回,但卻沒有收到ACK,那麼將導致此訊息重發.如果ACK正常接收到,那麼JMS Provider將會把此訊息從持久儲存裝置刪除.
如下為JMS中ACK MODE:
1. AUTO_ACKNOWLEDGE:自動確認,當Consumer客戶端(通常是指Consumer所屬的Session)在收到訊息之後,即準備確認訊息,通常在receive()方法返回之前,或者在messageListener.onMessage()正常執行之後,向Provider傳送ACK指令..所謂AUTO_ACK,就是確認的時機有Session"擇機"實施;開發者無法干擾.
2. DUPS_OK_ACKNOWLEDGE: 允許延遲批量確認,重點在"批量",AUTO_ACK通常是確認一條訊息(事實上在不同的JMS實現中,它仍然可以像DUPS一樣確認多條訊息)..當消費者接收到訊息之後,並不是立即確認,而是"滯留"一段時間之後才會確認(批量確認);這種方式直觀上是提升了client端消費資料的速度(優先接觸訊息),但這種模式下,需要消費者能夠接受"訊息重發"的情況,比如當Consumer客戶端失效重啟,那些尚未確認但已經被"touch"(消費)過的訊息有可能會重複接受到;對於JMS Provider而言,如果它"等待"一定時間後,仍未收到"確認",將會重發訊息.,這種模式,對效能的提升是不明確的,因為較晚的確認訊息,意味著JMS Provider需要更長時間的保留"待確認"訊息列表..究竟多少條訊息作為"批量"的邊界,有具體的JMS實現者決定.
3. CLIENT_ACKNOWLEDGE: 客戶端確認,需要要求訊息消費者明確的呼叫message.acknowledge()方法來確認此訊息的接收成功,無論何種原因(未捕獲異常),如果此方法未被呼叫,那麼此訊息將會被重發.這種模式下,允許消費者自己選擇時機確認訊息,通常使用在訊息編組(group)的情況下:將一系列訊息作為一個group接收,當group中最後一個訊息確認成功後,那麼JMS Provider就認為此組訊息全部接收成功(只需確認組的最後一條訊息即可,那麼JMS Provider會認為此前的其他訊息也接收正常).
二.事務
事務用來描述"一系列訊息要麼全部確認成功,要麼全不確認"的特徵,它和資料庫事務最終需要達成的效果是一樣的.JMS Provider會快取每個生產者當前事務下的所有訊息,直到commit或者rollback.commit操作將會導致事務中所有的訊息被持久儲存;rollback意味著JMS Provider將會清除此事務下所有的訊息記錄...在事務未提交之前,訊息是不會被持久儲存的,也不會被消費者消費.
每次事務提交之後,在client端會生成一個事務ID(一個session中不會出現重複的ID,clientID:sessionID:txID);事務的提交或者回滾都會攜帶ID.對於producer而言,在事務型別的session中,傳送訊息(一個或者多個)之後,需要執行session.commit(),否則訊息將不會被儲存.對於consumer而言,訊息接收到之後,需要手動的使用commit,否則JMS Provider會認為訊息沒有被接收,導致重發,因此你可以認為commit就是一個訊息確認操作.
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
try{
Message message = session.createTextMessage(text);
String cid = "ID:" + System.currentTimeMillis();
message.setJMSCorrelationID(cid);
producer.send(message);
session.commit();
}catch(Exception e){
//
}
public class QueueMessageListener implements MessageListener{
private Session session;
public QueueMessageListener(Session session){
this.session = session;
}
public void onMessage(Message message) {
if(message == null){
return;
}
try{
//message handler
//
session.commit();
}catch(Exception e){
e.printStackTrace();
}
}
}
三.訊息編組
一序列訊息,如果不編組的話,可能會被分發給不同的消費者;但是很多時候,我們期望這"一序列"的訊息能夠有序的交付給一個消費者,無論是訊息的傳送還是消費,都希望它們是不可分割的"組合".那麼此時我們需要"訊息編組",我們需要使用到"JMSXGroupID"/"JMSXGroupSeq"兩個屬性.注意JMSProvider並不會根據JMSXGroupSeq進行排序,順序還是需要自己來維護.
1.訊息生產者
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
public class XGroupProducer {
private MessageProducer producer;
private Session session;
private Connection connection;
private boolean isOpen = true;
public XGroupProducer() throws Exception{
Context context = new InitialContext();
ConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");
connection = connectionFactory.createConnection();
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination queue = (Queue)context.lookup("queue1");
producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
}
public boolean send(String[] texts) {
if(!isOpen){
throw new RuntimeException("session has been closed!");
}
try{
synchronized (this) {
String groupId = connection.getClientID() + ":Group";
Message bm = session.createTextMessage();
bm.setStringProperty("GroupMarker", "begin");
producer.send(bm);
for(int i= 0 ; i < texts.length; i++) {
Message message = session.createTextMessage(texts[i]);
message.setStringProperty("JMSXGroupID", groupId);
message.setIntProperty("JMSXGroupSeq", i +1);
producer.send(message);
}
//取消group粘性
Message em = session.createTextMessage();
em.setStringProperty("JMSXGroupID", groupId);
em.setIntProperty("JMSXGroupSeq", 0);
em.setStringProperty("GroupMarker", "end");
producer.send(em);
session.commit();
}
return true;
}catch(Exception e){
return false;
}
}
public synchronized void close(){
try{
if(isOpen){
isOpen = false;
}
session.close();
connection.close();
}catch (Exception e) {
//
}
}
}
2.訊息消費者
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import com.test.jms.object.XGroupQueueMessageListener;
public class XGroupConsumer {
private Connection connection;
private Session session;
private MessageConsumer consumer;
private boolean isStarted;
public XGroupConsumer() throws Exception{
Context context = new InitialContext();
ConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = (Queue)context.lookup("queue1");
consumer = session.createConsumer(queue);
consumer.setMessageListener(new XGroupQueueMessageListener());
}
public synchronized boolean start(){
if(isStarted){
return true;
}
try{
connection.start();
isStarted = true;
return true;
}catch(Exception e){
return false;
}
}
public synchronized void close(){
isStarted = false;
try{
session.close();
connection.close();
}catch(Exception e){
//
}
}
}
3.測試類
public class XGroupTestMain {
/**
* @param args
*/
public static void main(String[] args) throws Exception{
XGroupConsumer consumer = new XGroupConsumer();
consumer.start();
XGroupProducer producer = new XGroupProducer();
producer.send(new String[]{"1","2","3","4"});
}
}