訊息中介軟體入門「一」:初識訊息中介軟體【ActiveMQ】
訊息中介軟體入門「一」:初識訊息中介軟體【ActiveMQ】
背景介紹
訊息中介軟體相當於程序間通訊的信託,可以降低複雜系統中各個模組間的耦合度。對於信託:你只需要把Message給我,就沒你的事兒了。我負責給你送到目的地,就不需要你必須實時的守著,等待所有通訊細節的完成。就算你突然掛了也沒事,Message由信託給你存著,直到送到目的地才會消失。也就是說通訊細節都由訊息中介軟體完成,生產者只需要把訊息給中介軟體即可,而消費者只需要繫結好地址。有了訊息就會主動推送過來,也不需要消費者主邏輯執行緒守著。而webservice則是需要客戶端與服務端都保持線上,客戶端發起請求後必須等待服務端完成處理並將資料返回。由此可見中介軟體可以延遲處理也能夠實時的處理,十分可靠。而webservice要求有較高的實時性,通訊過程不允許出現宕機情況。也無法延時處理。
常用的訊息中介軟體有:ActiveMQ,RabbitMQ,RocketMQ,kafka等。而ActiveMQ是一款完全遵循jms規範的訊息中介軟體,支援多種協議如Stomp等,如圖1所示。
第一步:安裝ActiveMQ
- 訪問ActiveMQ官方下載安裝對應平臺的訊息安裝包,點我進入下載地址。
- 解壓到自己電腦上,進入bin目錄下,在cmd中輸入
activemq start
啟動如圖3。
- 執行bin/win64/activemq.bat 也可以快速啟動
- 解壓到自己電腦上,進入bin目錄下,在cmd中輸入
根據dos視窗內容瞭解activemq的啟動資訊
在啟動好actviemq後,在dos視窗中可以看到輸出了很多內容,那麼具體代表什麼含義呢:
- 修改對應協議的埠,進入
$(activemq_home)/config/
,開啟actviemq.xml
找到如下程式碼進行修改:
Tips:在網頁訪問時將地址0.0.0.0
換成主機地址,如127.0.0.1
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
在dos中複製web管理地,將
0.0.0.0
換成127.0.0.1
,在瀏覽器中開啟如圖 5:
點選MangeActiveMQ broker,會彈出視窗輸入賬號密碼,預設為:
admin/admin
。管理介面如圖 6:
點選Queues,管理佇列模式的訊息列表,如圖7:
Tips:佇列模式下,預設配置下,所有訊息一旦被儲存到中介軟體中,只要沒有被消費者取出,就會一直存在,即使是actviemq被關閉。並且每一條訊息只會存在一個消費者中,也就是說佇列模式下:所有訊息是一對一,也就是所謂的點對點模式。一條訊息一旦被一個消費者接收到後,actviemq就刪除該訊息,其他消費者無法接繼續收該訊息。佇列模式就類似於搶購小米手機,數量有限,手慢無。
點選Topics,進入主題模式下的管理介面如圖8所示:
Tips:主題模式下:消費者必須在生產者釋出訊息前,訂閱訊息的地址(destination),否則講接收不到任何訊息。並且訂閱了destination的所有消費者都會收到生產者生產的所有訊息。類似於廣播的概念,在廣播廳內所有有耳朵的人都能接收到訊息。
第二步:使用actviemq
- 專案目錄介紹,如圖9:
特別說明:普通的java專案就可以了,不一定非要maven,因為依賴的jar包只有一個,而且在activemq的目錄下面,如圖10 :
如果懶得複製,可以在maven中加入如下依賴:
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.5</version>
</dependency>
</dependencies>
Tips:version最好與下載的actviemq版本一致
1.使用佇列模式
- 生產者程式碼:
package queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class QueueProducer {
private static final String url="tcp://localhost:61616";
private static final String queueName="HelloActiveMQ";
public static void main(String[] args) throws JMSException, InterruptedException {
//建立連線工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(url);
//建立連線
Connection connection=connectionFactory.createConnection();
//連線啟動
connection.start();
//通過連接獲得session
Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//建立目標
Destination destination=session.createQueue(queueName);
//建立一個生產者
MessageProducer producer=session.createProducer(destination);
for (int i = 0; i < 100; i++) {
//建立訊息
TextMessage message=session.createTextMessage(String.format("hello Mq:%d", i));
//釋出訊息
producer.send(message);
System.out.println("傳送訊息:"+message.getText());
Thread.sleep(500);
}
//關閉連線
connection.close();
}
}
- 消費者程式碼:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MqConsumer {
private static final String queueName="HelloActiveMQ";
private static final String url="tcp://localhost:61616";
public static void main(String[] args) throws JMSException {
//獲取連線工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(url);
//獲取連線
Connection connectionn=connectionFactory.createConnection();
//啟動連線
connectionn.start();
//建立session
Session session=connectionn.createSession(false,Session.AUTO_ACKNOWLEDGE);
//繫結地址
Destination destination=session.createQueue(queueName);
//建立一個消費者物件
MessageConsumer consumer=session.createConsumer(destination);
//建立監聽器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage= (TextMessage) message;
try {
System.out.println("接收到訊息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
接收訊息有兩種方式,一種是同步方式,一種是非同步方式。本例使用非同步方式。使用同步方式程式碼如下:
Message message = consumer.receive();
String text = ((TextMessage) message).getText();
System.out.println("接收到訊息:" + text);
同步方式執行結束則不會再接收訊息,如果要持續接收訊息,需要寫在while死迴圈中;非同步方式似乎是為了解決這種問題,直接通過建立監聽器的方式啟動一個執行緒,會持續接收新訊息。
Tips:可以看出無論消費者還是生產者,前面都要經過:
- 通過
ActiveMQConnectionFactory
獲取連線工廠bean - 建立連線
- 啟動該連線
- 通過連線後獲取
Session
- 通過
session
根據方法獲取佇列的destination
- 再通過
session
獲取消費者consumer
或者producer
其前面的過程如同jdbc連線資料庫一樣,而後面根據消費者和生產者的不同角色定位,去傳送或者接收訊息。
輸出效果:
1. 啟動一個生產者一個消費者
接收到訊息:hello Mq:0
…………
接收到訊息:hello Mq:98
接收到訊息:hello Mq:99
100條訊息全部接收
2. 啟動一個生產者兩個消費者
consumer 1:
……
接收到訊息:hello Mq:9
接收到訊息:hello Mq:11
接收到訊息:hello Mq:13
接收到訊息:hello Mq:15
接收到訊息:hello Mq:17
接收到訊息:hello Mq:19
接收到訊息:hello Mq:21
接收到訊息:hello Mq:23
接收到訊息:hello Mq:25
consumer 2:
……
接收到訊息:hello Mq:10
接收到訊息:hello Mq:12
接收到訊息:hello Mq:14
接收到訊息:hello Mq:16
接收到訊息:hello Mq:18
接收到訊息:hello Mq:20
接收到訊息:hello Mq:22
接收到訊息:hello Mq:24
接收到訊息:hello Mq:26
可以看出佇列模式下確實為一對一,點對點,消費者收到的每一條訊息都彼此不相同,一個為奇數條,一個為偶數條。並且佇列模式下,即時中途關閉actviemq,消費者沒取完的訊息也會被儲存起來,在下次執行時消費者可以接著接收訊息。
2. 使用主題模式
- 生產者程式碼
package topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TopicProducer {
private static final String url="tcp://localhost:61616";
private static final String queueName="queue";
public static void main(String[] args) throws JMSException, InterruptedException {
//建立連線工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(url);
//建立連線
Connection connection=connectionFactory.createConnection();
//連線啟動
connection.start();
//通過連接獲得session
Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//建立目標
Destination destination=session.createTopic(queueName);
//建立一個生產者
MessageProducer producer=session.createProducer(destination);
for (int i = 0; i < 100; i++) {
//建立訊息
TextMessage message=session.createTextMessage(String.format("hello Mq:%d", i));
//釋出訊息
producer.send(message);
System.out.println("傳送訊息:"+message.getText());
Thread.sleep(500);
}
//關閉連線
connection.close();
}
}
可以看出於佇列模式唯一區別就是建立目標變成了Topic:
//建立目標
Destination destination=session.createTopic(queueName);
- 消費者程式碼也是一樣,將建立目標destination變成Topic:
//建立目標
Destination destination=session.createTopic(queueName);
消費者所有程式碼:
package topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TopicConsumer {
private static final String queueName = "queue";
private static final String url = "tcp://localhost:61616";
public static void main(String[] args) throws JMSException {
//獲取連線工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//獲取連線
Connection connectionn = connectionFactory.createConnection();
//啟動連線
connectionn.start();
//建立session
Session session = connectionn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//繫結地址
Destination destination = session.createTopic(queueName);
//建立一個消費者物件
MessageConsumer consumer = session.createConsumer(destination);
//建立監聽器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
Thread.sleep(500);
System.out.println("接收到訊息:" + textMessage.getText());
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
由於主題模式中,生產者所有訊息,在訂閱了該主題的所有消費者中都能收到。因此直接啟動一個生產者和兩個消費者,這裡啟動時候必須先啟動消費者,不然會丟失部分訊息。
- 輸出結果:
consumer1:
接收到訊息:hello Mq:0
接收到訊息:hello Mq:1
接收到訊息:hello Mq:2
接收到訊息:hello Mq:3
接收到訊息:hello Mq:4
接收到訊息:hello Mq:5
接收到訊息:hello Mq:6
接收到訊息:hello Mq:7
接收到訊息:hello Mq:8
接收到訊息:hello Mq:9
接收到訊息:hello Mq:10
consumer2:
接收到訊息:hello Mq:0
接收到訊息:hello Mq:1
接收到訊息:hello Mq:2
接收到訊息:hello Mq:3
接收到訊息:hello Mq:4
接收到訊息:hello Mq:5
接收到訊息:hello Mq:6
接收到訊息:hello Mq:7
接收到訊息:hello Mq:8
接收到訊息:hello Mq:9
接收到訊息:hello Mq:10
可以看出所有消費者在同一時間段內接收到的訊息都是一樣的,並且在訂閱前的訊息會丟失。進入Topic管理頁面中可以檢視丟失的訊息,以及推送的訊息。如圖10:
圖中丟失的訊息數量=生產訊息數量*消費者數量-消費出去的訊息數量。
因此由於主題模式的這種特性,消費者一定要在生產者生產訊息前訂閱好destination
activemq支援的訊息(Message)資料型別
Activemq支援的
Message
如下:
- TextMessage 文字訊息:攜帶一個java.lang.String作為有效資料(負載)的訊息,可用於字串型別的資訊交換
- ObjectMessage 物件訊息:攜帶一個可以序列化的Java物件作為有效負載的訊息,可用於Java物件型別的資訊交換;
- MapMessage 對映訊息:攜帶一組鍵值對的資料作為有效負載的訊息,有效資料值必須是Java原始資料型別(或者它們的包裝類)及String。即:byte , short , int , long , float , double , char , boolean , String
- BytesMessage 位元組訊息 :攜帶一組原始資料型別的位元組流作為有效負載的訊息;
- StreamMessage 流訊息:攜帶一個原始資料型別流作為有效負載的訊息,它保持了寫入流時的資料型別,寫入什麼型別,則讀取也需要是相同的型別;
可以看出最常用的應該還是ObjectMessage型別,TextMessage只是簡單的字串傳輸,不適用於複雜的資料互動。而MapMessage只支援基本資料型別和String。BytesMessage則偏向於底層。StreamMessage任然還是支援的資料型別有限制。
如何使用java物件傳輸訊息
使用java物件傳輸訊息官網給出了兩種方式:
1.設定信任物件的全類名。這種方式需要連線時候設定好全類名,並且actviemq也需要配置該全類名,並且類路徑必須保持一致。也就是說新增一個信任的物件,就必須在actviemq的配置檔案中修改一下,同時java程式碼也要修改一下。加大了中介軟體與java程式碼的耦合度。目前按照官方給出的教程還沒有成功過,主要是找不到官網描述的配置檔案,不知道是不是actviemq版本的原因==!。
2.設定信任所有的物件,這種方式只需要在java程式碼稍作修改即可,並且一旦配置好,所有的物件都可以傳輸,無聊是自定義的pojo還是java集合類都可以使用。
具體步驟【信任所有package】:
1. 在生產者和消費者連線公共部分connectionFactory
呼叫setTrustAllPackages
設定為true
:
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(url);
((ActiveMQConnectionFactory) connectionFactory).setTrustAllPackages(true);
2. 生產者生產訊息使用ObjectMessage
傳出訊息,程式碼如下:
//建立一個生產者
MessageProducer producer=session.createProducer(destination);
for (int i = 0; i < 100; i++) {
//建立訊息
User user=new User();
user.setAge(i);
user.setName((i%2==0? "Mr":"Miss")+String.valueOf((char)(int)(Math.random()*26+'A'))+String.valueOf((char)(int)(Math.random()*26+'a')));
user.setSex(i%2==0? "男":"女");
user.setSalary((int)(Math.random()*10000));
user.setNumber(UUID.randomUUID().toString());
ArrayList<User> users=new ArrayList<User>();
HashMap<String,User> map=new HashMap<String, User>();
map.put(String.format("第%d個", i),user);
users.add(user);
ObjectMessage message=session.createObjectMessage(map);
//TextMessage message=session.createTextMessage(String.format("hello Mq:%d", i));
//釋出訊息
producer.send(message);
System.out.println("傳送訊息:"+message.getObject());
Thread.sleep(1);
}
執行效果如下:
傳送訊息:{第0個=User{name='MrEt', age=0, sex='男', number='b55a5bb5-31be-4df3-9050-adaa69d46482', salary=2266}}
傳送訊息:{第1個=User{name='MissZe', age=1, sex='女', number='df20aa34-1956-4401-809a-fb44ab524ad0', salary=781}}
傳送訊息:{第2個=User{name='MrFs', age=2, sex='男', number='7ec61656-f40f-4759-b967-a92df7a8889f', salary=7582}}
傳送訊息:{第3個=User{name='MissIm', age=3, sex='女', number='7ad7107d-7648-40b4-84f6-4e5f815771b6', salary=4046}}
傳送訊息:{第4個=User{name='MrZc', age=4, sex='男', number='fe01b62d-b0fb-4054-a641-40fe8f0c8cbc', salary=5659}}
傳送訊息:{第5個=User{name='MissJa', age=5, sex='女', number='a525b377-d05b-499c-a4eb-b6bcdecaf39a', salary=7397}}
傳送訊息:{第6個=User{name='MrWg', age=6, sex='男', number='2ea6fe19-dec3-4c34-8bc8-1ae6633ea4fa', salary=788}}
傳送訊息:{第7個=User{name='MissIw', age=7, sex='女', number='60c57b23-e092-4b94-912d-fc03979fc4bd', salary=7408}}
傳送訊息:{第8個=User{name='MrOt', age=8, sex='男', number='76454c8c-d00d-464e-8e74-e57b4d63d01e', salary=858}}
傳送訊息:{第9個=User{name='MissJi', age=9, sex='女', number='ac5bd1ef-0421-
3. 消費者接收訊息,對應的也是將TextMessage
修改為ObjectMessage
,程式碼如下:
//建立一個消費者物件
MessageConsumer consumer=session.createConsumer(destination);
//建立監聽器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
//TextMessage textMessage= (TextMessage) message;
ObjectMessage Objmessage= (ObjectMessage) message;
try {
System.out.println("接收到訊息:"+Objmessage.getObject());
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
});
執行效果如下:
接收到訊息:{第0個=User{name='MrEt', age=0, sex='男', number='b55a5bb5-31be-4df3-9050-adaa69d46482', salary=2266}}
接收到訊息:{第1個=User{name='MissZe', age=1, sex='女', number='df20aa34-1956-4401-809a-fb44ab524ad0', salary=781}}
接收到訊息:{第2個=User{name='MrFs', age=2, sex='男', number='7ec61656-f40f-4759-b967-a92df7a8889f', salary=7582}}
接收到訊息:{第3個=User{name='MissIm', age=3, sex='女', number='7ad7107d-7648-40b4-84f6-4e5f815771b6', salary=4046}}
接收到訊息:{第4個=User{name='MrZc', age=4, sex='男', number='fe01b62d-b0fb-4054-a641-40fe8f0c8cbc', salary=5659}}
接收到訊息:{第5個=User{name='MissJa', age=5, sex='女', number='a525b377-d05b-499c-a4eb-b6bcdecaf39a', salary=7397}}
接收到訊息:{第6個=User{name='MrWg', age=6, sex='男', number='2ea6fe19-dec3-4c34-8bc8-1ae6633ea4fa', salary=788}}
接收到訊息:{第7個=User{name='MissIw', age=7, sex='女', number='60c57b23-e092-4b94-912d-fc03979fc4bd', salary=7408}}
接收到訊息:{第8個=User{name='MrOt', age=8, sex='男', number='76454c8c-d00d-464e-8e74-e57b4d63d01e', salary=858}}
接收到訊息:{第9個=User{name='MissJi', age=9, sex='女', number='ac5bd1ef-0421-485e-b118-eb840d2172ce', salary=5612}}
}
總結
- 結合activemq的管理頁面學習/開發可以加深理解
- activemq支援的每種協議埠都是不同的,如果使用stomp,則注意根據啟動時dos視窗的info資訊去確定對應的地址,修改埠則是在activemq根目錄下
config/actviemq.xml
- 注意佇列模式與主題模式的區別,尤其是主題模式下,消費者一定要在生產者生產前建立連線
- 使用
ObjectMessage
要在生產者和消費者的連線中設定TrustPackage
知識來源於網際網路,因此所學知識也將分享到網際網路,希望能給像我一樣迷茫萌新提供幫助