(二)ActiveMQ 點對點系統模型
ActiveMQ 客戶端編寫支援Java, C, C++ 等多種語言,筆者使用Java 語言來實現。測試模型為:一個生產者生產訊息,兩個消費者消費訊息。
1. 引入jar 包
引入activemq-all-5.13.1.jar, 解壓apache-activemq-5.13.1-bin.zip 壓縮包,裡面有。 如果用maven3, 那麼新增Dependency
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.13.1</version> </dependency>
2. 源程式
【1. 生產者一】
package org.zgf.learn.learn.jms.activemq.p2p; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * JMS 生產者 */ public class JMSProducer { //設定預設的使用者名稱、密碼、連線地址 private static final String USE = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD; private static final String BOOKERURL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) throws Exception{ //1. 建立JMS 連線工程 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USE, PASSWORD,BOOKERURL); //2. 建立JMS 連線 Connection connection = connectionFactory.createConnection(); //3. 啟動JMS 連線 connection.start(); //4. 建立JMS 會話,需要開啟事務,提交方式為自動提交 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //5. 建立JMS 訊息佇列 Destination destination = session.createQueue("p2pMsgQueue1"); //6. 建立JMS 訊息生產者 MessageProducer messageProducer = session.createProducer(destination); //7. 建立JMS 訊息(建立最簡單的訊息, 文字訊息) TextMessage textMessage = session.createTextMessage("hello,world! [" + System.currentTimeMillis() + "]"); //8. JMS 訊息生產者 傳送 JMS訊息 messageProducer.send(textMessage); //9. 提交事務 session.commit(); System.out.println("【生成者】訊息傳送成功"); closeConn(connection,session); } private static void closeConn(Connection connection, Session session){ if(null != session){ try { session.close(); } catch (JMSException e) { e.printStackTrace(); }finally { if(connection != null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } } }
【2. 消費者一】
package org.zgf.learn.learn.jms.activemq.p2p; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * JMS 消費者一 */ public class JMSCustumer1 { //設定預設的使用者名稱、密碼、連線地址 private static final String USE = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD; private static final String BOOKERURL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) throws Exception{ //1. 建立JMS 連線工程 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USE, PASSWORD,BOOKERURL); //2. 建立JMS 連線 Connection connection = connectionFactory.createConnection(); //3. 啟動JMS 連線 connection.start(); //4. 建立JMS 會話,不需要開啟事務,提交方式為自動提交 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5. 建立JMS 訊息佇列 Destination destination = session.createQueue("p2pMsgQueue1"); //6. 建立JMS 訊息消費者 MessageConsumer messageConsumer = session.createConsumer(destination); //7. 為消費者設定監聽器 messageConsumer.setMessageListener(new JMSTextMsgListener("消費 者一")); System.out.println("消費者一開始監聽...."); //不能關閉連線,關閉之後就不能接受到訊息了 } }
【3. 消費者二】
package org.zgf.learn.learn.jms.activemq.p2p;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* JMS 消費者二
*/
public class JMSCustumer2 {
//設定預設的使用者名稱、密碼、連線地址
private static final String USE = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BOOKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) throws Exception{
//1. 建立JMS 連線工程
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USE, PASSWORD,BOOKERURL);
//2. 建立JMS 連線
Connection connection = connectionFactory.createConnection();
//3. 啟動JMS 連線
connection.start();
//4. 建立JMS 會話,不需要開啟事務,提交方式為自動提交
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. 建立JMS 訊息佇列
Destination destination = session.createQueue("p2pMsgQueue1");
//6. 建立JMS 訊息消費者
MessageConsumer messageConsumer = session.createConsumer(destination);
//7. 為消費者設定監聽器
messageConsumer.setMessageListener(new JMSTextMsgListener("消費者二"));
System.out.println("消費者二開始監聽....");
//不能關閉連線,關閉之後就不能接受到訊息了
}
}
【4. 訊息監聽器】
package org.zgf.learn.learn.jms.activemq.p2p;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* JMS 文字訊息監聽器
*
*/
public class JMSTextMsgListener implements MessageListener{
//消費者名稱
private final String customerName ;
public JMSTextMsgListener(String customerName) {
this.customerName = customerName;
}
@Override
public void onMessage(Message message) {
//1. 強制轉換訊息
TextMessage textMessage = (TextMessage) message;
//2. 獲取接收到的訊息內容
try {
String msgContent = textMessage.getText();
System.out.println("【" + this.customerName + "】接受到的訊息內容為:" + msgContent);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
3. 測試用例
0. 測試之前Activemq 中訊息佇列為空
1. 生產者生產併發送兩條訊息, 此時在web瀏覽器中可以看到:
未被消費的訊息:2 , 當前消費者個數:0, 訊息入隊個數:2, 訊息出隊:0
2. 啟動消費者一, 由於目前訊息佇列中存在兩條未消費的訊息,所以消費者一會立即消費這兩條訊息。
未被消費的訊息:0 , 當前消費者個數:1, 訊息入隊個數:2, 訊息出隊:2
消費者一控制檯列印結果:
3. 啟動消費者二 , 由於目前訊息佇列中沒有被訊息的訊息,所以消費者二不消費任何訊息, 此時有兩個消費者在監聽。
未被消費的訊息:0 , 當前消費者個數:1, 訊息入隊個數:2, 訊息出隊:2
消費者二控制檯列印結果:
4. 生產者每次傳送一條訊息,會發現,消費者一和消費者二進行輪流消費訊息,消費順序按啟動監聽的順序進行。
4. 總結
1. 在點對點模型中,未被消費的訊息會儲存在activeMQ 佇列中
2. 當一個生產者擁有多個消費者時,多個消費者將按照監聽順序輪流消費訊息
3. 切記一條訊息只能被一個消費者消費。