Java SE基礎鞏固(十四):JMS
1 概述
JMS即Java Message Service(Java訊息服務),它是一組規範,提供了方便的並且通用的方式來讓Java程式建立,傳送,接受和讀取訊息。
在中大型的專案中,很多都用到了被叫做面向訊息的中介軟體(Message Oriented Middleware),通常簡稱訊息中介軟體,現在流行的訊息中介軟體產品主要有ActiveMQ,RabbitMQ,Kafka,RocketMQ等等,很多大型的商業軟體使用這些產品構建應用程式的訊息系統,訊息在應用程式中的各個模組(或者服務)之間傳遞,以此達到解耦,削峰,提高應用程式的吞吐量等目的。
因為產品眾多,如果沒有一個成熟的規範來約束,那麼各個產商訊息中介軟體的具體實現可能會大相徑庭,使得開發者在切換產品的時候非常麻煩,不得不重新編寫大量程式碼來適應產品。JMS就是這麼一個規範,它定義了一個訊息中介軟體產品應該具有哪些元件,哪些介面等等,如果訊息中介軟體都能根據規範定義的介面來實現的話,開發者在構建訊息系統的時候只需要付出很少的代價來切換不同的產品,因為介面都是一樣的,幾乎不需要如何修改程式碼。
JMS規範定義的幾個元件如下所示:
- JMS Provider。提供訊息服務的元件,用來為JMS客戶端提供服務,JMS客戶端可以有生產者客戶端和消費者客戶端等。
- JMS Message。即訊息傳遞實體。
- JMS Domains。即訊息傳遞的目的地。
JMS Message包含了如下幾個部分:
- Header。所有的訊息都有相同的Header欄位集合,類似HTTP Header,這些欄位用於在客戶端和服務端(provider)之間做路由或者身份認證等,包括JMSDestination,JMSMessageID,JMSTimestamp等。
- Properties。即屬性,是可選項,一個屬性由一個鍵值對構成,鍵是屬性名,值是屬性的值。應用程式可以自定義屬性,並將其附在Message上和Header以及Body一起傳送出去。
- Body,即訊息體主體,例如對於普通的文字訊息,訊息主題就是本文內容,例如“Hello”字串。
以上是對JMS的簡單介紹,更多關於JMS規範的內容,建議參看JSR 914檔案(直接在搜尋引擎中搜索該關鍵字就可以找到了),下面我將介紹JMS規範定義的介面並使用ActiveMQ來構建一個小Demo作為實踐,最後會介紹兩種常見訊息傳送模型:點對點傳輸模型和釋出/訂閱傳輸模型。
注意,這裡所說的訊息不是狹義的訊息,即不特指一句話或者一個郵件等,訊息不僅可以是字串形式的,還可以是一個Java物件等。
2 JMS介面
上圖是JMS規範定義的介面,最左側是JMS通用介面,右側是兩種不同的訊息傳送模型的介面,下面來介紹通用介面:
- ConnectionFactory,即連線工廠,Connection需要通過ConnectionFactory來建立。
- Connection,即連線,表示到JMS Provider的連線。
- Destination,即目的地,表示訊息傳送的終點,例如在發郵件的時候接受者的郵箱。
- Session,即會話,用於傳送和接受訊息的一個單執行緒上下文。
- MessageProducer,即訊息生產者,訊息生產者是訊息的傳送方,負責建立和傳送訊息都JMS Provider。
- MessageConsumer,即訊息消費者,訊息消費者是訊息的接收方,負責從JMS Provider接收和讀取(消費)訊息。
下圖是上述幾個介面的關係圖:
下面會看到,我們在使用ActiveMQ編寫程式碼的時候幾乎就是根據這麼一個關係圖來寫的。
3 使用ActiveMQ
標準的Java SE裡沒有實現JMS規範,所以標準JDK裡沒有上述提到的幾個介面和對應的實現,但Java EE裡有實現,不過搭建Java EE相對比較麻煩(只是一個Demo而已),所以這裡就直接使用ActiveMQ這個產品。ActiveMQ實現了JMS的規範,提供了JMS規範定義的介面,之所以使用ActiveMQ而不是其他的產品,是因為ActiveMQ的實現比較“標準”,非常貼合JMS規範,且學習門檻叫其他幾個產品低。
3.1 安裝並執行ActiveMQ
安裝很簡單,直接到ActiveMQ官網去下載即可,我這裡使用的是5.15.6這個版本,在windows下,下載apache-activemq-5.15.6-bin.zip這個壓縮包(在Unix系統下,下載apache-activemq-5.15.6-bin.tar.gz),然後解壓,在命令列裡進入bin目錄,輸入activemq start即可。
建議到官網的Get Start部分看看,安裝的啟動都講得很詳細,這裡我就不多說了。
啟動之後,可以在瀏覽器輸入http://localhost:8161來檢視ActiveMQ的各項資料,如下所示:
點選Manage ActiveMQ broker即可進入控制檯,賬號和密碼預設都是admin,進入之後如下所示:
在這裡,可以選擇各個標籤欄來檢視不同的資訊,例如Topic,Queue等,具體在這就不多說了,讀者可以自行嘗試。
3.2 匯入ActiveMQ依賴
如果使用的是Maven來構建專案,可以加入如下依賴:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.6</version>
</dependency>
複製程式碼
如果沒有使用Maven,就需要自己往專案裡匯入jar包了,在剛剛下載的壓縮包裡,包含了activemq-all-5.15.6.jar這個jar包,將其匯入到專案的類路徑即可。
3.3 編寫Demo
準備工作做完之後,就可以正式編寫程式碼了,演示程式碼如下所示:
JMSProducer
package top.yeonon.jms;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Author yeonon
* @date 2018/10/20 0020 14:09
**/
public class JMSProducer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) throws JMSException {
//連線工廠
ConnectionFactory connectionFactory;
//連線
Connection connection;
//Session會話
Session session;
//目的地
Destination destination;
//建立連線工廠例項
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKER_URL);
//從連線工廠例項中建立連線
connection = connectionFactory.createConnection();
//從連線中建立Session,第一個引數是是否開啟事務,這裡選擇不開啟,傳入false
// 第二個引數是訊息確認模式,這裡選擇的是自動確認
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//建立了一個佇列,佇列Queue(javax.jms包裡的Queue)繼承了Destination介面
destination = session.createQueue("Hello");
//建立一個訊息生產者,需要傳入一個destination,這樣這個producer就和destination綁定了
MessageProducer producer = session.createProducer(destination);
//配置完成之後,啟動連線
connection.start();
//傳送訊息
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("message " + i);
System.out.println("producer send message : " + message.getText());
producer.send(message);
}
//傳送完成後,關閉連線
connection.close();
}
}
複製程式碼
JMSConsumer
package top.yeonon.jms;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Author yeonon
* @date 2018/10/20 0020 14:19
**/
public class JMSConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) throws JMSException {
//連線工廠
ConnectionFactory connectionFactory;
//連線
Connection connection;
//Session會話
Session session;
//目的地
Destination destination;
//建立連線工廠例項
connectionFactory = new ActiveMQConnectionFactory(USERNAME,Session.AUTO_ACKNOWLEDGE);
//建立了一個佇列,佇列Queue(javax.jms包裡的Queue)繼承了Destination介面
destination = session.createQueue("Hello");
//建立一個訊息生產者,需要傳入一個destination,這樣這個producer就和destination綁定了
MessageConsumer consumer = session.createConsumer(destination);
//配置完成之後,啟動連線
connection.start();
//接受訊息
while (true) {
TextMessage message = (TextMessage) consumer.receive(10000);
if (message != null)
System.out.println("receive message : " + message.getText());
}
}
}
複製程式碼
程式碼中註釋寫的很清楚了,就只說說一個讓人感覺疑惑的Destination,在演示程式碼中,使用了Queue來表示destination,初次接觸訊息佇列的話,可能會覺得有些奇怪,一個Queue怎麼就是一個目的地了?其實這是完全可以的,因為目的地並不就一定是ip地址或者郵箱之類的東西,只要是一個可以接受訊息的的載體即可,現在大多數的訊息中介軟體都會或多或少只用佇列來作為訊息的載體,所以有時候直接把訊息中介軟體簡稱為訊息佇列(Message Queue)。
先執行consumer,稍微等待一會,再執行producer,producer執行完畢之後可以在producer的控制檯看到類似如下輸出:
producer send message : message 0
producer send message : message 1
producer send message : message 2
producer send message : message 3
producer send message : message 4
producer send message : message 5
producer send message : message 6
producer send message : message 7
producer send message : message 8
producer send message : message 9
複製程式碼
然後去consumer的控制檯看看,發現有類似如下輸出:
receive message : message 0
receive message : message 1
receive message : message 2
receive message : message 3
receive message : message 4
receive message : message 5
receive message : message 6
receive message : message 7
receive message : message 8
receive message : message 9
複製程式碼
如果能看到如上輸出,說明生產者已經順利將訊息傳送到JMS provider(ActiveMQ的服務)了,並且消費者也已經順利的從JMS provider裡取出訊息並進行讀取,處理了。如果你開啟了多個consumer,還能看到producer傳送的訊息被多個consumer共同消費,例如有兩個consumer,可能會有如下輸出:
consumer1:
receive message : message 0
receive message : message 2
receive message : message 4
receive message : message 6
receive message : message 8
複製程式碼
consumer2:
receive message : message 1
receive message : message 3
receive message : message 5
receive message : message 7
receive message : message 9
複製程式碼
有點負載均衡的意思,這是因為現在有兩個consumer共享一個佇列Hello,然後輪流從佇列Hello中取出訊息並消費(這是ActiveMQ的預設方式)。
4 訊息傳輸模型
JMS規範定義了兩種訊息傳輸模型:點對點傳輸模型和釋出/訂閱傳輸模型。
4.1 點對點傳輸模型
在點對點模型下,訊息由生產者傳送到佇列裡,佇列再將訊息傳送給消費者。在這個過程中,訊息的目的地是佇列,消費者監聽佇列,當佇列中有訊息的時候,佇列就會把訊息傳送給監聽了佇列的消費者。可以簡單的把佇列看做是一箇中轉站,負責接受生產者傳送過來的訊息,然後將訊息再傳送給消費者。
一個佇列可以有多個生產者和消費者,但是一個訊息只能被一個消費者接受並消費,JMS provider將根據“先來先服務”的原則確定訊息該傳送到哪個消費者中。如果沒有任何一個消費者在監聽佇列,那麼訊息再會在佇列裡堆積,可堆積的訊息多少由佇列的大小以及JMS provider的具體實現決定,如果超過了規定的大小,多餘的訊息可能會被拋棄,也可能會被儲存到持久化儲存器中(這取決於具體實現)。
4.2 釋出/訂閱傳輸模型
在釋出/訂閱模型下,訊息的目的地不再是簡單的佇列,而是一個Topic(主題),訊息由生產者傳送到Topic中,然後再傳遞到訂閱了該主題的消費者中。一個Topic也可以繫結多個生產者和多個消費者,和點對點模型不同的是,一個訊息可以被多個消費者消費,即假設有兩個消費者同時訂閱了一個Topic,那麼當有訊息發往這個Topic的時候,Topic會將訊息的兩個拷貝傳送到兩個消費者中。
值得注意的是,這裡的Topic其實是一個抽象的概念,其具體的形式也可以是一個佇列或者多個佇列組成的佇列組(這取決於具體實現),只是在佇列上再打上了一個Topic標籤,生產者和消費者的目標不再是簡單的某個佇列,而是一個Topic。舉個例子,現在我們定義了一個名為hello的Topic,生產者表示要將訊息傳送到這個hello Topic中,消費者也對這個hello Topic感興趣(即訂閱Topic),而這個Topic的底層是一個佇列,當生產者傳送訊息的時候,底層其實就是傳送到這個打上了hello topic標籤的佇列裡,然後再把訊息拷貝被傳遞到對hello topic感興趣的消費者中。
這裡的解釋可能有些不那麼合理,希望讀者能理解。不理解也沒事,下面我會寫一個釋出/訂閱模式的演示程式碼,方便讀者理解這個模型。
在第3節中編寫的Demo其實就是點對點模型,從點貴點模型切換到釋出/訂閱模型非常簡單,對於上面的那個Demo,只需要修改兩行程式碼即可:
//點對點模型中的producer
destination = session.createQueue("Hello");
//釋出/訂閱模型中的producer
destination = session.createTopic("Hello");
//點對點模型中的consumer
destination = session.createQueue("Hello");
//釋出/訂閱模型中的consumer
destination = session.createTopic("Hello");
複製程式碼
修改完畢之後,執行兩個consumer,一個producer,然後看看控制檯的輸出。
producer的輸出大致如下所示:
producer send message : message 0
producer send message : message 1
producer send message : message 2
producer send message : message 3
producer send message : message 4
producer send message : message 5
producer send message : message 6
producer send message : message 7
producer send message : message 8
producer send message : message 9
複製程式碼
consumer1:
receive message : message 0
receive message : message 1
receive message : message 2
receive message : message 3
receive message : message 4
receive message : message 5
receive message : message 6
receive message : message 7
receive message : message 8
receive message : message 9
複製程式碼
consumer2:
receive message : message 0
receive message : message 1
receive message : message 2
receive message : message 3
receive message : message 4
receive message : message 5
receive message : message 6
receive message : message 7
receive message : message 8
receive message : message 9
複製程式碼
發現consumer1和consumer2都收到了producer釋出的10個訊息,總共20個訊息。這和點對點模型中不一樣,點對點模型中無論有多少個消費者,總共都只能接受10個訊息。
5 小結
本文簡單介紹了JMS規範的一些元件,介面,瞭解JMS可以加深對各種訊息中介軟體產品的理解。之後介紹了JMS定義的兩種訊息傳輸模型:點對點模型和釋出/訂閱模型。兩者最大的區別是:點對點模型中,無論有多少個消費者,一個訊息只能被一個消費者接受並消費,釋出/訂閱模型中,一個訊息可以被多個消費者消費。文中還提供了一個Demo,讓大家直觀的感受到這兩種模型的區別,加深理解。