Java 訊息中介軟體
阿新 • • 發佈:2018-11-20
Java 訊息中介軟體
文章目錄
訊息中介軟體:關注於資料的傳送與接收,利用高效可靠的非同步訊息傳遞機制整合分散式系統。
常見訊息中介軟體
1. ActiveMQ
ActiveMQ 是 Apache 出品,最流行,能力強勁的開源訊息匯流排。
- 完全支援 JMS 1.1 和 J2EE 1.4規範(持久化,XA訊息,事務)
- 支援多種語言和協議編寫客戶端
- 虛擬主題、組合目的、映象佇列
下載解壓後,執行 /bin/win64/ 路徑下 的 activemq.bat 批處理檔案,並開啟 http://localhost:8161
檢視是否安裝成功。
2. RabbitMQ
RabbitMQ 是一個開源的 AMQP 實現,伺服器端用 Erlang 語言編寫。用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。
- 支援多種客戶端
- AMQP 的完整實現(vhost、Exchange、Binding、Routing Key 等)
- 事務支援/釋出確認
- 訊息持久化
3. Kafka
Kafka 是一種高吞吐量的分散式釋出訂閱訊息系統,是一個分散式的、分割槽的、可靠的分散式日誌儲存服務。
- 通過O(1)的磁碟資料結構提供訊息的持久化,對以TB的訊息儲存也能夠保持長時間的穩定效能
- 高吞吐量:即使是非常普通的硬體,Kafka 也可以支援每秒數百萬的訊息
- Partition、Consumer Group
4. 綜合比較
Propertity | ActiveMQ | RabbitMQ | Kafka |
---|---|---|---|
跨語言 | 支援(Java 優先) | 語言無關 | 支援(Java 優先) |
支援協議 | OpenWire,Stomp,XMPP,AMQP | AMQP | |
優點 | 遵循 JMS 規範;安裝部署方便 | 繼承 Erlang 天生的併發性,穩定性,安全性有保障 | |
缺點 | 訊息丟失;社群不活躍 | Erlang 語言難度較大;不支援動態擴充套件 | 嚴格的順序機制,不支援訊息優先順序;不支援標準的訊息協議,不利於平臺遷移 |
綜合評價 | 適合中小企業級訊息應用場景,不適合上千個佇列的應用場景 | 適合對穩定性要求高的企業級應用 | 一般應用在大資料日誌處理或對實時性、可靠性要求稍低的場景 |
規範與協議
1. JMS 規範
Java 訊息服務(Java Message Service)即 JMS,是一個 Java 平臺關於面向訊息中介軟體的 API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。
- 提供者:實現 JMS規範的訊息中介軟體伺服器
- 客戶端:傳送或接收訊息的應用程式
- 生產者/釋出者:建立併發送訊息的客戶端
- 消費者/訂閱者:接收並處理訊息的客戶端
- 訊息:應用層序之間傳遞的資料內容
- 訊息模式:傳遞訊息的方式,JMS 中定義了佇列和主題兩種模式
1.1 訊息模式
1.1.1 佇列模型
- 客戶端包括生產者和消費者
- 佇列中訊息只能被一個消費者消費
- 消費者可以隨時消費佇列中的訊息
程式碼
- 生產者
package com.chen.jms.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 佇列模式:生產者
*
* @Author LeifChen
* @Date 2018-11-16
*/
public class QueueProducer {
private static final String URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "queue-test";
private static final int COUNT = 100;
public static void main(String[] args) throws JMSException {
// 1.建立 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
// 2.建立 Connection
Connection connection = connectionFactory.createConnection();
// 3.啟動連線
connection.start();
// 4.建立會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.建立一個目標
Destination destination = session.createQueue(QUEUE_NAME);
// 6.建立一個釋出者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < COUNT; i++) {
// 7.建立訊息
TextMessage textMessage = session.createTextMessage("test" + i);
// 8.釋出訊息
producer.send(textMessage);
System.out.println("傳送訊息:" + textMessage.getText());
}
// 9.關閉連線
connection.close();
}
}
- 消費者
package com.chen.jms.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 佇列模式:消費者
*
* @Author LeifChen
* @Date 2018-11-16
*/
public class QueueConsumer {
private static final String URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "queue-test";
public static void main(String[] args) throws JMSException {
// 1.建立 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
// 2.建立 Connection
Connection connection = connectionFactory.createConnection();
// 3.啟動連線
connection.start();
// 4.建立會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.建立一個目標
Destination destination = session.createQueue(QUEUE_NAME);
// 6.建立一個消費者
MessageConsumer consumer = session.createConsumer(destination);
// 7.建立一個監聽器
consumer.setMessageListener(message -> {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收訊息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
}
}
1.1.2 主題模型
- 客戶端包括髮布者和訂閱者
- 主題中的訊息被所有訂閱者消費
- 消費者不能消費訂閱之前就傳送到主題中的訊息
程式碼
- 釋出者
package com.chen.jms.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 主題模式:釋出者
*
* @Author LeifChen
* @Date 2018-11-16
*/
public class TopicProducer {
private static final String URL = "tcp://localhost:61616";
private static final String TOPIC_NAME = "topic-test";
private static final int COUNT = 100;
public static void main(String[] args) throws JMSException {
// 1.建立 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
// 2.建立 Connection
Connection connection = connectionFactory.createConnection();
// 3.啟動連線
connection.start();
// 4.建立會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.建立一個目標
Destination destination = session.createTopic(TOPIC_NAME);
// 6.建立一個釋出者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < COUNT; i++) {
// 7.建立訊息
TextMessage textMessage = session.createTextMessage("test" + i);
// 8.釋出訊息
producer.send(textMessage);
System.out.println("傳送訊息:" + textMessage.getText());
}
// 9.關閉連線
connection.close();
}
}
- 訂閱者
package com.chen.jms.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 主題模式:訂閱者
*
* @Author LeifChen
* @Date 2018-11-16
*/
public class TopicConsumer {
private static final String URL = "tcp://localhost:61616";
private static final String TOPIC_NAME = "topic-test";
public static void main(String[] args) throws JMSException {
// 1.建立 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
// 2.建立 Connection
Connection connection = connectionFactory.createConnection();
// 3.啟動連線
connection.start();
// 4.建立會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.建立一個目標
Destination destination = session.createTopic(TOPIC_NAME);
// 6.建立一個消費者
MessageConsumer consumer = session.createConsumer(destination);
// 7.建立一個監聽器
consumer.setMessageListener(message -> {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收訊息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
}
}
1.2 JMS 編碼介面
- ConnectionFactory:用於建立連線到訊息中介軟體的連線工廠
- Connection:代表了應用程式和訊息伺服器之間的通訊鏈路
- MessageConsumer:由會話建立,用於接收發送到目標的訊息
- MessageProducer:由會話建立,用於傳送訊息到目標
- Meeage:是在消費者和生產者之間傳送的物件,訊息頭,一組訊息屬性,一個訊息體
- Destination:指訊息釋出和接收的地點,包括佇列或主題
2. AMQP 協議
AMQP(advanced message queuing protocol)是一個提供訊息服務的應用層標準協議,基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受客戶端/中介軟體不同產品、不同開發語言等條件的限制。
3. JMS 與 AMQP 比較
Propertity | JMS 規範 | AMQP 協議 |
---|---|---|
定義 | Java API | Wire-protocol |
跨語言 | 否 | 是 |
訊息模型 | 提供兩種訊息模型: p2p pub/sub |
提供五種訊息模型: direct fanout topic headers system |
訊息型別 | TextMessage MapMessage BytesMessage StreamMessage ObjectMessage Message |
byte[] |
綜合評價 | JMS 定義了 Java API 層面的標準 | AMQP 是面向詳細、佇列、路由(包括點對點的釋出/訂閱)、可靠性、安全 |