ActiveMQ訊息機制[QUEUE/TOPIC]例項
阿新 • • 發佈:2019-02-20
一. 點對點訊息機制[QUEUE]
首先得下載依賴activemq的jar包
生產者:
package com.activemq.src; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import com.activemq.main.P2PActivemqTest; /** * P2P生產者 * @author admin * */ public class MyMessageProducer implements Runnable{ @Override public void run() { try { System.out.println("生產者開始!!!"); // 建立連線工廠 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(P2PActivemqTest.BROKE_MQ_URL); // 建立JMS連線例項,並啟動連線 Connection connection = factory.createConnection(); connection.start(); // 建立Session物件,不開啟事務 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立目標,可以是 Queue 或 Topic Destination destination = session.createQueue(P2PActivemqTest.QUEUE_NAME); // 建立生成者 MessageProducer producer = session.createProducer(destination); // 設定訊息不需持久化。預設訊息需要持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 建立文字訊息 或者其他格式的資訊1.session.createMapMessage(); 2.session.createObjectMessage(Serializable object); TextMessage message = session.createTextMessage("Hello MQ!"); // 傳送訊息。non-persistent 預設非同步傳送;persistent 默認同步傳送(同步傳送會阻塞生產者send) producer.send(message); // 關閉會話和連線 producer.close(); session.close(); connection.close(); System.out.println("生產者結束!!!"); } catch(Exception e) { e.printStackTrace(); } } }
消費者:
package com.activemq.src; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import com.activemq.main.P2PActivemqTest; /** * P2P消費者 * @author admin * */ public class MyMessageConsumer implements Runnable{ @Override public void run() { try { System.out.println("消費者開始11!!!"); // 建立連線工廠 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(P2PActivemqTest.BROKE_MQ_URL); // 建立JMS連線例項,並啟動連線 Connection connection = connectionFactory.createConnection(); connection.start(); // 建立Session物件,不開啟事務 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立目標,可以是 Queue 或 Topic Destination destination = session.createQueue(P2PActivemqTest.QUEUE_NAME); // 建立消費者 MessageConsumer consumer = session.createConsumer(destination); // 獲取訊息 程式需要的話,可以通過consumer.receive()得到的message來判斷是屬於哪個型別的資訊,text map object,
// 例如【message instanceof ObjectMessage】 // 此處注意:當消費者的佇列[名稱]和生產者的佇列[名稱]不一致時,這裡會一直阻塞,直到有這個消費者佇列的資訊為止 TextMessage message = (TextMessage)consumer.receive(); System.out.println(message.getText()); // 消費者不加這行程式碼的話,會造成髒資料,比如另外一個消費者用同樣佇列,也是還會接收到資訊 // 加入這行會通知訊息中心有消費者消費了佇列的資料,並且刪除佇列中的資訊,這樣才符合點對點規則 message.acknowledge(); // 關閉會話和連線 consumer.close(); session.close(); connection.close(); System.out.println("消費者結束11!!!"); } catch(Exception e) { } } }
測試執行類:
package com.activemq.main;
import com.activemq.src.MyMessageConsumer;
import com.activemq.src.MyMessageProducer;
/**
* 測試執行類
* @author admin
*
*/
public class P2PActivemqTest {
// 連線activemq伺服器url
public static final String BROKE_MQ_URL = "tcp://伺服器ip:61616";
// 點對點訊息佇列名稱
public static final String QUEUE_NAME = "P2P-QUEUE";
public static void main(String[] args) {
// 訊息生產者
MyMessageProducer mmp = new MyMessageProducer();
// 訊息消費者
MyMessageConsumer mmc = new MyMessageConsumer();
// 建立執行緒
Thread mp = new Thread(mmp);
Thread mc = new Thread(mmc);
// 啟動執行緒
mp.start();
mc.start();
}
}
其中遇到的問題:
1.因為我的activemq安裝在Linux上,所以執行的時候報了連線超時!
解決方法:關閉了Linux防火牆
2.用多個消費者去測試的時候,發現都能獲取得到生產者釋出到佇列中的資訊,也就是產生了髒資料!
3.關於點對點訊息佇列的其他問題可以關注下這裡【點選參考這裡】
----------------------------------------------------------------
訂閱模式【釋出者/消費者】TOPIC
----------------------------------------------------------------
訊息釋出:
package com.activemq.topic.src;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.activemq.main.TopicActivemqTest;
/**
* Topic生產者
* @author admin
*
*/
public class MyTopicMessageProducer implements Runnable{
@Override
public void run() {
try {
System.out.println("生產者開始!!!");
// 建立連線工廠
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(TopicActivemqTest.BROKE_MQ_URL);
// 建立JMS連線例項,並啟動連線
Connection connection = factory.createConnection();
connection.start();
// 建立Session物件,不開啟事務
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立主題
Topic topic = session.createTopic(TopicActivemqTest.TOPIC_NAME);
// 建立生成者
MessageProducer producer = session.createProducer(topic);
// 設定訊息不需持久化。預設訊息需要持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 建立文字訊息 或者其他格式的資訊
// 1.session.createMapMessage(); 2.session.createObjectMessage(Serializable object);
while(true){
TextMessage message = session.createTextMessage("Hello MQ_" + System.currentTimeMillis());
// 傳送訊息。non-persistent 預設非同步傳送;persistent 默認同步傳送
producer.send(message);
System.out.println("傳送訊息:"+message.getText());
Thread.sleep(2000);
// 關閉會話和連線
/*producer.close();
session.close();
connection.close();*/
}
} catch(Exception e) {
e.printStackTrace();
}
}
}
因為訂閱模式基本上都有多個系統進行主題訂閱,所以會有多個消費者!!這裡建立了兩個消費者進行測試:
消費者1:
package com.activemq.topic.src;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.activemq.main.TopicActivemqTest;
/**
* Topic消費者
* @author admin
*
*/
public class MyTopicMessageConsumer implements Runnable{
@Override
public void run() {
try {
System.out.println("消費者開始11!!!");
// 建立連線工廠
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(TopicActivemqTest.BROKE_MQ_URL);
// 建立JMS連線例項,並啟動連線
Connection connection = connectionFactory.createConnection();
connection.start();
// 建立Session物件,不開啟事務
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立主題
Topic topic = session.createTopic(TopicActivemqTest.TOPIC_NAME);
// 建立消費者
MessageConsumer consumer = session.createConsumer(topic);
// 非同步消費
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage mess = (TextMessage)message;
try {
System.out.println("接收到的資訊:" + mess.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch(Exception e) {
// do nothing
}
}
}
消費者2:
package com.activemq.topic.src;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.activemq.main.TopicActivemqTest;
/**
* Topic消費者2
* @author admin
*
*/
public class Consumer2 {
public static void main(String[] args) {
try {
// 建立連線工廠
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(TopicActivemqTest.BROKE_MQ_URL);
// 建立JMS連線例項,並啟動連線
Connection connection = connectionFactory.createConnection();
connection.start();
// 建立Session物件,不開啟事務
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立主題
Topic topic = session.createTopic(TopicActivemqTest.TOPIC_NAME);
// 建立消費者
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage mess = (TextMessage)message;
try {
System.out.println("接收到的資訊consumer2:" + mess.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch(Exception e) {
// do nothing
}
}
}
測試執行類:
首先啟動消費者2,然後再啟動下面的主測試類:
package com.activemq.main;
import com.activemq.topic.src.MyTopicMessageConsumer;
import com.activemq.topic.src.MyTopicMessageProducer;
/**
* 測試執行類
* @author admin
*
*/
public class TopicActivemqTest {
// 連線activemq伺服器url
public static final String BROKE_MQ_URL = "tcp://伺服器ip:61616";
// 點對點訊息佇列名稱
public static final String TOPIC_NAME = "TOPIC-S-SCHEDUAL";
public static void main(String[] args) {
// 訊息生產者
MyTopicMessageProducer mmp = new MyTopicMessageProducer();
// 訊息消費者
MyTopicMessageConsumer mmc = new MyTopicMessageConsumer();
// 建立執行緒
Thread mp = new Thread(mmp);
Thread mc = new Thread(mmc);
// 啟動執行緒
mp.start();
mc.start();
}
}
以上