1. 程式人生 > >訊息中介軟體入門「一」:初識訊息中介軟體【ActiveMQ】

訊息中介軟體入門「一」:初識訊息中介軟體【ActiveMQ】

訊息中介軟體入門「一」:初識訊息中介軟體【ActiveMQ】

背景介紹

訊息中介軟體相當於程序間通訊的信託,可以降低複雜系統中各個模組間的耦合度。對於信託:你只需要把Message給我,就沒你的事兒了。我負責給你送到目的地,就不需要你必須實時的守著,等待所有通訊細節的完成。就算你突然掛了也沒事,Message由信託給你存著,直到送到目的地才會消失。也就是說通訊細節都由訊息中介軟體完成,生產者只需要把訊息給中介軟體即可,而消費者只需要繫結好地址。有了訊息就會主動推送過來,也不需要消費者主邏輯執行緒守著。而webservice則是需要客戶端與服務端都保持線上,客戶端發起請求後必須等待服務端完成處理並將資料返回。由此可見中介軟體可以延遲處理也能夠實時的處理,十分可靠。而webservice要求有較高的實時性,通訊過程不允許出現宕機情況。也無法延時處理。

常用的訊息中介軟體有:ActiveMQ,RabbitMQ,RocketMQ,kafka等。而ActiveMQ是一款完全遵循jms規範的訊息中介軟體,支援多種協議如Stomp等,如圖1所示。

圖1 ActiveMQ的運用

第一步:安裝ActiveMQ

  1. 訪問ActiveMQ官方下載安裝對應平臺的訊息安裝包,點我進入下載地址
    圖2 下載官網
    • 解壓到自己電腦上,進入bin目錄下,在cmd中輸入activemq start啟動如圖3。
      圖3 啟動activemq
    • 執行bin/win64/activemq.bat 也可以快速啟動

根據dos視窗內容瞭解activemq的啟動資訊

在啟動好actviemq後,在dos視窗中可以看到輸出了很多內容,那麼具體代表什麼含義呢:
圖4 啟動列印內容.png

  • 修改對應協議的埠,進入$(activemq_home)/config/,開啟actviemq.xml找到如下程式碼進行修改:
    Tips:在網頁訪問時將地址0.0.0.0換成主機地址,如127.0.0.1
       -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> </transportConnectors>
  • 在dos中複製web管理地,將0.0.0.0換成127.0.0.1,在瀏覽器中開啟如圖 5:
    圖5 web主頁.png

  • 點選MangeActiveMQ broker,會彈出視窗輸入賬號密碼,預設為:admin/admin。管理介面如圖 6:
    圖6 管理介面.png

  • 點選Queues,管理佇列模式的訊息列表,如圖7:
    圖7 佇列模式.png

    Tips:佇列模式下,預設配置下,所有訊息一旦被儲存到中介軟體中,只要沒有被消費者取出,就會一直存在,即使是actviemq被關閉。並且每一條訊息只會存在一個消費者中,也就是說佇列模式下:所有訊息是一對一,也就是所謂的點對點模式。一條訊息一旦被一個消費者接收到後,actviemq就刪除該訊息,其他消費者無法接繼續收該訊息。佇列模式就類似於搶購小米手機,數量有限,手慢無。

  • 點選Topics,進入主題模式下的管理介面如圖8所示:
    圖8 主題模式.png

    Tips:主題模式下:消費者必須在生產者釋出訊息前,訂閱訊息的地址(destination),否則講接收不到任何訊息。並且訂閱了destination的所有消費者都會收到生產者生產的所有訊息。類似於廣播的概念,在廣播廳內所有有耳朵的人都能接收到訊息。

第二步:使用actviemq

  • 專案目錄介紹,如圖9:
    圖9 專案目錄
    特別說明:普通的java專案就可以了,不一定非要maven,因為依賴的jar包只有一個,而且在activemq的目錄下面,如圖10 :
    圖10 依賴jar包
    如果懶得複製,可以在maven中加入如下依賴:
 <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.5</version>
        </dependency>
    </dependencies>

Tips:version最好與下載的actviemq版本一致

1.使用佇列模式

  • 生產者程式碼:
package queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class QueueProducer {
    private static final String url="tcp://localhost:61616";
    private static final String queueName="HelloActiveMQ";

    public static void main(String[] args) throws JMSException, InterruptedException {
        //建立連線工廠
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(url);
        //建立連線
        Connection connection=connectionFactory.createConnection();
        //連線啟動
        connection.start();
        //通過連接獲得session
        Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //建立目標
        Destination destination=session.createQueue(queueName);
        //建立一個生產者
        MessageProducer producer=session.createProducer(destination);
        for (int i = 0; i < 100; i++) {
            //建立訊息
            TextMessage message=session.createTextMessage(String.format("hello Mq:%d", i));
            //釋出訊息
            producer.send(message);
            System.out.println("傳送訊息:"+message.getText());
            Thread.sleep(500);
        }
        //關閉連線
        connection.close();

    }
}
  • 消費者程式碼:
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class MqConsumer {
    private static final String queueName="HelloActiveMQ";
    private static final String url="tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        //獲取連線工廠
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(url);
        //獲取連線
        Connection connectionn=connectionFactory.createConnection();
        //啟動連線
        connectionn.start();
        //建立session
        Session session=connectionn.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //繫結地址
        Destination destination=session.createQueue(queueName);
        //建立一個消費者物件
        MessageConsumer consumer=session.createConsumer(destination);
        //建立監聽器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage= (TextMessage) message;
                try {
                    System.out.println("接收到訊息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

接收訊息有兩種方式,一種是同步方式,一種是非同步方式。本例使用非同步方式。使用同步方式程式碼如下:

Message message = consumer.receive();
String text = ((TextMessage) message).getText(); 
System.out.println("接收到訊息:" + text); 

同步方式執行結束則不會再接收訊息,如果要持續接收訊息,需要寫在while死迴圈中;非同步方式似乎是為了解決這種問題,直接通過建立監聽器的方式啟動一個執行緒,會持續接收新訊息。

Tips:可以看出無論消費者還是生產者,前面都要經過:

  1. 通過ActiveMQConnectionFactory獲取連線工廠bean
  2. 建立連線
  3. 啟動該連線
  4. 通過連線後獲取Session
  5. 通過session根據方法獲取佇列的destination
  6. 再通過session獲取消費者consumer或者producer

其前面的過程如同jdbc連線資料庫一樣,而後面根據消費者和生產者的不同角色定位,去傳送或者接收訊息。


輸出效果:

1. 啟動一個生產者一個消費者

接收到訊息:hello Mq:0
…………
接收到訊息:hello Mq:98
接收到訊息:hello Mq:99
100條訊息全部接收

2. 啟動一個生產者兩個消費者

consumer 1:
……
接收到訊息:hello Mq:9
接收到訊息:hello Mq:11
接收到訊息:hello Mq:13
接收到訊息:hello Mq:15
接收到訊息:hello Mq:17
接收到訊息:hello Mq:19
接收到訊息:hello Mq:21
接收到訊息:hello Mq:23
接收到訊息:hello Mq:25
consumer 2:
……
接收到訊息:hello Mq:10
接收到訊息:hello Mq:12
接收到訊息:hello Mq:14
接收到訊息:hello Mq:16
接收到訊息:hello Mq:18
接收到訊息:hello Mq:20
接收到訊息:hello Mq:22
接收到訊息:hello Mq:24
接收到訊息:hello Mq:26

可以看出佇列模式下確實為一對一,點對點,消費者收到的每一條訊息都彼此不相同,一個為奇數條,一個為偶數條。並且佇列模式下,即時中途關閉actviemq,消費者沒取完的訊息也會被儲存起來,在下次執行時消費者可以接著接收訊息。

2. 使用主題模式

  • 生產者程式碼
package topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicProducer {
    private static final String url="tcp://localhost:61616";
    private static final String queueName="queue";

    public static void main(String[] args) throws JMSException, InterruptedException {
        //建立連線工廠
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(url);
        //建立連線
        Connection connection=connectionFactory.createConnection();
        //連線啟動
        connection.start();
        //通過連接獲得session
        Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //建立目標
        Destination destination=session.createTopic(queueName);
        //建立一個生產者
        MessageProducer producer=session.createProducer(destination);
        for (int i = 0; i < 100; i++) {
            //建立訊息
            TextMessage message=session.createTextMessage(String.format("hello Mq:%d", i));
            //釋出訊息
            producer.send(message);
            System.out.println("傳送訊息:"+message.getText());
            Thread.sleep(500);
        }
        //關閉連線
        connection.close();

    }
}

可以看出於佇列模式唯一區別就是建立目標變成了Topic:

//建立目標
Destination destination=session.createTopic(queueName);
  • 消費者程式碼也是一樣,將建立目標destination變成Topic:
//建立目標
Destination destination=session.createTopic(queueName);

消費者所有程式碼:

package topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicConsumer {
    private static final String queueName = "queue";
    private static final String url = "tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        //獲取連線工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        //獲取連線
        Connection connectionn = connectionFactory.createConnection();
        //啟動連線
        connectionn.start();
        //建立session
        Session session = connectionn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //繫結地址
        Destination destination = session.createTopic(queueName);
        //建立一個消費者物件
        MessageConsumer consumer = session.createConsumer(destination);
        //建立監聽器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    Thread.sleep(500);
                    System.out.println("接收到訊息:" + textMessage.getText());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

    }
}

由於主題模式中,生產者所有訊息,在訂閱了該主題的所有消費者中都能收到。因此直接啟動一個生產者和兩個消費者,這裡啟動時候必須先啟動消費者,不然會丟失部分訊息。

  • 輸出結果:
consumer1:
接收到訊息:hello Mq:0
接收到訊息:hello Mq:1
接收到訊息:hello Mq:2
接收到訊息:hello Mq:3
接收到訊息:hello Mq:4
接收到訊息:hello Mq:5
接收到訊息:hello Mq:6
接收到訊息:hello Mq:7
接收到訊息:hello Mq:8
接收到訊息:hello Mq:9
接收到訊息:hello Mq:10
consumer2:
接收到訊息:hello Mq:0
接收到訊息:hello Mq:1
接收到訊息:hello Mq:2
接收到訊息:hello Mq:3
接收到訊息:hello Mq:4
接收到訊息:hello Mq:5
接收到訊息:hello Mq:6
接收到訊息:hello Mq:7
接收到訊息:hello Mq:8
接收到訊息:hello Mq:9
接收到訊息:hello Mq:10

可以看出所有消費者在同一時間段內接收到的訊息都是一樣的,並且在訂閱前的訊息會丟失。進入Topic管理頁面中可以檢視丟失的訊息,以及推送的訊息。如圖10:
圖10 topic管理頁面
圖中丟失的訊息數量=生產訊息數量*消費者數量-消費出去的訊息數量。

因此由於主題模式的這種特性,消費者一定要在生產者生產訊息前訂閱好destination

activemq支援的訊息(Message)資料型別

Activemq支援的Message如下:

  1. TextMessage 文字訊息:攜帶一個java.lang.String作為有效資料(負載)的訊息,可用於字串型別的資訊交換
  2. ObjectMessage 物件訊息:攜帶一個可以序列化的Java物件作為有效負載的訊息,可用於Java物件型別的資訊交換;
  3. MapMessage 對映訊息:攜帶一組鍵值對的資料作為有效負載的訊息,有效資料值必須是Java原始資料型別(或者它們的包裝類)及String。即:byte , short , int , long , float , double , char , boolean , String
  4. BytesMessage 位元組訊息 :攜帶一組原始資料型別的位元組流作為有效負載的訊息;
  5. StreamMessage 流訊息:攜帶一個原始資料型別流作為有效負載的訊息,它保持了寫入流時的資料型別,寫入什麼型別,則讀取也需要是相同的型別;

可以看出最常用的應該還是ObjectMessage型別,TextMessage只是簡單的字串傳輸,不適用於複雜的資料互動。而MapMessage只支援基本資料型別和String。BytesMessage則偏向於底層。StreamMessage任然還是支援的資料型別有限制。

如何使用java物件傳輸訊息

使用java物件傳輸訊息官網給出了兩種方式:

1.設定信任物件的全類名。這種方式需要連線時候設定好全類名,並且actviemq也需要配置該全類名,並且類路徑必須保持一致。也就是說新增一個信任的物件,就必須在actviemq的配置檔案中修改一下,同時java程式碼也要修改一下。加大了中介軟體與java程式碼的耦合度。目前按照官方給出的教程還沒有成功過,主要是找不到官網描述的配置檔案,不知道是不是actviemq版本的原因==!。
2.設定信任所有的物件,這種方式只需要在java程式碼稍作修改即可,並且一旦配置好,所有的物件都可以傳輸,無聊是自定義的pojo還是java集合類都可以使用。

具體步驟【信任所有package】:

1. 在生產者和消費者連線公共部分connectionFactory呼叫setTrustAllPackages設定為true

ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(url);
((ActiveMQConnectionFactory) connectionFactory).setTrustAllPackages(true);

2. 生產者生產訊息使用ObjectMessage傳出訊息,程式碼如下:

 //建立一個生產者
        MessageProducer producer=session.createProducer(destination);
        for (int i = 0; i < 100; i++) {
            //建立訊息
            User user=new User();
            user.setAge(i);
            user.setName((i%2==0? "Mr":"Miss")+String.valueOf((char)(int)(Math.random()*26+'A'))+String.valueOf((char)(int)(Math.random()*26+'a')));
            user.setSex(i%2==0? "男":"女");
            user.setSalary((int)(Math.random()*10000));
            user.setNumber(UUID.randomUUID().toString());
            ArrayList<User> users=new ArrayList<User>();
            HashMap<String,User> map=new HashMap<String, User>();
            map.put(String.format("第%d個", i),user);
            users.add(user);
            ObjectMessage message=session.createObjectMessage(map);
            //TextMessage message=session.createTextMessage(String.format("hello Mq:%d", i));
            //釋出訊息
            producer.send(message);
            System.out.println("傳送訊息:"+message.getObject());
            Thread.sleep(1);
        }

執行效果如下:

傳送訊息:{第0個=User{name='MrEt', age=0, sex='男', number='b55a5bb5-31be-4df3-9050-adaa69d46482', salary=2266}}
傳送訊息:{第1個=User{name='MissZe', age=1, sex='女', number='df20aa34-1956-4401-809a-fb44ab524ad0', salary=781}}
傳送訊息:{第2個=User{name='MrFs', age=2, sex='男', number='7ec61656-f40f-4759-b967-a92df7a8889f', salary=7582}}
傳送訊息:{第3個=User{name='MissIm', age=3, sex='女', number='7ad7107d-7648-40b4-84f6-4e5f815771b6', salary=4046}}
傳送訊息:{第4個=User{name='MrZc', age=4, sex='男', number='fe01b62d-b0fb-4054-a641-40fe8f0c8cbc', salary=5659}}
傳送訊息:{第5個=User{name='MissJa', age=5, sex='女', number='a525b377-d05b-499c-a4eb-b6bcdecaf39a', salary=7397}}
傳送訊息:{第6個=User{name='MrWg', age=6, sex='男', number='2ea6fe19-dec3-4c34-8bc8-1ae6633ea4fa', salary=788}}
傳送訊息:{第7個=User{name='MissIw', age=7, sex='女', number='60c57b23-e092-4b94-912d-fc03979fc4bd', salary=7408}}
傳送訊息:{第8個=User{name='MrOt', age=8, sex='男', number='76454c8c-d00d-464e-8e74-e57b4d63d01e', salary=858}}
傳送訊息:{第9個=User{name='MissJi', age=9, sex='女', number='ac5bd1ef-0421-

3. 消費者接收訊息,對應的也是將TextMessage修改為ObjectMessage,程式碼如下:

//建立一個消費者物件
        MessageConsumer consumer=session.createConsumer(destination);
        //建立監聽器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                //TextMessage textMessage= (TextMessage) message;
                ObjectMessage Objmessage= (ObjectMessage) message;
                try {
                    System.out.println("接收到訊息:"+Objmessage.getObject());
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

執行效果如下:

接收到訊息:{第0個=User{name='MrEt', age=0, sex='男', number='b55a5bb5-31be-4df3-9050-adaa69d46482', salary=2266}}
接收到訊息:{第1個=User{name='MissZe', age=1, sex='女', number='df20aa34-1956-4401-809a-fb44ab524ad0', salary=781}}
接收到訊息:{第2個=User{name='MrFs', age=2, sex='男', number='7ec61656-f40f-4759-b967-a92df7a8889f', salary=7582}}
接收到訊息:{第3個=User{name='MissIm', age=3, sex='女', number='7ad7107d-7648-40b4-84f6-4e5f815771b6', salary=4046}}
接收到訊息:{第4個=User{name='MrZc', age=4, sex='男', number='fe01b62d-b0fb-4054-a641-40fe8f0c8cbc', salary=5659}}
接收到訊息:{第5個=User{name='MissJa', age=5, sex='女', number='a525b377-d05b-499c-a4eb-b6bcdecaf39a', salary=7397}}
接收到訊息:{第6個=User{name='MrWg', age=6, sex='男', number='2ea6fe19-dec3-4c34-8bc8-1ae6633ea4fa', salary=788}}
接收到訊息:{第7個=User{name='MissIw', age=7, sex='女', number='60c57b23-e092-4b94-912d-fc03979fc4bd', salary=7408}}
接收到訊息:{第8個=User{name='MrOt', age=8, sex='男', number='76454c8c-d00d-464e-8e74-e57b4d63d01e', salary=858}}
接收到訊息:{第9個=User{name='MissJi', age=9, sex='女', number='ac5bd1ef-0421-485e-b118-eb840d2172ce', salary=5612}}
}

總結

  • 結合activemq的管理頁面學習/開發可以加深理解
  • activemq支援的每種協議埠都是不同的,如果使用stomp,則注意根據啟動時dos視窗的info資訊去確定對應的地址,修改埠則是在activemq根目錄下config/actviemq.xml
    • 注意佇列模式與主題模式的區別,尤其是主題模式下,消費者一定要在生產者生產前建立連線
    • 使用ObjectMessage要在生產者和消費者的連線中設定TrustPackage

知識來源於網際網路,因此所學知識也將分享到網際網路,希望能給像我一樣迷茫萌新提供幫助