1. 程式人生 > >JMS訊息傳送機制

JMS訊息傳送機制

JMS不僅需要高效的儲存訊息,還需要確保訊息能夠無誤的被傳輸.這就需要JMS提供一種"保證傳送"機制和事務.如果拋開JMS的規範,那麼它的技術實現本身就是網路IO + 檔案儲存;其中網路IO的困境就在"資料傳輸安全保證""網路失效"等方面,"檔案儲存"即要求資料需要被高效的存取.其中"檔案儲存"層面已經有較多的第三方儲存工具和解決方案,比如檔案型別資料庫/記憶體資料庫等.如果訊息系統需要分散式且整體架構良好,那麼上述問題,就更加棘手.

 

一.保證傳送

    保證傳送,用來描述JMS Client和JMS Provider之間的訊息互動是"可靠的",這種"可靠"並非絕對意義上的百分百資料不丟失或者資料不重複,所謂"保證傳送",就是在良好的API設計和相對良好的執行環境中,訊息最終會被接收和消費;但是如果API設計不良或者執行環境極其糟糕,比如事務使用不正確,或者確認機制操作不當,或者網路環境糟糕以至於無法連續的通訊等,都將會導致訊息的傳送和接收,出現"意料之外"的事情:佇列訊息被多次傳送,訊息被重複接收等.

    


 一條訊息的"來龍去脈"可以簡單的通過上圖表示

    1:  訊息生產者中send會阻塞操作(網路IO阻塞),如果網路異常,將導致訊息傳送失敗,此時生產者需要在編碼設計上做好異常檢測或者重發機制;其中網路異常主要體現在"網路中斷"或者JMS Provider失效.

    2:  JMS Provider接收到訊息之後,將會儲存在記憶體中(或者DB中),此後並立即向此Producer傳送"ACK"訊號,如果JMS Provider在儲存上遇到無法解決的異常(比如磁碟故障,嵌入式DB異常),那麼"ACK"響應將會攜帶異常資訊(message);如果"ACK"傳送中網路IO異常導致無法傳送,將會導致"此訊息"被移除,此時send方法也將從阻塞中返回並丟擲異常.因為網路IO異常,通常TCP連結的雙端都能及時的感知到.

        ACK傳送正常,指的是TCP連線上"ACK"資料已經通過網路傳送給了Client,這個過程中沒有發生異常,此後也意味著當前JMS訊息被持久儲存,producer的send方法也正確返回.此後如果Client端在處理ACK資訊過程中出現問題(幾乎不可能),那麼JMS Provider也將認為此訊息已經被正確傳送.需要提醒的是,在Client端:訊息的send和ACK在IO流上不是一個連續的過程,在不同的確認時機中,ACK資訊是可以是同步的(send方法阻塞,知道收到ACK,比如持久化型別的訊息),有些時非同步獲取的(比如事務中的訊息,或者非持久化訊息),不過JMS中的ACK MODE都producer並沒有任何意義..

    3: 訊息消費者receive訊息,這個操作在Queue中是consumer主動"拉取"(監聽),在Topic中是Provider推送;當消費者接收到訊息之後,可以立即確認(AUTO),然後再去執行與訊息有關的業務;也可以"延遲"確認等.

    4: 當訊息被消費者接收到之後,JMS Provider將訊息新增到此消費者的"待確認訊息"列表,如果JMS Provider在此消費者的IO通道中阻塞返回,但卻沒有收到ACK,那麼將導致此訊息重發.如果ACK正常接收到,那麼JMS Provider將會把此訊息從持久儲存裝置刪除.

 

    如下為JMS中ACK MODE:

 

    1. AUTO_ACKNOWLEDGE:自動確認,當Consumer客戶端(通常是指Consumer所屬的Session)在收到訊息之後,即準備確認訊息,通常在receive()方法返回之前,或者在messageListener.onMessage()正常執行之後,向Provider傳送ACK指令..所謂AUTO_ACK,就是確認的時機有Session"擇機"實施;開發者無法干擾.

 

    2. DUPS_OK_ACKNOWLEDGE: 允許延遲批量確認,重點在"批量",AUTO_ACK通常是確認一條訊息(事實上在不同的JMS實現中,它仍然可以像DUPS一樣確認多條訊息)..當消費者接收到訊息之後,並不是立即確認,而是"滯留"一段時間之後才會確認(批量確認);這種方式直觀上是提升了client端消費資料的速度(優先接觸訊息),但這種模式下,需要消費者能夠接受"訊息重發"的情況,比如當Consumer客戶端失效重啟,那些尚未確認但已經被"touch"(消費)過的訊息有可能會重複接受到;對於JMS Provider而言,如果它"等待"一定時間後,仍未收到"確認",將會重發訊息.,這種模式,對效能的提升是不明確的,因為較晚的確認訊息,意味著JMS Provider需要更長時間的保留"待確認"訊息列表..究竟多少條訊息作為"批量"的邊界,有具體的JMS實現者決定.

 

    3. CLIENT_ACKNOWLEDGE: 客戶端確認,需要要求訊息消費者明確的呼叫message.acknowledge()方法來確認此訊息的接收成功,無論何種原因(未捕獲異常),如果此方法未被呼叫,那麼此訊息將會被重發.這種模式下,允許消費者自己選擇時機確認訊息,通常使用在訊息編組(group)的情況下:將一系列訊息作為一個group接收,當group中最後一個訊息確認成功後,那麼JMS Provider就認為此組訊息全部接收成功(只需確認組的最後一條訊息即可,那麼JMS Provider會認為此前的其他訊息也接收正常).

 

二.事務

    事務用來描述"一系列訊息要麼全部確認成功,要麼全不確認"的特徵,它和資料庫事務最終需要達成的效果是一樣的.JMS Provider會快取每個生產者當前事務下的所有訊息,直到commit或者rollback.commit操作將會導致事務中所有的訊息被持久儲存;rollback意味著JMS Provider將會清除此事務下所有的訊息記錄...在事務未提交之前,訊息是不會被持久儲存的,也不會被消費者消費.

    每次事務提交之後,在client端會生成一個事務ID(一個session中不會出現重複的ID,clientID:sessionID:txID);事務的提交或者回滾都會攜帶ID.對於producer而言,在事務型別的session中,傳送訊息(一個或者多個)之後,需要執行session.commit(),否則訊息將不會被儲存.對於consumer而言,訊息接收到之後,需要手動的使用commit,否則JMS Provider會認為訊息沒有被接收,導致重發,因此你可以認為commit就是一個訊息確認操作.

session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);  
try{  
    Message message = session.createTextMessage(text);  
    String cid = "ID:" + System.currentTimeMillis();  
    message.setJMSCorrelationID(cid);  
    producer.send(message);  
    session.commit();  
}catch(Exception e){  
//  
}  
public class QueueMessageListener implements MessageListener{  
  
    private Session session;  
    public QueueMessageListener(Session session){  
        this.session = session;  
    }  
    public void onMessage(Message message) {  
        if(message == null){  
            return;  
        }  
        try{  
            //message handler  
            //  
            session.commit();  
        }catch(Exception e){  
            e.printStackTrace();  
        }  
    }  
}

三.訊息編組

    一序列訊息,如果不編組的話,可能會被分發給不同的消費者;但是很多時候,我們期望這"一序列"的訊息能夠有序的交付給一個消費者,無論是訊息的傳送還是消費,都希望它們是不可分割的"組合".那麼此時我們需要"訊息編組",我們需要使用到"JMSXGroupID"/"JMSXGroupSeq"兩個屬性.注意JMSProvider並不會根據JMSXGroupSeq進行排序,順序還是需要自己來維護.

   1.訊息生產者

import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.DeliveryMode;  
import javax.jms.Destination;  
import javax.jms.Message;  
import javax.jms.MessageProducer;  
import javax.jms.Queue;  
import javax.jms.QueueConnectionFactory;  
import javax.jms.Session;  
import javax.naming.Context;  
import javax.naming.InitialContext;  
  
public class XGroupProducer {  
  
    private MessageProducer producer;  
    private Session session;  
    private Connection connection;  
    private boolean isOpen = true;  
      
    public XGroupProducer() throws Exception{  
        Context context = new InitialContext();  
        ConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");  
        connection = connectionFactory.createConnection();  
        session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);  
        Destination queue = (Queue)context.lookup("queue1");  
        producer = session.createProducer(queue);  
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
        connection.start();  
          
    }  
      
      
    public boolean send(String[] texts) {  
        if(!isOpen){  
            throw new RuntimeException("session has been closed!");  
        }  
        try{  
            synchronized (this) {  
                String groupId = connection.getClientID() + ":Group";  
                Message bm = session.createTextMessage();  
                bm.setStringProperty("GroupMarker", "begin");  
                producer.send(bm);  
                for(int i= 0 ; i < texts.length; i++) {  
                    Message message = session.createTextMessage(texts[i]);  
                    message.setStringProperty("JMSXGroupID", groupId);  
                    message.setIntProperty("JMSXGroupSeq", i +1);  
                    producer.send(message);  
                }  
                //取消group粘性  
                Message em = session.createTextMessage();  
                em.setStringProperty("JMSXGroupID", groupId);  
                em.setIntProperty("JMSXGroupSeq", 0);  
                em.setStringProperty("GroupMarker", "end");  
                producer.send(em);  
                session.commit();  
            }  
            return true;  
        }catch(Exception e){  
            return false;  
        }  
    }  
      
      
    public synchronized void close(){  
        try{  
            if(isOpen){  
                isOpen = false;  
            }  
            session.close();  
            connection.close();  
        }catch (Exception e) {  
            //  
        }  
    }  
      
}  

    2.訊息消費者 

import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.Destination;  
import javax.jms.MessageConsumer;  
import javax.jms.Queue;  
import javax.jms.QueueConnectionFactory;  
import javax.jms.Session;  
import javax.naming.Context;  
import javax.naming.InitialContext;  
  
import com.test.jms.object.XGroupQueueMessageListener;  
  
public class XGroupConsumer {  
  
    private Connection connection;  
    private Session session;  
    private MessageConsumer consumer;  
      
    private boolean isStarted;  
      
    public XGroupConsumer() throws Exception{  
        Context context = new InitialContext();  
        ConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");  
        connection = connectionFactory.createConnection();  
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
        Destination queue = (Queue)context.lookup("queue1");  
        consumer = session.createConsumer(queue);  
        consumer.setMessageListener(new XGroupQueueMessageListener());  
          
    }  
      
      
    public synchronized boolean start(){  
        if(isStarted){  
            return true;  
        }  
        try{  
            connection.start();  
            isStarted = true;  
            return true;  
        }catch(Exception e){  
            return false;  
        }  
    }  
      
    public synchronized void close(){  
        isStarted = false;  
        try{  
            session.close();  
            connection.close();  
        }catch(Exception e){  
            //  
        }  
    }  
      
} 

   3.測試類 

public class XGroupTestMain {  
  
    /** 
     * @param args 
     */  
    public static void main(String[] args) throws Exception{  
        XGroupConsumer consumer = new XGroupConsumer();  
        consumer.start();  
        XGroupProducer producer = new XGroupProducer();  
        producer.send(new String[]{"1","2","3","4"});  
  
    }  
  
}