[ActiveMQ實戰]基於JMS的pub/sub傳播機制
阿新 • • 發佈:2019-01-08
上篇部落格介紹了activemq基於JMS的點對點訊息傳播機制的實現,這裡介紹另一個釋出/訂閱方式實現。
一、釋出訂閱模型
就像訂閱報紙。我們可以選擇一份或者多份報紙,比如:北京日報、人民日報。這些報紙就相當於釋出訂閱模型中的topic。如果有很多人訂閱了相同的報紙,那我們就在同一個topic中註冊,對於報紙發行方,它就和所有的訂閱者形成了一對多的關係。如下:
二、釋出者的實現
package com.tgb.activemqTopic; import java.awt.font.TextMeasurer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 訊息釋出者 * @author xx * */ public class JMSProducer { //預設連線使用者名稱 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //預設連線密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //預設的連線地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; //傳送的訊息數量 private static final int SENNUM = 10; public static void main(String[] args){ ConnectionFactory factory ; //連線工廠 Connection connection = null ; //連線 Session session ; //會話,接收或者傳送訊息的執行緒 Destination destination; //訊息的目的地 MessageProducer messageProducer; //訊息生產者 //例項化連線工廠 factory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL); //通過連線工廠獲取connection try { connection = factory.createConnection(); connection.start(); //啟動連線 //建立session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //建立訊息佇列 // destination = session.createQueue("FirstQueue"); //建立主題 destination = session.createTopic("topic1"); //建立訊息釋出者 messageProducer = session.createProducer(destination); //傳送訊息 sendMessage(session, messageProducer); session.commit(); } catch (JMSException e) { e.printStackTrace(); }finally{ if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 傳送訊息 * @param session * @param mp * @throws JMSException */ public static void sendMessage(Session session, MessageProducer mp) throws JMSException{ for(int i = 0;i<JMSProducer.SENNUM;i++){ TextMessage message = session.createTextMessage("ActiveMq 釋出的訊息" + i); System.out.println("釋出訊息:" + "ActiveMq 釋出的訊息" + i); mp.send(message); } } }
這裡和點對點的不同就是不再是建立訊息佇列,而是建立topic。而且也不是訊息生產者而是釋出者。
三、訂閱者
訂閱者不止一個,它們都監聽同一個topic。這裡為了試驗效果至少建兩個訂閱者,我這裡只寫一個,另一個大家照著寫就行。
1.訂閱者
package com.tgb.activemqTopic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 訊息的訂閱者一 * @author xx */ public class JMSConsumer1 { //預設連線使用者名稱 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //預設連線密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //預設的連線地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { ConnectionFactory factory ; //連線工廠 Connection connection = null ; //連線 Session session ; //會話,接收或者傳送訊息的執行緒 Destination destination; //訊息的目的地 MessageConsumer messageConsumer; //訊息消費者 //例項化連線工廠 factory = new ActiveMQConnectionFactory(JMSConsumer1.USERNAME, JMSConsumer1.PASSWORD, JMSConsumer1.BROKEURL); //通過連線工廠獲取connection try { connection = factory.createConnection(); connection.start(); //啟動連線 //建立session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //建立連線訊息佇列,訊息到達的目的地 // destination = session.createQueue("FirstQueue"); destination = session.createTopic("topic1"); //建立消費者 messageConsumer = session.createConsumer(destination); //註冊訊息監聽 messageConsumer.setMessageListener(new Listener1()); }catch(Exception e){ e.printStackTrace(); } } }
2.監聽類
package com.tgb.activemqTopic; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 訂閱者1的監聽 * 訊息監聽類 * @author xx */ public class Listener1 implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("訂閱者一收到的訊息:" + ((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
四、測試
像點對點的要先啟動生產者,生產者要生產訊息。而釋出訂閱模型,要先啟動訂閱者,訂閱者先訂閱topic,再發布訊息。
1.啟動訂閱者,這裡我啟動兩個,可以看到在topic中註冊了兩個消費者
2.啟動釋出者,如下
釋出者釋出了10條資料,但是出隊的有20條,因為有兩個訂閱者。
五、總結
釋出者向一個特定的訊息主題釋出訊息,0或者多個訂閱者可能接收到來自特定訊息主題的訊息感興趣。其中釋出者和訂閱者不知道對方的存在。