1. 程式人生 > 實用技巧 >ActiveMQ訊息中介軟體簡單配置

ActiveMQ訊息中介軟體簡單配置

@目錄

訊息中介軟體

什麼是訊息中介軟體

概述

訊息中介軟體可以理解成就是一個服務軟體,儲存資訊的容器,比如生活中的快遞雲櫃.
我們把資料放到訊息中介軟體當中, 然後通知對應的服務進行獲取
訊息中介軟體是在訊息的傳輸過程中儲存資訊的容器

訊息中介軟體應用場景

  1. 使用訊息伺服器當做大的佇列使用, 先進先出, 來處理高併發寫入操作
  2. 使用訊息伺服器可以將業務系統的序列執行改為並行執行, 處理效率高, 更合理的榨取伺服器的效能.

同步與非同步技術

同步技術

dubbo是一中同步技術, 實時性高, controller呼叫service專案, 呼叫就執行,
如果service專案中的程式碼沒有執行完, controller裡面的程式碼一致等待結果.

非同步技術

mq訊息中介軟體技術(jms) 是一種非同步技術, 訊息傳送方, 將訊息傳送給訊息伺服器,
訊息伺服器未必立即處理.什麼時候去處理, 主要看訊息伺服器是否繁忙,
訊息進入伺服器後會進入佇列中, 先進先出.實時性不高.

JMS

概述:

jms的全稱叫做Java message service (Java訊息服務) jms是jdk底層定義的規範
各大廠商都是實現這個規範的技術

jms訊息伺服器同類型技術

ActiveMQ:是apache的一個比較老牌的訊息中介軟體, 它比較均衡, 既不是最安全的, 也不是最快的.
RabbitMQ:是阿里巴巴的一個訊息中介軟體, 更適合金融類業務, 它對資料的安全性比較高.能夠保證資料不丟失.
Kafka:Apache下的一個子專案。特點:高吞吐,在一臺普通的伺服器上既可以達到10W/s的吞吐速率;適合處理海量資料。

JMS中支援的訊息型別:

TextMessage: 一個字串物件
MapMessage:key-value
ObjectMessage:一個序列化的 Java 物件
BytesMessage:一個位元組的資料流
StreamMessage:Java 原始值的資料流

JMS中的兩種傳送模式

點對點模式

一個傳送方, 一個接收方. 也可以多個傳送方, 一個接收方, 主要是接收方必須是第一個.

訂閱釋出模式

一個傳送方, 多個接收方. 傳送方也可以是多個, 主要看接收方, 接收方必須是多個

ActiveMQ安裝

連結: https://pan.baidu.com/s/1B0ZW3_Z3xcamUCniNjd10Q 提取碼: avr2

  1. 將apache-activemq-5.12.0-bin.tar.gz 上傳至Linux伺服器
  2. 解壓此檔案
    tar zxvf apache-activemq-5.12.0-bin.tar.gz
  3. 為apache-activemq-5.12.0目錄賦權
    chmod 777 apache-activemq-5.12.0
  4. 進入apache-activemq-5.12.0\bin目錄賦與執行許可權
    cd /usr/local/apache-activemq-5.12.0/bin
    chmod 755 activemq
  5. 啟動
    ./activemq start
  6. 在瀏覽器當中輸入http://192.168.0.106:8161/ ( ip:8161)
  7. 進入管理頁面
  8. 使用者名稱和密碼都是 admin

說明

  1. Number Of Pending Messages :等待消費的訊息 這個是當前未出佇列的數量。
  2. Number Of Consumers :消費者 這個是消費者端的消費者數量
  3. Messages Enqueued :進入佇列的訊息 進入佇列的總數量,包括出佇列的。
  4. Messages Dequeued :出了佇列的訊息 可以理解為是消費這消費掉的數量。

訊息伺服器小例子

  1. 建立普通maven Jar工程
  2. 引入pom依賴
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.4</version>
        </dependency>
    </dependencies>

點對點模式Queue

建立QueueProducer

public class QueueProducer {
    public static void main(String[] args) throws Exception{
        //1.建立連線工廠
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
        //2.獲取連線
        Connection connection = connectionFactory.createConnection();
        //3.啟動連線
        connection.start();
        //4.獲取session  (引數1:是否啟動事務,引數2:訊息確認模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.建立佇列物件, 指定傳送的佇列名稱, 佇列名稱可以隨意起名, 但是傳送到哪裡, 就要從哪裡去接收
        Queue queue = session.createQueue("test-queue");
        //6.建立訊息生產者
        MessageProducer producer = session.createProducer(queue);
        //7.建立訊息
        TextMessage textMessage = session.createTextMessage("Hello ActiveMQ");
        //8.傳送訊息
        producer.send(textMessage);
        //9.關閉資源
        producer.close();
        session.close();
        connection.close();
    }
}

建立QueueConsumer

public class QueueConsumer {
    public static void main(String[] args) throws Exception{
        //1.建立連線工廠
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
        //2.獲取連線
        Connection connection = connectionFactory.createConnection();
        //3.啟動連線
        connection.start();
        //4.獲取session  (引數1:是否啟動事務,引數2:訊息確認模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.建立佇列物件
        Queue queue = session.createQueue("test-queue");
        //6.建立訊息消費
        MessageConsumer consumer = session.createConsumer(queue);
        //7.監聽訊息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到訊息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.等待鍵盤輸入
        System.in.read();
        //9.關閉資源
        consumer.close();
        session.close();
        connection.close();

    }
}

執行QueueProducer後執行QueueConsumer

訂閱釋出模式Topic

TopicConsumer1

public class TopicConsumer1 {
    public static void main(String[] args) throws Exception {
//1.建立連線工廠
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
        //2.獲取連線
        Connection connection = connectionFactory.createConnection();
        //3.啟動連線
        connection.start();
        //4.獲取session  (引數1:是否啟動事務,引數2:訊息確認模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.建立主題物件
        Topic topic = session.createTopic("test-topic");
        //6.建立訊息消費
        MessageConsumer consumer = session.createConsumer(topic);
        //7.監聽訊息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到訊息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        //8.等待鍵盤輸入
        System.in.read();
        //9.關閉資源
        consumer.close();
        session.close();
        connection.close();

    }
}

TopicConsumer2

public class TopicConsumer2 {
    public static void main(String[] args) throws Exception {
        //1.建立連線工廠
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
        //2.獲取連線
        Connection connection = connectionFactory.createConnection();
        //3.啟動連線
        connection.start();
        //4.獲取session  (引數1:是否啟動事務,引數2:訊息確認模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.建立主題物件
        Topic topic = session.createTopic("test-topic");
        //6.建立訊息消費
        MessageConsumer consumer = session.createConsumer(topic);

        //7.監聽訊息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到訊息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        //8.等待鍵盤輸入
        System.in.read();
        //9.關閉資源
        consumer.close();
        session.close();
        connection.close();
    }
}

TopicProducer

public class TopicProducer {
    public static void main(String[] args) throws Exception {
        //1.建立連線工廠
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
        //2.獲取連線
        Connection connection = connectionFactory.createConnection();
        //3.啟動連線
        connection.start();
        //4.獲取session  (引數1:是否啟動事務,引數2:訊息確認模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.建立主題物件
        Topic topic = session.createTopic("test-topic");
        //6.建立訊息生產者
        MessageProducer producer = session.createProducer(topic);
        //7.建立訊息
        TextMessage textMessage = session.createTextMessage("Hello Topic ActiveMQ");
        //8.傳送訊息
        producer.send(textMessage);
        //9.關閉資源
        producer.close();
        session.close();
        connection.close();

    }
}