1. 程式人生 > >javax.jms及java訊息機制

javax.jms及java訊息機制

JMS(Java Message Service) 即Java訊息服務。

它提供標準的產生、傳送、接收訊息的介面簡化企業 應用的開發。它支援兩種訊息通訊模型:

  1. 點到點(point-to-point)(P2P)模型
  2. 釋出/訂閱(Pub/Sub)模型

P2P 模型規定了一個訊息只能有一個接收者;
Pub/Sub 模型允許一個訊息可以有多個接收者

對於點到點模型,訊息生產者產生一個訊息後,把這個訊息傳送到一個Queue(佇列)中,然後訊息接收者再從這個Queue中讀取,一旦這個訊息被一個接收者讀取之後,它就在這個Queue中消失了,所以一個訊息只能被一個接收者消費

與點到點模型不同,釋出/訂閱模型中,訊息生產者產生一個訊息後,把這個訊息傳送到一個Topic中,這個Topic可以同時有多個接收者在監聽,當一個訊息到達這個Topic之後,所有訊息接收者都會收到這個訊息。

Destination :訊息傳送的目的地,也就是前面說的Queue和Topic。建立好一個訊息之後,只需要把這個訊息傳送到目的地,訊息的傳送者就可以繼續做自己的事情,而不用等待訊息被處理完成。至於這個訊息什麼時候,會被哪個消費者消費,完全取決於訊息的接受者。
Message :從字面上就可以看出是被髮送的訊息。它有下面幾種型別:
StreamMessage: Java 資料流訊息,用標準流操作來順序的填充和讀取。
MapMessage:一個Map型別的訊息;名稱為 string 型別,而值為 Java 的基本型別。
TextMessage:普通字串訊息,包含一個String。
ObjectMessage:

物件訊息,包含一個可序列化的Java 物件
BytesMessage:二進位制陣列訊息,包含一個byte[]。
XMLMessage: 一個XML型別的訊息。
最常用的是TextMessage和ObjectMessage。
Session: 與JMS提供者所建立的會話,通過Session我們才可以建立一個Message。
Connection: 與JMS提供者建立的一個連線。可以從這個連線建立一個會話,即Session。
ConnectionFactory: 那如何建立一個Connection呢?這就需要下面講到的ConnectionFactory了。通過這個工廠類就可以得到一個與JMS提供者的連線,即Conection。
Producer:
訊息的生產者,要傳送一個訊息,必須通過這個生產者來發送。
通過下面這個簡圖可以看出上面這些概念的關係。

ConnectionFactory—->Connection—>Session—>Message
Destination + Session————————————>Producer
Destination + Session————————————>MessageConsumer

Queue實現的是點到點模型

,在下面的例子中,啟動2個消費者共同監聽一個Queue,然後迴圈給這個Queue中傳送多個訊息,我們依然採用ActiveMQ。

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class QueueTest {
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");

        Connection connection = factory.createConnection();
        connection.start();


        //建立一個Queue
        Queue queue = new ActiveMQQueue("testQueue");
        //建立一個Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);


        //註冊消費者1
        MessageConsumer comsumer1 = session.createConsumer(queue);
        comsumer1.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    System.out.println("Consumer1 get " + ((TextMessage)m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });


        //註冊消費者2
        MessageConsumer comsumer2 = session.createConsumer(queue);
        comsumer2.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    System.out.println("Consumer2 get " + ((TextMessage)m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

        });


        //建立一個生產者,然後傳送多個訊息。
        MessageProducer producer = session.createProducer(queue);
        for(int i=0; i<10; i++){
            producer.send(session.createTextMessage("Message:" + i));
        }
    }
}

執行這個例子會得到下面的輸出結果:

Consumer1 get Message:0
Consumer2 get Message:1
Consumer1 get Message:2
Consumer2 get Message:3
Consumer1 get Message:4
Consumer2 get Message:5
Consumer1 get Message:6
Consumer2 get Message:7
Consumer1 get Message:8
Consumer2 get Message:9

可以看出每個訊息直被消費了一次,但是如果有多個消費者同時監聽一個Queue的話,無法確定一個訊息最終會被哪一個消費者消費。

Topic實現的是釋出/訂閱模型

與Queue不同的是,Topic實現的是釋出/訂閱模型,在下面的例子中,啟動2個消費者共同監聽一個Topic,然後迴圈給這個Topic中傳送多個訊息。

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;

public class TopicTest {
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");

        Connection connection = factory.createConnection();
        connection.start();

        //建立一個Topic
        Topic topic= new ActiveMQTopic("testTopic");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //註冊消費者1
        MessageConsumer comsumer1 = session.createConsumer(topic);
        comsumer1.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    System.out.println("Consumer1 get " + ((TextMessage)m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        //註冊消費者2
        MessageConsumer comsumer2 = session.createConsumer(topic);
        comsumer2.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    System.out.println("Consumer2 get " + ((TextMessage)m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

        });

        //建立一個生產者,然後傳送多個訊息。
        MessageProducer producer = session.createProducer(topic);
        for(int i=0; i<10; i++){
            producer.send(session.createTextMessage("Message:" + i));
        }
    }
}

執行後得到下面的輸出結果:

Consumer1 get Message:0
Consumer2 get Message:0
Consumer1 get Message:1
Consumer2 get Message:1
Consumer1 get Message:2
Consumer2 get Message:2
Consumer1 get Message:3
Consumer2 get Message:3
Consumer1 get Message:4
Consumer2 get Message:4
Consumer1 get Message:5
Consumer2 get Message:5
Consumer1 get Message:6
Consumer2 get Message:6
Consumer1 get Message:7
Consumer2 get Message:7
Consumer1 get Message:8
Consumer2 get Message:8
Consumer1 get Message:9
Consumer2 get Message:9

JMSReplyTo

在下面的例子中,首先建立兩個Queue,傳送者給一個Queue傳送,接收者接收到訊息之後給另一個Queue回覆一個Message,然後再建立一個消費者來接受所回覆的訊息。

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class MessageSendReceiveAndReply {
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");

        Connection connection = factory.createConnection();
        connection.start();

        //訊息傳送到這個Queue
        Queue queue = new ActiveMQQueue("testQueue");
        //訊息回覆到這個Queue
        Queue replyQueue = new ActiveMQQueue("replyQueue");

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立一個訊息,並設定它的JMSReplyTo為replyQueue。
        Message message = session.createTextMessage("Andy");
        message.setJMSReplyTo(replyQueue);

        MessageProducer producer = session.createProducer(queue);
        producer.send(message);

        //訊息的接收者
        MessageConsumer comsumer = session.createConsumer(queue);
        comsumer.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    //建立一個新的MessageProducer來發送一個回覆訊息。
                    MessageProducer producer = session.createProducer(m.getJMSReplyTo());
                    producer.send(session.createTextMessage("Hello " + ((TextMessage) m).getText()));
                } catch (JMSException e1) {
                    e1.printStackTrace();
                }
            }

        });

        //這個接收者用來接收回復的訊息
        MessageConsumer comsumer2 = session.createConsumer(replyQueue);
        comsumer2.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    System.out.println(((TextMessage) m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

首先訊息生產者傳送一個訊息,內容為“Andy”, 然後消費者收到這個訊息之後根據訊息的JMSReplyTo,回覆一個訊息,內容為“Hello Andy‘。 最後在回覆的Queue上建立一個接收回復訊息的消費者,它輸出所回覆的內容。

執行上面的程式,可以得到下面的輸出結果:
Hello Andy