1. 程式人生 > >[ActiveMQ實戰]基於JMS的pub/sub傳播機制

[ActiveMQ實戰]基於JMS的pub/sub傳播機制

     上篇部落格介紹了activemq基於JMS的點對點訊息傳播機制的實現,這裡介紹另一個釋出/訂閱方式實現。

一、釋出訂閱模型

    就像訂閱報紙。我們可以選擇一份或者多份報紙,比如:北京日報、人民日報。這些報紙就相當於釋出訂閱模型中的topic。如果有很多人訂閱了相同的報紙,那我們就在同一個topic中註冊,對於報紙發行方,它就和所有的訂閱者形成了一對多的關係。如下:

 

二、釋出者的實現

package com.tgb.activemqTopic;

import java.awt.font.TextMeasurer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 訊息釋出者
 * @author xx
 *
 */
public class JMSProducer {
	//預設連線使用者名稱
	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
	//預設連線密碼
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
	//預設的連線地址
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
	//傳送的訊息數量
	private static final int SENNUM = 10;
	
	public static void main(String[] args){
		ConnectionFactory factory ; //連線工廠
		Connection connection = null ; //連線
		Session session ; //會話,接收或者傳送訊息的執行緒
		Destination destination; //訊息的目的地
		MessageProducer messageProducer; //訊息生產者
		//例項化連線工廠
		factory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
		//通過連線工廠獲取connection
		try {
			connection = factory.createConnection();
			connection.start(); //啟動連線
			//建立session
			session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			//建立訊息佇列
//			destination = session.createQueue("FirstQueue");
			
			//建立主題
			destination = session.createTopic("topic1");
			//建立訊息釋出者
			messageProducer = session.createProducer(destination);
			//傳送訊息
			sendMessage(session, messageProducer);
			session.commit();
		} catch (JMSException e) {
			e.printStackTrace();
		}finally{
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
	
	/**
	 * 傳送訊息
	 * @param session
	 * @param mp
	 * @throws JMSException 
	 */
	public static void sendMessage(Session session, MessageProducer mp) throws JMSException{
		for(int i = 0;i<JMSProducer.SENNUM;i++){
			TextMessage message = session.createTextMessage("ActiveMq 釋出的訊息" + i);
			System.out.println("釋出訊息:" + "ActiveMq 釋出的訊息" + i);
			mp.send(message);
		}
	}
}

  這裡和點對點的不同就是不再是建立訊息佇列,而是建立topic。而且也不是訊息生產者而是釋出者。

三、訂閱者

   訂閱者不止一個,它們都監聽同一個topic。這裡為了試驗效果至少建兩個訂閱者,我這裡只寫一個,另一個大家照著寫就行。

1.訂閱者

package com.tgb.activemqTopic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 訊息的訂閱者一
 * @author xx
 */
public class JMSConsumer1 {

	//預設連線使用者名稱
	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
	//預設連線密碼
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
	//預設的連線地址
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
	
	public static void main(String[] args) {
		ConnectionFactory factory ; //連線工廠
		Connection connection = null ; //連線
		Session session ; //會話,接收或者傳送訊息的執行緒
		Destination destination; //訊息的目的地
		MessageConsumer messageConsumer; //訊息消費者
		//例項化連線工廠
		factory = new ActiveMQConnectionFactory(JMSConsumer1.USERNAME, JMSConsumer1.PASSWORD, JMSConsumer1.BROKEURL);
		//通過連線工廠獲取connection
		try {
			connection = factory.createConnection();
			connection.start(); //啟動連線
			//建立session
			session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			//建立連線訊息佇列,訊息到達的目的地
//			destination = session.createQueue("FirstQueue");
			destination = session.createTopic("topic1");
			//建立消費者
			messageConsumer = session.createConsumer(destination);
			//註冊訊息監聽
			messageConsumer.setMessageListener(new Listener1());
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}


2.監聽類

package com.tgb.activemqTopic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 訂閱者1的監聽
 * 訊息監聽類
 * @author xx
 */
public class Listener1 implements MessageListener {

	@Override
	public void onMessage(Message message) {
		try {
			System.out.println("訂閱者一收到的訊息:" + ((TextMessage)message).getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

 四、測試

  像點對點的要先啟動生產者,生產者要生產訊息。而釋出訂閱模型,要先啟動訂閱者,訂閱者先訂閱topic,再發布訊息。

1.啟動訂閱者,這裡我啟動兩個,可以看到在topic中註冊了兩個消費者


2.啟動釋出者,如下

 釋出者釋出了10條資料,但是出隊的有20條,因為有兩個訂閱者。

五、總結

    釋出者向一個特定的訊息主題釋出訊息,0或者多個訂閱者可能接收到來自特定訊息主題的訊息感興趣。其中釋出者和訂閱者不知道對方的存在。