1. 程式人生 > >電商專案day13-1(jms入門)

電商專案day13-1(jms入門)

一.點對點模式

理解:

         點對點的模式主要建立在一個佇列上面,當連線一個列隊的時候,傳送端不需要知道接收端是否正在接收,可以直接向 ActiveMQ 傳送訊息,傳送的訊息,將會先進入佇列中,如果有接收端在監聽,則會發向接收端,如果沒有接收端接收,則會儲存在 activemq 伺服器,
直到接收端接收訊息,點對點的訊息模式可以有多個傳送端,多個接收端,但是一條訊息,只會被一個接收端給接收到,哪個接收端先連上 ActiveMQ,則會先接收到,而後來的接收端則接收不到那條訊息。

1.建立工程

2.匯入activemq-client 包

public class QueueProducer {
    public static void main(String[] args) throws JMSException {
        //1.建立連線工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
        //2.獲取連線
        Connection connection = connectionFactory.createConnection();
        //3.啟動連線
        connection.start();
        //4.獲取session(引數一:是否啟動事物  引數二:訊息確認)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        /*
            AUTO_ACKNOWLEDGE = 1 自動確認
             CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
             DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
             SESSION_TRANSACTED = 0 事務提交併確認
         */
        //5.建立訊息佇列
        Queue queue = session.createQueue("test-queue");
        //6.建立訊息生產者
        MessageProducer producer = session.createProducer(queue);
        //7.建立訊息
        TextMessage textMessage = session.createTextMessage("歡迎來到ActiveMQ的世界");
        //8.傳送訊息
        producer.send(textMessage);
        //9.關閉資源
        producer.close();
        session.close();
        connection.close();
    }
}

建立消費者:

public class QueueConsumer {
    public static void main(String[] args) throws JMSException, IOException {
        //1.建立連線工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
        //2.獲取連線
        Connection connection = connectionFactory.createConnection();
        //3.啟動連線
        connection.start();
        //4.獲取session(引數一:是否啟動事物  引數二:訊息確認)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        /*
            AUTO_ACKNOWLEDGE = 1 自動確認
             CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
             DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
             SESSION_TRANSACTED = 0 事務提交併確認
         */
        //5.建立訊息佇列
        Queue queue = session.createQueue("test-queue");
       //6.建立消費者
        MessageConsumer consumer = session.createConsumer(queue);
        //7.監聽訊息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                //進行強轉
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接受到訊息"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.等待鍵盤輸入
        System.in.read();
        //9.關閉資源
        consumer.close();
        session.close();
        connection.close();
    }
}

 

如果我們啟動兩個消費者,則只有一個消費者能輸出訊息

 

二.釋出/訂閱模式

public class TopicProducer {
    public static void main(String[] args) throws JMSException {
        //1.建立連線工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
        //2.獲取連線
        Connection connection = connectionFactory.createConnection();
        //3.啟動連線
        connection.start();
        //4.獲取session(引數一:是否啟動事物  引數二:訊息確認)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        /*
            AUTO_ACKNOWLEDGE = 1 自動確認
             CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
             DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
             SESSION_TRANSACTED = 0 事務提交併確認
         */
        //5.建立主題物件
        Topic topic = session.createTopic("test-topic");
        //6.建立訊息生產者
        MessageProducer producer = session.createProducer(topic);
        //7.建立訊息
        TextMessage textMessage = session.createTextMessage("歡迎來到ActiveMQ的世界之topic");
        //8.傳送訊息
        producer.send(textMessage);
        //9.關閉資源
        producer.close();
        session.close();
        connection.close();
    }
}

建立消費者

public class TopicConsumer {
    public static void main(String[] args) throws JMSException, IOException {
        //1.建立連線工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
        //2.獲取連線
        Connection connection = connectionFactory.createConnection();
        //3.啟動連線
        connection.start();
        //4.獲取session(引數一:是否啟動事物  引數二:訊息確認)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        /*
            AUTO_ACKNOWLEDGE = 1 自動確認
             CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
             DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
             SESSION_TRANSACTED = 0 事務提交併確認
         */
        //5.建立主題物件
        Topic topic = session.createTopic("test-topic");
        //6.建立消費者
        MessageConsumer consumer = session.createConsumer(topic);
        //7.監聽訊息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                //進行強轉
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接受到訊息"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.一直保持監聽狀態
        while(true){}
        //9.關閉資源
        consumer.close();
        session.close();
        connection.close();
    }
}

當我們有兩個消費者的時候,這個兩個消費者都會有訊息產生,這就是原生的釋出/訂閱模式

主要被監聽的topic訊息才能被消費