1. 程式人生 > >ActiveMQ訊息機制[QUEUE/TOPIC]例項

ActiveMQ訊息機制[QUEUE/TOPIC]例項

一. 點對點訊息機制[QUEUE]

首先得下載依賴activemq的jar包

生產者:

package com.activemq.src;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.activemq.main.P2PActivemqTest;

/**
 * P2P生產者
 * @author admin
 *
 */
public class MyMessageProducer implements Runnable{

	@Override
	public void run() {
		try {
			System.out.println("生產者開始!!!");
			
            // 建立連線工廠
            ActiveMQConnectionFactory factory  = new ActiveMQConnectionFactory(P2PActivemqTest.BROKE_MQ_URL);
            
            // 建立JMS連線例項,並啟動連線
            Connection connection = factory.createConnection();
            connection.start();
            
            // 建立Session物件,不開啟事務
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 建立目標,可以是 Queue 或 Topic
            Destination destination = session.createQueue(P2PActivemqTest.QUEUE_NAME);
            
            // 建立生成者
            MessageProducer producer = session.createProducer(destination);
            
            // 設定訊息不需持久化。預設訊息需要持久化
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            
            // 建立文字訊息  或者其他格式的資訊1.session.createMapMessage(); 2.session.createObjectMessage(Serializable object);
            TextMessage message = session.createTextMessage("Hello MQ!");
            
            // 傳送訊息。non-persistent 預設非同步傳送;persistent 默認同步傳送(同步傳送會阻塞生產者send)
            producer.send(message);
            
            // 關閉會話和連線
            producer.close();
            session.close();
            connection.close();
            
            System.out.println("生產者結束!!!");
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
}

消費者:
package com.activemq.src;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.activemq.main.P2PActivemqTest;

/**
 * P2P消費者
 * @author admin
 *
 */
public class MyMessageConsumer implements Runnable{

	@Override
	public void run() {
		try {
			System.out.println("消費者開始11!!!");
			
            // 建立連線工廠
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(P2PActivemqTest.BROKE_MQ_URL);
            
            // 建立JMS連線例項,並啟動連線
            Connection connection = connectionFactory.createConnection();
            connection.start();
            
            // 建立Session物件,不開啟事務
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 建立目標,可以是 Queue 或 Topic 
            Destination destination = session.createQueue(P2PActivemqTest.QUEUE_NAME);
            
            // 建立消費者
            MessageConsumer consumer = session.createConsumer(destination);
            
            // 獲取訊息  程式需要的話,可以通過consumer.receive()得到的message來判斷是屬於哪個型別的資訊,text  map object,
            // 例如【message instanceof ObjectMessage】
            // 此處注意:當消費者的佇列[名稱]和生產者的佇列[名稱]不一致時,這裡會一直阻塞,直到有這個消費者佇列的資訊為止
            TextMessage  message = (TextMessage)consumer.receive();
            System.out.println(message.getText());
            
            // 消費者不加這行程式碼的話,會造成髒資料,比如另外一個消費者用同樣佇列,也是還會接收到資訊
            // 加入這行會通知訊息中心有消費者消費了佇列的資料,並且刪除佇列中的資訊,這樣才符合點對點規則
            message.acknowledge();
            
            // 關閉會話和連線
            consumer.close();
            session.close();
            connection.close();
            
            System.out.println("消費者結束11!!!");
        } catch(Exception e) {
        }
	}
}

測試執行類:
package com.activemq.main;

import com.activemq.src.MyMessageConsumer;
import com.activemq.src.MyMessageProducer;

/**
 * 測試執行類
 * @author admin
 *
 */
public class P2PActivemqTest {
	// 連線activemq伺服器url
	public static final String BROKE_MQ_URL = "tcp://伺服器ip:61616";
	
	// 點對點訊息佇列名稱
	public static final String QUEUE_NAME = "P2P-QUEUE";

	public static void main(String[] args) {
		// 訊息生產者
		MyMessageProducer mmp = new MyMessageProducer();
		// 訊息消費者
		MyMessageConsumer mmc = new MyMessageConsumer();
		
		// 建立執行緒
		Thread mp = new Thread(mmp);
		Thread mc = new Thread(mmc);
		
		// 啟動執行緒
		mp.start();
		mc.start();
	}
}


其中遇到的問題:

1.因為我的activemq安裝在Linux上,所以執行的時候報了連線超時!

解決方法:關閉了Linux防火牆

2.用多個消費者去測試的時候,發現都能獲取得到生產者釋出到佇列中的資訊,也就是產生了髒資料!

3.關於點對點訊息佇列的其他問題可以關注下這裡【點選參考這裡

----------------------------------------------------------------

訂閱模式【釋出者/消費者】TOPIC

----------------------------------------------------------------

訊息釋出:

package com.activemq.topic.src;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.activemq.main.TopicActivemqTest;

/**
 * Topic生產者
 * @author admin
 *
 */
public class MyTopicMessageProducer implements Runnable{

	@Override
	public void run() {
		try {
			System.out.println("生產者開始!!!");
			
            // 建立連線工廠
            ActiveMQConnectionFactory factory  = new ActiveMQConnectionFactory(TopicActivemqTest.BROKE_MQ_URL);
            
            // 建立JMS連線例項,並啟動連線
            Connection connection = factory.createConnection();
            connection.start();
            
            // 建立Session物件,不開啟事務
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 建立主題
            Topic topic = session.createTopic(TopicActivemqTest.TOPIC_NAME);
            
            // 建立生成者
            MessageProducer producer = session.createProducer(topic);
            
            // 設定訊息不需持久化。預設訊息需要持久化
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            
            // 建立文字訊息  或者其他格式的資訊
            // 1.session.createMapMessage(); 2.session.createObjectMessage(Serializable object);
            while(true){
            	TextMessage message = session.createTextMessage("Hello MQ_" + System.currentTimeMillis());
            	
            	// 傳送訊息。non-persistent 預設非同步傳送;persistent 默認同步傳送
            	producer.send(message);
            	
            	System.out.println("傳送訊息:"+message.getText());
            	
            	Thread.sleep(2000);
            	
            	// 關閉會話和連線
            	/*producer.close();
            	session.close();
            	connection.close();*/
            }
            
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
}

因為訂閱模式基本上都有多個系統進行主題訂閱,所以會有多個消費者!!這裡建立了兩個消費者進行測試:

消費者1:

package com.activemq.topic.src;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.activemq.main.TopicActivemqTest;

/**
 * Topic消費者
 * @author admin
 *
 */
public class MyTopicMessageConsumer implements Runnable{

	@Override
	public void run() {
		try {
			System.out.println("消費者開始11!!!");
			
            // 建立連線工廠
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(TopicActivemqTest.BROKE_MQ_URL);
            
            // 建立JMS連線例項,並啟動連線
            Connection connection = connectionFactory.createConnection();
            connection.start();
            
            // 建立Session物件,不開啟事務
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 建立主題
            Topic topic = session.createTopic(TopicActivemqTest.TOPIC_NAME);
            
            // 建立消費者
            MessageConsumer consumer = session.createConsumer(topic);
            // 非同步消費
            consumer.setMessageListener(new MessageListener() {
				@Override
				public void onMessage(Message message) {
					TextMessage  mess = (TextMessage)message;
		            try {
						System.out.println("接收到的資訊:" + mess.getText());
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
			});
        } catch(Exception e) {
        	// do nothing
        }
	}
}

消費者2:
package com.activemq.topic.src;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.activemq.main.TopicActivemqTest;

/**
 * Topic消費者2
 * @author admin
 *
 */
public class Consumer2 {

	public static void main(String[] args) {
		try {
            // 建立連線工廠
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(TopicActivemqTest.BROKE_MQ_URL);
            
            // 建立JMS連線例項,並啟動連線
            Connection connection = connectionFactory.createConnection();
            connection.start();
            
            // 建立Session物件,不開啟事務
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 建立主題
            Topic topic = session.createTopic(TopicActivemqTest.TOPIC_NAME);
            
            // 建立消費者
            MessageConsumer consumer = session.createConsumer(topic);
            
            consumer.setMessageListener(new MessageListener() {
				@Override
				public void onMessage(Message message) {
					TextMessage  mess = (TextMessage)message;
		            try {
						System.out.println("接收到的資訊consumer2:" + mess.getText());
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
			});
        } catch(Exception e) {
        	// do nothing
        }
	}

}


測試執行類:

首先啟動消費者2,然後再啟動下面的主測試類:

package com.activemq.main;

import com.activemq.topic.src.MyTopicMessageConsumer;
import com.activemq.topic.src.MyTopicMessageProducer;


/**
 * 測試執行類
 * @author admin
 *
 */
public class TopicActivemqTest {
	// 連線activemq伺服器url
	public static final String BROKE_MQ_URL = "tcp://伺服器ip:61616";
	
	// 點對點訊息佇列名稱
	public static final String TOPIC_NAME = "TOPIC-S-SCHEDUAL";

	public static void main(String[] args) {
		// 訊息生產者
		MyTopicMessageProducer mmp = new MyTopicMessageProducer();
		// 訊息消費者
		MyTopicMessageConsumer mmc = new MyTopicMessageConsumer();
		
		// 建立執行緒
		Thread mp = new Thread(mmp);
		Thread mc = new Thread(mmc);
		
		// 啟動執行緒
		mp.start();
		mc.start();
	}
}


以上