1. 程式人生 > >ActiveMQ學習--002--Topic消息例子程序

ActiveMQ學習--002--Topic消息例子程序

throw pack pre 地址 第一步 創建 在線 per 例子程序

一、非持久的Topic消息示例

註意 此種方式消費者只能接收到 消費者啟動之後,發送者發送的消息。

發送者

package com.lhy.mq.helloworld;

import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import
javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class NoPersistenceTopicSender { public static void main(String[] args) throws Exception { //第一步:建立ConnectionFactory工廠對象。需要填入用戶名、密碼、連接地址,均使用默認即可,默認端口為"tcp:
//localhost:61616" ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "lhy","123456", //ActiveMQConnectionFactory.DEFAULT_USER, //ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); Connection connection
= connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("NB-NB"); //隊列名稱 MessageProducer producer = session.createProducer(null);// // 第六步:可以使用MessageProducer的setDeliveryMode方法為其設置持久化特性和非持久化特性(DeliveryMode) //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("我是消息內容 -333- "+i); producer.send(destination, message); System.err.println("生產者發送消息:"+message.getText()); } session.commit(); if(connection != null){ connection.close(); } } }

接收者

package com.lhy.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class NoPersitenceTopicReceiver {

    public static void main(String[] args) throws Exception {
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                        "lhy","123456",
                        "tcp://localhost:61616");
                Connection connection = connectionFactory.createConnection();
                connection.start();
                final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createTopic("NB-NB");
                
                MessageConsumer consumer = session.createConsumer(destination);
                
                Message message = consumer.receive();
                while(message != null){
                    TextMessage textMsg = (TextMessage)message;
                    System.err.println("消費消息:"+textMsg.getText());
                    //接收下一個消息
                    message = consumer.receive(1000L);
                }
                
                //提交一下事務,否則不確認消息,消息不會出隊列
                session.commit();
                session.close();
                connection.close();
    }
}

二、持久訂閱例子程序

發送者

package com.lhy.mq.helloworld;

import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class PersistenceTopicSender {
    
    public static void main(String[] args) throws Exception {
        
        //第一步:建立ConnectionFactory工廠對象。需要填入用戶名、密碼、連接地址,均使用默認即可,默認端口為"tcp://localhost:61616"
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "lhy","123456",
                "tcp://127.0.0.1:61616");
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Topic destination = session.createTopic("Persistence-Topic"); //隊列名稱
        MessageProducer producer = session.createProducer(null);//
        
        //默認為持久訂閱,註意這個一定在start之前設置
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();
        
        for (int i = 0; i < 3; i++) {
            TextMessage message = session.createTextMessage("我是消息內容  -666- "+i);
            producer.send(destination, message);
            
            System.err.println("生產者發送-topic-消息:"+message.getText()); 
        }
        session.commit();
    
        if(connection != null){
            connection.close();
        }
    }

}

消費者,可以有多個消費者

1, 消費者需要在Connection上設置消費者id,來識別消費者

2,需要創建TopicSubscriber 來訂閱

3,設置好之後再start 這個Connection

4,一定要先運行一次消費者,來向ActiveMQ註冊這個消費者,然後再運行發送消息,這樣無論消費者是否在線,都會接收到消息。否則只能接收到註冊之後的消息。

package com.lhy.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消費者需要先運行一次,向producer註冊一下
 * @author dell
 *
 */
public class PersitenceTopicReceiver {

    public static void main(String[] args) throws Exception {
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                        "lhy","123456",
                        "tcp://localhost:61616");
                Connection connection = connectionFactory.createConnection();
                //設置消費者的id,向發送者先註冊一下,producer就知道誰在訂閱
                connection.setClientID("client2");
                
                final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                Topic destination = session.createTopic("Persistence-Topic");
                TopicSubscriber consumer = session.createDurableSubscriber(destination, "T1");//創建一個持久訂閱
                //最後start
                connection.start();
                
                Message message = consumer.receive();
                while(message != null){
                    TextMessage textMsg = (TextMessage)message;
                    System.err.println("消費消息:"+textMsg.getText());
                    //接收下一個消息
                    message = consumer.receive(1000L);
                }
                
                //提交一下事務,否則不確認消息,消息不會出隊列
                session.commit();
                session.close();
                connection.close();
    }
}

分別修改消費者的clientID為 client1、client2運行,相當於2個消費者。

管控臺:2個消費者,

技術分享圖片

技術分享圖片

技術分享圖片

ActiveMQ學習--002--Topic消息例子程序