JMS-中介軟體MQ-activeMQ入門
阿新 • • 發佈:2021-08-12
一、MQ(Message Queue)概念
- kafka
- RabbitMQ
- RocketMQ
- ActiveMQ:https://activemq.apache.org/
1.1 MQ的技術維度
- api傳送和接收
- MQ高可用
- MQ叢集和容錯配置
- MQ持久化
- redis
- 延時傳送/定時投遞
- 簽收機制
- spring整合
1.2 作用
- 解耦:系統間介面耦合嚴重
- 當新的模組接進來時,可以做到程式碼改動最小
- 削峰:面對大流量併發時,容易被沖垮
- 設定流量緩衝池,後端系統按照自身吞吐能力進行消費
- 非同步:等待同步存在效能問題
- 強弱依賴梳理能將不關鍵呼叫鏈路的操作非同步化,提升系統的吞吐能力
1.3 定義
- 面向訊息的中介軟體(message-oriented middleware)MOM
提供訊息傳遞和訊息排隊模型在分散式環境下提供應用的解耦、彈性伸縮、冗餘儲存、流量削峰、非同步通訊、資料同步 - 過程:
- 傳送方——>訊息伺服器,訊息伺服器將訊息存放在:佇列/主題,訊息伺服器——>接收方。
- 傳送和接收是非同步的
- 在pub/sub模式下,可以完成一對多,一個訊息多個接收方。
二、簡單操作
- 安裝過程略....
- 解壓命令:
tar -zxvf ./apache-activemq-5.16.2-bin.tar.gz -C /指定目錄
- 複製到自己的目錄:
sudo cp -r apache-activemq-5.16.2/ ~/myactiveMQ/
- 啟動檔案目錄:
/home/dj/myactiveMQ/apache-activemq-5.16.2/bin
總用量 144 -rwxr-xr-x. 1 root root 26692 8月 8 10:35 activemq -rwxr-xr-x. 1 root root 6189 8月 8 10:35 activemq-diag -rw-r--r--. 1 root root 16020 8月 8 10:35 activemq.jar -rw-r--r--. 1 root root 5597 8月 8 10:35 env drwxr-xr-x. 2 root root 78 8月 8 10:35 linux-x86-32 drwxr-xr-x. 2 root root 78 8月 8 10:35 linux-x86-64 drwxr-xr-x. 2 root root 82 8月 8 10:35 macosx -rw-r--r--. 1 root root 83820 8月 8 10:35 wrapper.jar
- 啟動命令:
./activemq start
- activeMQ的預設程序埠:
61616
- 檢查是否開啟命令:
lsof -i:61616
java 3611 root 137u IPv6 45437 0t0 TCP *:61616 (LISTEN)
netstat -anp|grep 61616
tcp6 0 0 :::61616 :::* LISTEN 3611/java
- 關閉命令:
./activemq stop
- 帶日誌的啟動命令:
./activemq start > /myactiveMQ/run_activemq.log
- activeMQ控制檯:
- http://linux伺服器ip:8161
- 賬號:admin,密碼:admin
如果Windows系統上無法訪問Linux系統虛擬機器的控制檯,可能是Linux系統的防火牆沒關
參考:https://www.jianshu.com/p/d58c3307de51
安裝iptables-service工具: yum install iptables-service
啟用: systemctl enable iptables
開啟: systemctl start iptables
檢視防火牆狀態: service iptables status
關閉防火牆: service iptables stop
開啟防火牆: service iptables start
三、java編碼MQ的api
3.1 依賴:
<!-- activemq的依賴 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.16.2</version>
</dependency>
<!-- activemq整合spring的基礎包 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.20</version>
</dependency>
3.2 執行原理
Destination(目的地):
- Queue(佇列):一對一
- Topic(主題):一對多
3.3 Queue-測試程式碼例項
3.3.1 生產者編碼
public class JmsProducer {
//url地址
public static final String MY_ACTIVEMQ_URL = "tcp://192.168.10.100:61616";
public static final String MY_QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1 建立Connection連線工廠
ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(MY_ACTIVEMQ_URL);
//2 使用連線工廠建立連線並啟動
Connection connection = acf.createConnection();
connection.start();
//3 建立會話Session,兩個引數:第一個事務/第二個簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4 建立目的地
Queue queue = session.createQueue(MY_QUEUE_NAME);
//5 建立訊息生產者
MessageProducer messageProducer = session.createProducer(queue);
for (int i = 1; i <= 3; i++){
//6 建立訊息
TextMessage message = session.createTextMessage("訊息msg--->" + i);
//7 messageProducer將產生的訊息傳送到MQ佇列中
messageProducer.send(message);
}
//8 關閉資源
messageProducer.close();
session.close();
connection.close();
//測試程式完成
System.out.println("*****程式釋出成功");
}
}
- web控制檯效果:
3.3.2 消費者編碼
public class JmsConsumer {
//url地址
public static final String MY_ACTIVEMQ_URL = "tcp://192.168.10.100:61616";
public static final String MY_QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1 建立Connection連線工廠
ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(MY_ACTIVEMQ_URL);
//2 使用連線工廠建立連線並啟動
Connection connection = acf.createConnection();
connection.start();
//3 建立會話Session,兩個引數:第一個事務/第二個簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4 建立目的地
Queue queue = session.createQueue(MY_QUEUE_NAME);
//5 建立消費者
MessageConsumer messageConsumer = session.createConsumer(queue);
while (true){
//6 接收訊息
TextMessage message = (TextMessage) messageConsumer.receive();
//TextMessage message = (TextMessage) messageConsumer.receive(4000L);
//有時間的receive方法,設定的時間一過,自動關閉
if (message != null){
System.out.println("接收訊息===>"+message.getText());
}else {
System.out.println("!!!!接收完畢!!!!!");
break;
}
}
//7 關閉資源
messageConsumer.close();
session.close();
connection.close();
}
}
- 控制檯效果:
3.3.3 消費者編碼-監聽器
//5 建立消費者
MessageConsumer messageConsumer = session.createConsumer(queue);
/*
* 方法一:同步阻塞
* */
// while (true){
// //6 接收訊息
// TextMessage message = (TextMessage) messageConsumer.receive();
// if (message != null){
// System.out.println("接收訊息===>"+message.getText());
// }else {
// System.out.println("!!!!接收完畢!!!!!");
// break;
// }
// }
/*
* 方法二:設定監聽器
* */
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message != null && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收訊息===>"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();
//7 關閉資源
messageConsumer.close();
session.close();
connection.close();
3.3.4 消費者的三個情況
- 1、先啟動生產者,只啟動1號消費者
- 問:1號消費者能消費訊息嗎?
- 答:可以
- 2、先啟動生產者,先啟動1號消費者,再啟動2號消費者
- 問:2號消費者能消費訊息嗎?
- 答:不可以,1號消費者全部消費完畢
- 3、先啟動2個消費者,再啟動生產者生產6條訊息
- 問:消費資訊的情況
- 答:2個消費者平均分
3.4 Topic-測試程式碼例項
3.4.1 注意點
- 1、生產者將訊息釋出到topic上,每個訊息可以有多個消費者,屬於1:N的關係。
- 2、生產者和消費者之間有時間上的相關性,訂閱某個主題的消費者只能消費自它訂閱之後釋出的訊息。
- 3、生產者生產時,topic不儲存訊息它是無狀態的不落地,如果沒有訂閱,那就是一條廢訊息,所以一般先啟動消費者再啟動生產者。
3.4.2 生產者編碼
只需要修改以下部分
```java
//4 建立目的地
Topic topic = session.createTopic(MY_TOPIC_NAME);
3.4.3 消費者編碼
只需要修改以下部分
//4 建立目的地
Topic topic = session.createTopic(MY_TOPIC_NAME);
-
控制檯列印測試結果:
-
web控制檯:
3.5 Queue和Topic模式的區別
四、JMS(Java Message Service)Java訊息服務
JMS是javaEE的技術體系中之一。
4.1 組成結構
- JMS Provider:MQ訊息中介軟體伺服器
- JMS Producer:建立和傳送JMS訊息的應用
- JMS Consumer:接收和處理JMS訊息的應用
- JMS Message:
- 訊息頭:
- JMSDestination:設定目的地:queue,topic
- JMSDeliveryMode:持久和非持久
- 持久:被傳送一次僅僅一次,在JMS提供者出現故障,訊息不會丟失,會在伺服器恢復後再次傳遞
- 非持久:最多傳送一次,意味著伺服器出現故障,訊息回永久消失
- JMSExpiration:在一定時間後過期,預設永久
- JMSPriority:優先順序,0-9個級別,0-4普通,5-9加急,預設4
- JMSMessageID:唯一識別每個訊息的標識,由MQ產生
- 訊息體:
- 封裝具體訊息
- 傳送和接收的訊息體型別必須一致
- 5種訊息體格式:
- TextMessage
- MapMessage
- BytesMessage
- StreamMessage
- ObjectMessage
- 訊息屬性:
- 需要除訊息頭欄位以外的值
- 識別/去重/重點標註等操作
- 訊息頭:
4.2 JMS的可靠性
4.2.1 PERSISTENT:永續性
-
引數設定:
- 持久:messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
- 非持久:messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
持久的Queue:
- 預設是持久化
-
持久的Topic:
- 生產者程式碼例項:
public class JmsProducerTopic_persistent { //url地址 public static final String MY_ACTIVEMQ_URL = "tcp://192.168.10.100:61616"; public static final String MY_TOPIC_NAME = "topic_dj"; public static void main(String[] args) throws JMSException { //1 建立Connection連線工廠 ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(MY_ACTIVEMQ_URL); //2 使用連線工廠建立連線並啟動 Connection connection = acf.createConnection(); //3 建立會話Session,兩個引數:第一個事務/第二個簽收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4 建立目的地 Topic topic = session.createTopic(MY_TOPIC_NAME); //5 建立訊息生產者 MessageProducer messageProducer = session.createProducer(topic); messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); for (int i = 1; i <= 3; i++){ //6 建立訊息 TextMessage message = session.createTextMessage("topic_dj訊息msg--->" + i); //7 messageProducer將產生的訊息傳送到MQ佇列中 messageProducer.send(message); } //8 關閉資源 messageProducer.close(); session.close(); connection.close(); //測試程式完成 System.out.println("*****程式釋出成功"); } }
- 消費者程式碼例項:
public class JmsConsumerTopic_persistent { //url地址 public static final String MY_ACTIVEMQ_URL = "tcp://192.168.10.100:61616"; public static final String MY_TOPIC_NAME = "topic_dj"; public static void main(String[] args) throws JMSException, IOException { System.out.println("我是1號topic消費者"); //1 建立Connection連線工廠 ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(MY_ACTIVEMQ_URL); //2 使用連線工廠建立連線並啟動 Connection connection = acf.createConnection(); connection.setClientID("1號消費者"); //3 建立會話Session,兩個引數:第一個事務/第二個簽收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4 建立目的地 Topic topic = session.createTopic(MY_TOPIC_NAME); TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark..."); //5 connection.start(); Message message = topicSubscriber.receive(); while (null != message){ TextMessage textMessage = (TextMessage) message; System.out.println("****收到持久化topic:"+textMessage.getText()); message = topicSubscriber.receive(1000L); } session.close(); connection.close(); } }
- 啟動前web控制檯:
-
啟動後web控制檯:
- 接收時間不過期
- 接收時間在一定時間過期
- 最後效果:
4.2.2 事務:
- 事務需要實現的程式碼例項:
//3 建立會話Session,兩個引數:第一個事務/第二個簽收
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
......
session.commit();
4.2.3 簽收:
- 非事務:
- 自動簽收(預設):Session.AUTO_ACKNOWLEDGE
- 手動簽收:Session.CLIENT_ACKNOWLEDGE
- 客戶端呼叫acknowledge方法手動簽收
try { System.out.println("接收訊息===>"+textMessage.getText()); message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); }
- 允許重複訊息:Session.DUPS_OK_ACKNOWLEDGE
- 事務:
- 生產事務開啟,只有commit後才能將全部訊息變為已消費
- 訊息生產者
- 訊息消費者
- 事務和簽收的關係:
- 在事務會話中,當一個事務被成功提交則訊息會被自動簽收。如果事務回滾,則訊息會被再次傳送
- 非事務性會話中,訊息何時被確認取決於建立會話時的應答模式
4.3 JMS的點對點總結
基於佇列的,生產者傳送訊息到佇列,消費者從佇列接收訊息,佇列的存在使得訊息的非同步傳輸稱為可能。類似於發簡訊
- 如果Session關閉時,有部分訊息已被接收但還沒有被簽收(acknowledge),那當消費者下次連線到相同的佇列時,這些訊息還會被再次接收。
- 佇列可以長久的儲存訊息直到消費者收到訊息,消費者不需要因為擔心訊息丟失而和佇列時刻保持啟用的來連線狀態,充分體現了非同步傳輸模式的優勢
4.4 JMS的釋出訂閱總結
JMS Pub/Sub模型定義瞭如何向一個內容節點發布和訂閱訊息,這些節點被稱為topic。類似於微信公眾號
- 主題可以被是訊息的傳輸中介,釋出者釋出訊息到主題,訂閱者從主題訂閱訊息。
- 主題使得訊息訂閱者和訊息釋出者保持互相獨立,不需要接觸即可保證訊息的傳送。
- 非持久訂閱下,不能恢復或者重新派送一個未簽收的訊息。
- 持久訂閱能恢復或重新派送未簽收的訊息。
五、ActiveMQ的Broker
5.1 概念
- 相當於一個ActiveMQ伺服器例項
- Broker實現了用程式碼的形式啟動ActiveMQ將MQ嵌入Java程式碼中,以便隨時啟動。
- 在使用時啟動,可以節省資源,保證可靠性。
5.2 conf配置檔案
- 不同的配置檔案模擬不同的例項
./activemq start xbean:file:/myactiveMQ/apache-activemq-5.16.2/conf/activemq02.xml
5.3 嵌入式Broker
5.3.1 pom.xml
- json繫結
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.4</version>
</dependency>
5.3.2 EmbedBroker
public class EmbedBroker {
public static void main(String[] args) throws Exception {
//ActiveMQ也支援在vm中通訊基於嵌入式的broker
BrokerService brokerService = new BrokerService();
brokerService.setUseJmx(true);
brokerService.addConnector("tcp://localhost:61616");
brokerService.start();
System.in.read();
}
}