ActiveMQ 實現點對點的訊息模型小Demo
摘要:JMS(JAVA Message Service,java訊息服務) API是一個訊息服務的標準或者說是規範,允許應用程式元件基於JavaEE平臺建立,傳送,接受和讀取訊息.它使分散式通訊耦合度更低,訊息服務更加可靠以及非同步性
JMS有兩種訊息模型:點對點和釋出訂閱模型.本文章主要介紹點對點的訊息模型
P2P模式圖
涉及到的概念
- 訊息佇列(Queue)
- 傳送者(Sender)
- 接收者(Receiver)
- 每個訊息都被髮送到一個特定的佇列,接收者從佇列中獲取訊息。佇列保留著訊息,直到他們被消費或超時。
P2P的特點
- 每個訊息只有一個消費者(Consumer)(即一旦被消費,訊息就不再在訊息佇列中)
- 傳送者和接收者之間在時間上沒有依賴性,也就是說當傳送者傳送了訊息之後,不管接收者有沒有正在執行,它不會影響到訊息被髮送到佇列
- 接收者在成功接收訊息之後需向佇列應答成功
- 如果你希望傳送的每個訊息都應該被成功處理的話,那麼你需要P2P模式
在JMS中,訊息的產生和訊息是非同步的。對於消費來說,JMS的訊息者可以通過兩種方式來消費訊息。
○ 同步
訂閱者或接收者呼叫receive方法來接收訊息,receive方法在能夠接收到訊息之前(或超時之前)將一直阻塞
○ 非同步
訂閱者或接收者可以註冊為一個訊息監聽器。當訊息到達之後,系統自動呼叫監聽器的onMessage方法。
JMS程式設計模型
(1) ConnectionFactory
建立Connection物件的工廠,針對兩種不同的jms訊息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查詢ConnectionFactory物件。
(2) Destination
Destination的意思是訊息生產者的訊息傳送目標或者說訊息消費者的訊息來源。對於訊息生產者來說,它的Destination是某個佇列(Queue)或某個主題(Topic);對於訊息消費者來說,它的Destination也是某個佇列或主題(即訊息來源)。
所以,Destination實際上就是兩種型別的物件:Queue、Topic可以通過JNDI來查詢Destination。
(3) Connection
Connection表示在客戶端和JMS系統之間建立的連結(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種型別:QueueConnection和TopicConnection。
(4) Session
Session是我們操作訊息的介面。可以通過session建立生產者、消費者、訊息等。Session提供了事務的功能。當我們需要使用session傳送/接收多個訊息時,可以將這些傳送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。
(5) 訊息的生產者
訊息生產者由Session建立,並用於將訊息傳送到Destination。同樣,訊息生產者分兩種型別:QueueSender和TopicPublisher。可以呼叫訊息生產者的方法(send或publish方法)傳送訊息。
(6) 訊息消費者
訊息消費者由Session建立,用於接收被髮送到Destination的訊息。兩種型別:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來建立。當然,也可以session的creatDurableSubscriber方法來建立持久化的訂閱者。
(7) MessageListener
訊息監聽器。如果註冊了訊息監聽器,一旦訊息到達,將自動呼叫監聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。
程式碼:
package com.east.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 訊息的生產者(傳送者)
*
* @author linhaiyun
* @time 2017.12.12
*/
public class JMSProducer {
// 預設連線使用者名稱
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
// 預設連線密碼
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
// 預設連線地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
// 傳送的訊息數量
private static final int SENDNUM = 50;
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
// 連線工廠
ConnectionFactory factory;
// 連線
Connection connection = null;
// 會話 接受或者傳送訊息的執行緒
Session session;
// 訊息的目的地
Destination destination;
// 訊息生產者
MessageProducer messageProducer;
// 例項化連線工廠
factory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
try {
// 通過連線工廠獲取連線
connection = factory.createConnection();
// 啟動連線
connection.start();
// 建立session
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 建立一個名稱為HelloHaige的訊息佇列
destination = session.createQueue("HelloHaige");
// 建立訊息生產者
messageProducer = session.createProducer(destination);
// 傳送訊息
sendMessage(session, messageProducer);
session.commit();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 傳送訊息
*
* @param session
* @param messageProducer
* @time 2017.12.12 12:10:25
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
for (int i = 0; i < JMSProducer.SENDNUM; i++) {
// 建立一條文字訊息
TextMessage message = session.createTextMessage("ActiveMQ 傳送訊息" + i);
System.out.println("傳送訊息:Activemq 傳送訊息" + i);
// 通過訊息生產者發出訊息
messageProducer.send(message);
}
}
}
package com.east.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 訊息的消費者(接受者)
*
* @author linhaiyun
* @time 2017.12.12
*/
public class JMSConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 預設連線使用者名稱
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 預設連線密碼
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;// 預設連線地址
public static void main(String[] args) {
// TODO Auto-generated method stub
ConnectionFactory factory; // 連線工廠
Connection connection; // 連線
Session session; // 會話接受或者傳送訊息的執行緒
Destination destination; // 訊息的目的地
MessageConsumer messageConsumer;// 訊息的消費者
// 例項化連線工廠
factory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
try {
// 通過連線工廠獲取連線
connection = factory.createConnection();
// 啟動連線
connection.start();
// 建立session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立一個連線HelloWorld的訊息佇列
destination = session.createQueue("HelloHaige");
// 建立訊息消費者
messageConsumer = session.createConsumer(destination);
while(true){
TextMessage message = (TextMessage) messageConsumer.receive(100000);
if(message !=null){
System.out.println("收到的訊息:" + message.getText());
}else{
break;
}
}
} catch (JMSException e) {
// TODO Auto-generated catch block
System.out.println("例項化消費者失敗!");
e.printStackTrace();
}
}
}
結果:
開啟本地的ActiveMQ 服務
執行ActiveMQ生產者,建立50個訊息,結果如圖
執行消費者進行訊息的接受消費