ActiveMQ學習--002--Topic消息例子程序
阿新 • • 發佈:2018-04-20
throw pack pre 地址 第一步 創建 在線 per 例子程序
一、非持久的Topic消息示例
註意 此種方式消費者只能接收到 消費者啟動之後,發送者發送的消息。
發送者
package com.lhy.mq.helloworld; import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; importjavax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class NoPersistenceTopicSender { public static void main(String[] args) throws Exception { //第一步:建立ConnectionFactory工廠對象。需要填入用戶名、密碼、連接地址,均使用默認即可,默認端口為"tcp://localhost:61616" ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "lhy","123456", //ActiveMQConnectionFactory.DEFAULT_USER, //ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); Connection connection= connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("NB-NB"); //隊列名稱 MessageProducer producer = session.createProducer(null);// // 第六步:可以使用MessageProducer的setDeliveryMode方法為其設置持久化特性和非持久化特性(DeliveryMode) //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("我是消息內容 -333- "+i); producer.send(destination, message); System.err.println("生產者發送消息:"+message.getText()); } session.commit(); if(connection != null){ connection.close(); } } }
接收者
package com.lhy.mq.helloworld; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class NoPersitenceTopicReceiver { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "lhy","123456", "tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("NB-NB"); MessageConsumer consumer = session.createConsumer(destination); Message message = consumer.receive(); while(message != null){ TextMessage textMsg = (TextMessage)message; System.err.println("消費消息:"+textMsg.getText()); //接收下一個消息 message = consumer.receive(1000L); } //提交一下事務,否則不確認消息,消息不會出隊列 session.commit(); session.close(); connection.close(); } }
二、持久訂閱例子程序
發送者
package com.lhy.mq.helloworld; import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class PersistenceTopicSender { public static void main(String[] args) throws Exception { //第一步:建立ConnectionFactory工廠對象。需要填入用戶名、密碼、連接地址,均使用默認即可,默認端口為"tcp://localhost:61616" ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "lhy","123456", "tcp://127.0.0.1:61616"); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic destination = session.createTopic("Persistence-Topic"); //隊列名稱 MessageProducer producer = session.createProducer(null);// //默認為持久訂閱,註意這個一定在start之前設置 producer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("我是消息內容 -666- "+i); producer.send(destination, message); System.err.println("生產者發送-topic-消息:"+message.getText()); } session.commit(); if(connection != null){ connection.close(); } } }
消費者,可以有多個消費者
1, 消費者需要在Connection上設置消費者id,來識別消費者
2,需要創建TopicSubscriber 來訂閱
3,設置好之後再start 這個Connection
4,一定要先運行一次消費者,來向ActiveMQ註冊這個消費者,然後再運行發送消息,這樣無論消費者是否在線,都會接收到消息。否則只能接收到註冊之後的消息。
package com.lhy.mq.helloworld; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消費者需要先運行一次,向producer註冊一下 * @author dell * */ public class PersitenceTopicReceiver { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "lhy","123456", "tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); //設置消費者的id,向發送者先註冊一下,producer就知道誰在訂閱 connection.setClientID("client2"); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic destination = session.createTopic("Persistence-Topic"); TopicSubscriber consumer = session.createDurableSubscriber(destination, "T1");//創建一個持久訂閱 //最後start connection.start(); Message message = consumer.receive(); while(message != null){ TextMessage textMsg = (TextMessage)message; System.err.println("消費消息:"+textMsg.getText()); //接收下一個消息 message = consumer.receive(1000L); } //提交一下事務,否則不確認消息,消息不會出隊列 session.commit(); session.close(); connection.close(); } }
分別修改消費者的clientID為 client1、client2運行,相當於2個消費者。
管控臺:2個消費者,
ActiveMQ學習--002--Topic消息例子程序