訊息佇列的兩種模式及實現
轉載:http://blog.csdn.net/heyutao007/article/details/50131089
訊息佇列的兩種模式
Java訊息服務(Java Message
Service,JMS)應用程式介面是一個Java平臺中關於面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。
點對點與釋出訂閱最初是由JMS定義的。這兩種模式主要區別或解決的問題就是傳送到佇列的訊息能否重複消費(多訂閱)
1、定義
JMS規範目前支援兩種訊息模型:點對點(point to point, queue)和釋出/訂閱(publish/subscribe,topic)。
1.1、點對點:Queue,不可重複消費
訊息生產者生產訊息傳送到queue中,然後訊息消費者從queue中取出並且消費訊息。
訊息被消費以後,queue中不再有儲存,所以訊息消費者不可能消費到已經被消費的訊息。Queue支援存在多個消費者,但是對一個訊息而言,只會有一個消費者可以消費。
1.2、釋出/訂閱:Topic,可以重複消費
訊息生產者(釋出)將訊息釋出到topic中,同時有多個訊息消費者(訂閱)消費該訊息。和點對點方式不同,釋出到topic的訊息會被所有訂閱者消費。
支援訂閱組的釋出訂閱模式:
釋出訂閱模式下,當釋出者訊息量很大時,顯然單個訂閱者的處理能力是不足的。實際上現實場景中是多個訂閱者節點組成一個訂閱組負載均衡消費topic訊息即分組訂閱,這樣訂閱者很容易實現消費能力線性擴充套件。可以看成是一個topic下有多個Queue,每個Queue是點對點的方式,Queue之間是釋出訂閱方式。
2、區別
2.1、點對點模式
生產者傳送一條訊息到queue,一個queue可以有很多消費者,但是一個訊息只能被一個消費者接受,當沒有消費者可用時,這個訊息會被儲存直到有 一個可用的消費者,所以Queue實現了一個可靠的負載均衡。
2.2、釋出訂閱模式
釋出者傳送到topic的訊息,只有訂閱了topic的訂閱者才會收到訊息。topic實現了釋出和訂閱,當你釋出一個訊息,所有訂閱這個topic的服務都能得到這個訊息,所以從1到N個訂閱者都能得到這個訊息的拷貝。
3、流行模型比較
傳統企業型訊息佇列ActiveMQ遵循了JMS規範,實現了點對點和釋出訂閱模型,但其他流行的訊息佇列RabbitMQ、Kafka並沒有遵循JMS規範。
3.1、RabbitMQ
RabbitMQ實現了AQMP協議,AQMP協議定義了訊息路由規則和方式。生產端通過路由規則傳送訊息到不同queue,消費端根據queue名稱消費訊息。
RabbitMQ既支援記憶體佇列也支援持久化佇列,消費端為推模型,消費狀態和訂閱關係由服務端負責維護,訊息消費完後立即刪除,不保留歷史訊息。
(1)點對點
生產端傳送一條訊息通過路由投遞到Queue,只有一個消費者能消費到。
(2)多訂閱
當RabbitMQ需要支援多訂閱時,釋出者傳送的訊息通過路由同時寫到多個Queue,不同訂閱組消費不同的Queue。所以支援多訂閱時,訊息會多個拷貝。
3.2、Kafka
Kafka只支援訊息持久化,消費端為拉模型,消費狀態和訂閱關係由客戶端端負責維護,訊息消費完後不會立即刪除,會保留歷史訊息。因此支援多訂閱時,訊息只會儲存一份就可以了。但是可能產生重複消費的情況。
(1)點對點&多訂閱
釋出者生產一條訊息到topic中,不同訂閱組消費此訊息。
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
JMS(Java Messaging Service)是Java平臺上有關面向訊息中介軟體的技術規範,它便於訊息系統中的Java應用程式進行訊息交換,並且通過提供標準的產生、傳送、接收訊息的介面簡化企業應用的開發。
JMS類似與JDBC,sun提供介面,由各個廠商(provider)來進行具體的實現。市面上眾多成熟的JMS規範實現的框架Kafk,RabbitMQ,ActiveMQ,ZeroMQ,RocketMQ等。
JMS的佇列訊息(Queue)傳遞過程如下圖:
對於Queue模式,一個釋出者釋出訊息,下面的接收者按佇列順序接收,比如釋出了10個訊息,兩個接收者A,B那就是A,B總共會收到10條訊息,不重複。
JMS的主題訊息傳遞過程如下圖:
對於Topic模式,一個釋出者釋出訊息,有兩個接收者A,B來訂閱,那麼釋出了10條訊息,A,B各收到10條訊息。
我們從ActiveMQ來實踐:(安裝部署省掉)
Queue模式實踐:
訊息生產者:
public class Sender {
public static void main(String[] args) throws JMSException, InterruptedException {
// ConnectionFactory :連線工廠,JMS 用它建立連線
//61616是ActiveMQ預設埠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
// Connection :JMS 客戶端到JMS Provider 的連線
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一個傳送或接收訊息的執行緒
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Destination :訊息的目的地;訊息傳送給誰.
Destination destination = session.createQueue("my-queue");
// MessageProducer:訊息傳送者
MessageProducer producer = session.createProducer(destination);
// 設定不持久化,可以更改
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for(int i=0;i<10;i++){
//建立文字訊息
TextMessage message = session.createTextMessage("hello.I am producer, this is a test message"+i);
Thread.sleep(1000);
//傳送訊息
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
訊息接收者
// ConnectionFactory :連線工廠,JMS 用它建立連線
private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
public static void main(String[] args) throws JMSException {
// Connection :JMS 客戶端到JMS Provider 的連線
final Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一個傳送或接收訊息的執行緒
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Destination :訊息的目的地;訊息送誰那獲取.
Destination destination = session.createQueue("my-queue");
// 消費者,訊息接收者
MessageConsumer consumer1 = session.createConsumer(destination);
consumer1.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
try {
TextMessage message = (TextMessage)msg ;
System.out.println("consumerOne收到訊息: "+message.getText());
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
執行之後控制檯不會退出一直監聽訊息庫,對於訊息傳送者的十條資訊,控制輸出:
consumerOne收到訊息: hello.I am producer, this is a test message0
consumerOne收到訊息: hello.I am producer, this is a test message1
consumerOne收到訊息: hello.I am producer, this is a test message2
consumerOne收到訊息: hello.I am producer, this is a test message3
consumerOne收到訊息: hello.I am producer, this is a test message4
consumerOne收到訊息: hello.I am producer, this is a test message5
consumerOne收到訊息: hello.I am producer, this is a test message6
consumerOne收到訊息: hello.I am producer, this is a test message7
consumerOne收到訊息: hello.I am producer, this is a test message8
consumerOne收到訊息: hello.I am producer, this is a test message9
如果此時另外一個執行緒也存在消費者監聽該Queue,則兩者交換輸出,共輸出10條
Topic模式實現
訊息釋出者
public static void main(String[] args) throws JMSException, InterruptedException {
// ConnectionFactory :連線工廠,JMS 用它建立連線
//61616是ActiveMQ預設埠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
// Connection :JMS 客戶端到JMS Provider 的連線
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一個傳送或接收訊息的執行緒
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Destination :訊息的目的地;訊息傳送給誰.
//Destination destination = session.createQueue("my-queue");
Destination destination = session.createTopic("STOCKS.myTopic"); //建立topic myTopic
// MessageProducer:訊息傳送者
MessageProducer producer = session.createProducer(destination);
// 設定不持久化,可以更改
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for(int i=0;i<10;i++){
//建立文字訊息
TextMessage message = session.createTextMessage("hello.I am producer, this is a test message"+i);
//傳送訊息
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
訊息訂閱者
private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
public void run() {
// Connection :JMS 客戶端到JMS Provider 的連線
try {
final Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一個傳送或接收訊息的執行緒
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Destination :訊息的目的地;訊息送誰那獲取.
// Destination destination = session.createQueue("my-queue");
Destination destination = session.createTopic("STOCKS.myTopic"); // 建立topic
// myTopic
// 消費者,訊息接收者
MessageConsumer consumer1 = session.createConsumer(destination);
consumer1.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
try {
TextMessage message = (TextMessage) msg;
System.out.println("consumerOne收到訊息: " + message.getText());
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 再來一個消費者,訊息接收者
MessageConsumer consumer2 = session.createConsumer(destination);
consumer2.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
try {
TextMessage message = (TextMessage) msg;
System.out.println("consumerTwo收到訊息: " + message.getText());
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (Exception e) {
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
最後訊息會重複輸出:
consumerOne收到訊息: hello.I am producer, this is a test message0
consumerTwo收到訊息: hello.I am producer, this is a test message0
consumerOne收到訊息: hello.I am producer, this is a test message1
consumerTwo收到訊息: hello.I am producer, this is a test message1
consumerOne收到訊息: hello.I am producer, this is a test message2
consumerTwo收到訊息: hello.I am producer, this is a test message2
consumerOne收到訊息: hello.I am producer, this is a test message3
consumerTwo收到訊息: hello.I am producer, this is a test message3
consumerOne收到訊息: hello.I am producer, this is a test message4
consumerTwo收到訊息: hello.I am producer, this is a test message4
consumerOne收到訊息: hello.I am producer, this is a test message5
consumerTwo收到訊息: hello.I am producer, this is a test message5
consumerOne收到訊息: hello.I am producer, this is a test message6
consumerTwo收到訊息: hello.I am producer, this is a test message6
consumerOne收到訊息: hello.I am producer, this is a test message7
consumerTwo收到訊息: hello.I am producer, this is a test message7
consumerOne收到訊息: hello.I am producer, this is a test message8
consumerTwo收到訊息: hello.I am producer, this is a test message8
consumerOne收到訊息: hello.I am producer, this is a test message9
我們簡單總結一下使用MQ的過程:
- 1.建立與MQ的連結
- 2.建立訊息的目的地或者來原地即Destination
- 3.傳送訊息或者制定對應的MessageListener
上述就是關於MQ兩種訊息模型的簡單應用,至於具體的細節。如在消費者監聽訊息時有哪些Listener型別,生產者傳送訊息時有哪些Message型別。生成Session時引數1表示是否開啟事務,至於事務的處理,訊息的持久化等等。後面慢慢介紹。
轉載:http://blog.csdn.net/canot/article/details/53572400