javax.jms及java訊息機制
JMS(Java Message Service) 即Java訊息服務。
它提供標準的產生、傳送、接收訊息的介面簡化企業 應用的開發。它支援兩種訊息通訊模型:
- 點到點(point-to-point)(P2P)模型
- 釋出/訂閱(Pub/Sub)模型
P2P 模型規定了一個訊息只能有一個接收者;
Pub/Sub 模型允許一個訊息可以有多個接收者
對於點到點模型,訊息生產者產生一個訊息後,把這個訊息傳送到一個Queue(佇列)中,然後訊息接收者再從這個Queue中讀取,一旦這個訊息被一個接收者讀取之後,它就在這個Queue中消失了,所以一個訊息只能被一個接收者消費
與點到點模型不同,釋出/訂閱模型中,訊息生產者產生一個訊息後,把這個訊息傳送到一個Topic中,這個Topic可以同時有多個接收者在監聽,當一個訊息到達這個Topic之後,所有訊息接收者都會收到這個訊息。
Destination :訊息傳送的目的地,也就是前面說的Queue和Topic。建立好一個訊息之後,只需要把這個訊息傳送到目的地,訊息的傳送者就可以繼續做自己的事情,而不用等待訊息被處理完成。至於這個訊息什麼時候,會被哪個消費者消費,完全取決於訊息的接受者。
Message :從字面上就可以看出是被髮送的訊息。它有下面幾種型別:
StreamMessage: Java 資料流訊息,用標準流操作來順序的填充和讀取。
MapMessage:一個Map型別的訊息;名稱為 string 型別,而值為 Java 的基本型別。
TextMessage:普通字串訊息,包含一個String。
ObjectMessage:
BytesMessage:二進位制陣列訊息,包含一個byte[]。
XMLMessage: 一個XML型別的訊息。
最常用的是TextMessage和ObjectMessage。
Session: 與JMS提供者所建立的會話,通過Session我們才可以建立一個Message。
Connection: 與JMS提供者建立的一個連線。可以從這個連線建立一個會話,即Session。
ConnectionFactory: 那如何建立一個Connection呢?這就需要下面講到的ConnectionFactory了。通過這個工廠類就可以得到一個與JMS提供者的連線,即Conection。
Producer:
通過下面這個簡圖可以看出上面這些概念的關係。
ConnectionFactory—->Connection—>Session—>Message
Destination + Session————————————>Producer
Destination + Session————————————>MessageConsumer
Queue實現的是點到點模型
,在下面的例子中,啟動2個消費者共同監聽一個Queue,然後迴圈給這個Queue中傳送多個訊息,我們依然採用ActiveMQ。
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class QueueTest {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
//建立一個Queue
Queue queue = new ActiveMQQueue("testQueue");
//建立一個Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//註冊消費者1
MessageConsumer comsumer1 = session.createConsumer(queue);
comsumer1.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer1 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//註冊消費者2
MessageConsumer comsumer2 = session.createConsumer(queue);
comsumer2.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer2 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//建立一個生產者,然後傳送多個訊息。
MessageProducer producer = session.createProducer(queue);
for(int i=0; i<10; i++){
producer.send(session.createTextMessage("Message:" + i));
}
}
}
執行這個例子會得到下面的輸出結果:
Consumer1 get Message:0
Consumer2 get Message:1
Consumer1 get Message:2
Consumer2 get Message:3
Consumer1 get Message:4
Consumer2 get Message:5
Consumer1 get Message:6
Consumer2 get Message:7
Consumer1 get Message:8
Consumer2 get Message:9
可以看出每個訊息直被消費了一次,但是如果有多個消費者同時監聽一個Queue的話,無法確定一個訊息最終會被哪一個消費者消費。
Topic實現的是釋出/訂閱模型
與Queue不同的是,Topic實現的是釋出/訂閱模型,在下面的例子中,啟動2個消費者共同監聽一個Topic,然後迴圈給這個Topic中傳送多個訊息。
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
public class TopicTest {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
//建立一個Topic
Topic topic= new ActiveMQTopic("testTopic");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//註冊消費者1
MessageConsumer comsumer1 = session.createConsumer(topic);
comsumer1.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer1 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//註冊消費者2
MessageConsumer comsumer2 = session.createConsumer(topic);
comsumer2.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer2 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//建立一個生產者,然後傳送多個訊息。
MessageProducer producer = session.createProducer(topic);
for(int i=0; i<10; i++){
producer.send(session.createTextMessage("Message:" + i));
}
}
}
執行後得到下面的輸出結果:
Consumer1 get Message:0
Consumer2 get Message:0
Consumer1 get Message:1
Consumer2 get Message:1
Consumer1 get Message:2
Consumer2 get Message:2
Consumer1 get Message:3
Consumer2 get Message:3
Consumer1 get Message:4
Consumer2 get Message:4
Consumer1 get Message:5
Consumer2 get Message:5
Consumer1 get Message:6
Consumer2 get Message:6
Consumer1 get Message:7
Consumer2 get Message:7
Consumer1 get Message:8
Consumer2 get Message:8
Consumer1 get Message:9
Consumer2 get Message:9
JMSReplyTo
在下面的例子中,首先建立兩個Queue,傳送者給一個Queue傳送,接收者接收到訊息之後給另一個Queue回覆一個Message,然後再建立一個消費者來接受所回覆的訊息。
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class MessageSendReceiveAndReply {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
//訊息傳送到這個Queue
Queue queue = new ActiveMQQueue("testQueue");
//訊息回覆到這個Queue
Queue replyQueue = new ActiveMQQueue("replyQueue");
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立一個訊息,並設定它的JMSReplyTo為replyQueue。
Message message = session.createTextMessage("Andy");
message.setJMSReplyTo(replyQueue);
MessageProducer producer = session.createProducer(queue);
producer.send(message);
//訊息的接收者
MessageConsumer comsumer = session.createConsumer(queue);
comsumer.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
//建立一個新的MessageProducer來發送一個回覆訊息。
MessageProducer producer = session.createProducer(m.getJMSReplyTo());
producer.send(session.createTextMessage("Hello " + ((TextMessage) m).getText()));
} catch (JMSException e1) {
e1.printStackTrace();
}
}
});
//這個接收者用來接收回復的訊息
MessageConsumer comsumer2 = session.createConsumer(replyQueue);
comsumer2.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println(((TextMessage) m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
首先訊息生產者傳送一個訊息,內容為“Andy”, 然後消費者收到這個訊息之後根據訊息的JMSReplyTo,回覆一個訊息,內容為“Hello Andy‘。 最後在回覆的Queue上建立一個接收回復訊息的消費者,它輸出所回覆的內容。
執行上面的程式,可以得到下面的輸出結果:
Hello Andy