1. 程式人生 > >ActiveMQ 實現點對點的訊息模型小Demo

ActiveMQ 實現點對點的訊息模型小Demo

      摘要:JMS(JAVA Message Service,java訊息服務) API是一個訊息服務的標準或者說是規範,允許應用程式元件基於JavaEE平臺建立,傳送,接受和讀取訊息.它使分散式通訊耦合度更低,訊息服務更加可靠以及非同步性

 JMS有兩種訊息模型:點對點和釋出訂閱模型.本文章主要介紹點對點的訊息模型

P2P模式圖 

     

涉及到的概念 

  1. 訊息佇列(Queue)
  2. 傳送者(Sender)
  3. 接收者(Receiver)
  4. 每個訊息都被髮送到一個特定的佇列,接收者從佇列中獲取訊息。佇列保留著訊息,直到他們被消費或超時。

P2P的特點

  1. 每個訊息只有一個消費者(Consumer)(即一旦被消費,訊息就不再在訊息佇列中)
  2. 傳送者和接收者之間在時間上沒有依賴性,也就是說當傳送者傳送了訊息之後,不管接收者有沒有正在執行,它不會影響到訊息被髮送到佇列
  3. 接收者在成功接收訊息之後需向佇列應答成功
  4. 如果你希望傳送的每個訊息都應該被成功處理的話,那麼你需要P2P模式
訊息的消費 
     在JMS中,訊息的產生和訊息是非同步的。對於消費來說,JMS的訊息者可以通過兩種方式來消費訊息。 
     ○ 同步 
         訂閱者或接收者呼叫receive方法來接收訊息,receive方法在能夠接收到訊息之前(或超時之前)將一直阻塞 
     ○ 非同步 
         訂閱者或接收者可以註冊為一個訊息監聽器。當訊息到達之後,系統自動呼叫監聽器的onMessage方法。

JMS程式設計模型

(1) ConnectionFactory

建立Connection物件的工廠,針對兩種不同的jms訊息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查詢ConnectionFactory物件。

(2) Destination

Destination的意思是訊息生產者的訊息傳送目標或者說訊息消費者的訊息來源。對於訊息生產者來說,它的Destination是某個佇列(Queue)或某個主題(Topic);對於訊息消費者來說,它的Destination也是某個佇列或主題(即訊息來源)。

所以,Destination實際上就是兩種型別的物件:Queue、Topic可以通過JNDI來查詢Destination。

(3) Connection

Connection表示在客戶端和JMS系統之間建立的連結(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種型別:QueueConnection和TopicConnection。

(4) Session

Session是我們操作訊息的介面。可以通過session建立生產者、消費者、訊息等。Session提供了事務的功能。當我們需要使用session傳送/接收多個訊息時,可以將這些傳送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。

(5) 訊息的生產者

訊息生產者由Session建立,並用於將訊息傳送到Destination。同樣,訊息生產者分兩種型別:QueueSender和TopicPublisher。可以呼叫訊息生產者的方法(send或publish方法)傳送訊息。

(6) 訊息消費者

訊息消費者由Session建立,用於接收被髮送到Destination的訊息。兩種型別:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來建立。當然,也可以session的creatDurableSubscriber方法來建立持久化的訂閱者。

(7) MessageListener

訊息監聽器。如果註冊了訊息監聽器,一旦訊息到達,將自動呼叫監聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。

程式碼:

package com.east.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 訊息的生產者(傳送者)
 * 
 * @author linhaiyun
 * @time 2017.12.12
 */
public class JMSProducer {
	// 預設連線使用者名稱
	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
	// 預設連線密碼
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
	// 預設連線地址
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
	// 傳送的訊息數量
	private static final int SENDNUM = 50;

	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		// 連線工廠
		ConnectionFactory factory;
		// 連線
		Connection connection = null;
		// 會話 接受或者傳送訊息的執行緒
		Session session;
		// 訊息的目的地
		Destination destination;
		// 訊息生產者
		MessageProducer messageProducer;
		// 例項化連線工廠
		factory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);

		try {
			// 通過連線工廠獲取連線
			connection = factory.createConnection();
			// 啟動連線
			connection.start();
			// 建立session
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
			// 建立一個名稱為HelloHaige的訊息佇列
			destination = session.createQueue("HelloHaige");
			// 建立訊息生產者
			messageProducer = session.createProducer(destination);
			// 傳送訊息
			sendMessage(session, messageProducer);
			session.commit();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

	/**
	 * 傳送訊息
	 * 
	 * @param session
	 * @param messageProducer
	 * @time 2017.12.12 12:10:25
	 */
	public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
		for (int i = 0; i < JMSProducer.SENDNUM; i++) {
			// 建立一條文字訊息
			TextMessage message = session.createTextMessage("ActiveMQ 傳送訊息" + i);
			System.out.println("傳送訊息:Activemq 傳送訊息" + i);
			// 通過訊息生產者發出訊息
			messageProducer.send(message);
		}
	}

}

package com.east.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 訊息的消費者(接受者)
 * 
 * @author linhaiyun
 * @time 2017.12.12
 */
public class JMSConsumer {
	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 預設連線使用者名稱
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 預設連線密碼
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;// 預設連線地址

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		ConnectionFactory factory; // 連線工廠
		Connection connection; // 連線

		Session session; // 會話接受或者傳送訊息的執行緒
		Destination destination; // 訊息的目的地

		MessageConsumer messageConsumer;// 訊息的消費者

		// 例項化連線工廠
		factory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
		
		try {
			// 通過連線工廠獲取連線
			connection = factory.createConnection();
			// 啟動連線
			connection.start();
			// 建立session
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// 建立一個連線HelloWorld的訊息佇列
			destination = session.createQueue("HelloHaige");
			// 建立訊息消費者
			messageConsumer = session.createConsumer(destination);
			
			while(true){
				TextMessage message = (TextMessage) messageConsumer.receive(100000);
				if(message !=null){
					System.out.println("收到的訊息:" + message.getText());
				}else{
					break;
				}
			}
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			System.out.println("例項化消費者失敗!");
			e.printStackTrace();
		}
		
	}

}

結果:

開啟本地的ActiveMQ 服務


執行ActiveMQ生產者,建立50個訊息,結果如圖


執行消費者進行訊息的接受消費