訊息佇列MQ實踐----實現Queue(佇列訊息)和Topic(主題訊息)兩種模式
阿新 • • 發佈:2019-02-10
之前有篇檔案介紹了生產消費者模式(http://blog.csdn.net/canot/article/details/51541920 ),當時是通過BlockingQueue阻塞佇列來實現,以及在Redis中使用pub/sub模式(http://blog.csdn.net/canot/article/details/51938955)。而實際專案中往往是通過JMS使用訊息佇列來實現這兩種模式的。
JMS(Java Messaging Service)是Java平臺上有關面向訊息中介軟體的技術規範,它便於訊息系統中的Java應用程式進行訊息交換,並且通過提供標準的產生、傳送、接收訊息的介面簡化企業應用的開發。
JMS類似與JDBC,sun提供介面,由各個廠商(provider)來進行具體的實現。市面上眾多成熟的JMS規範實現的框架Kafk,RabbitMQ,ActiveMQ,ZeroMQ,RocketMQ等。
JMS的佇列訊息(Queue)傳遞過程如下圖:
這裡寫圖片描述
對於Queue模式,一個釋出者釋出訊息,下面的接收者按佇列順序接收,比如釋出了10個訊息,兩個接收者A,B那就是A,B總共會收到10條訊息,不重複。
JMS的主題訊息傳遞過程如下圖:
這裡寫圖片描述
對於Topic模式,一個釋出者釋出訊息,有兩個接收者A,B來訂閱,那麼釋出了10條訊息,A,B各收到10條訊息。
我們從ActiveMQ來實踐:(安裝部署省掉)
Queue模式實踐:
訊息生產者:
public class Sender {
public static void main(String[] args) throws JMSException, InterruptedException {
// ConnectionFactory :連線工廠,JMS 用它建立連線
//61616是ActiveMQ預設埠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
);
// Connection :JMS 客戶端到JMS Provider 的連線
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一個傳送或接收訊息的執行緒
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Destination :訊息的目的地;訊息傳送給誰.
Destination destination = session.createQueue();
// MessageProducer:訊息傳送者
MessageProducer producer = session.createProducer(destination);
// 設定不持久化,可以更改
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for(int i=0;i<10;i++){
//建立文字訊息
TextMessage message = session.createTextMessage(+i);
Thread.sleep(1000);
//傳送訊息
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
}
訊息接收者
// ConnectionFactory :連線工廠,JMS 用它建立連線
private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, );
public static void main(String[] args) throws JMSException {
// Connection :JMS 客戶端到JMS Provider 的連線
final Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一個傳送或接收訊息的執行緒
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Destination :訊息的目的地;訊息送誰那獲取.
Destination destination = session.createQueue();
// 消費者,訊息接收者
MessageConsumer consumer1 = session.createConsumer(destination);
consumer1.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
try {
TextMessage message = (TextMessage)msg ;
System.out.println(+message.getText());
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
執行之後控制檯不會退出一直監聽訊息庫,對於訊息傳送者的十條資訊,控制輸出:
consumerOne收到訊息: hello.I am producer, this is a test message0
consumerOne收到訊息: hello.I am producer, this is a test message1
consumerOne收到訊息: hello.I am producer, this is a test message2
consumerOne收到訊息: hello.I am producer, this is a test message3
consumerOne收到訊息: hello.I am producer, this is a test message4
consumerOne收到訊息: hello.I am producer, this is a test message5
consumerOne收到訊息: hello.I am producer, this is a test message6
consumerOne收到訊息: hello.I am producer, this is a test message7
consumerOne收到訊息: hello.I am producer, this is a test message8
consumerOne收到訊息: hello.I am producer, this is a test message9
如果此時另外一個執行緒也存在消費者監聽該Queue,則兩者交換輸出,共輸出10條
Topic模式實現
訊息釋出者
public static void main(String[] args) throws JMSException, InterruptedException {
// ConnectionFactory :連線工廠,JMS 用它建立連線
//61616是ActiveMQ預設埠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
);
// Connection :JMS 客戶端到JMS Provider 的連線
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一個傳送或接收訊息的執行緒
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Destination :訊息的目的地;訊息傳送給誰.
//Destination destination = session.createQueue("my-queue");
Destination destination = session.createTopic(); //建立topic myTopic
// MessageProducer:訊息傳送者
MessageProducer producer = session.createProducer(destination);
// 設定不持久化,可以更改
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for(int i=0;i<10;i++){
//建立文字訊息
TextMessage message = session.createTextMessage(+i);
//傳送訊息
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
訊息訂閱者
private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, );
public void run() {
// Connection :JMS 客戶端到JMS Provider 的連線
try {
final Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一個傳送或接收訊息的執行緒
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Destination :訊息的目的地;訊息送誰那獲取.
// Destination destination = session.createQueue("my-queue");
Destination destination = session.createTopic(); // 建立topic
// myTopic
// 消費者,訊息接收者
MessageConsumer consumer1 = session.createConsumer(destination);
consumer1.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
try {
TextMessage message = (TextMessage) msg;
System.out.println( + message.getText());
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 再來一個消費者,訊息接收者
MessageConsumer consumer2 = session.createConsumer(destination);
consumer2.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
try {
TextMessage message = (TextMessage) msg;
System.out.println( + message.getText());
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (Exception e) {
}
}
最後訊息會重複輸出:
consumerOne收到訊息: hello.I am producer, this is a test message0
consumerTwo收到訊息: hello.I am producer, this is a test message0
consumerOne收到訊息: hello.I am producer, this is a test message1
consumerTwo收到訊息: hello.I am producer, this is a test message1
consumerOne收到訊息: hello.I am producer, this is a test message2
consumerTwo收到訊息: hello.I am producer, this is a test message2
consumerOne收到訊息: hello.I am producer, this is a test message3
consumerTwo收到訊息: hello.I am producer, this is a test message3
consumerOne收到訊息: hello.I am producer, this is a test message4
consumerTwo收到訊息: hello.I am producer, this is a test message4
consumerOne收到訊息: hello.I am producer, this is a test message5
consumerTwo收到訊息: hello.I am producer, this is a test message5
consumerOne收到訊息: hello.I am producer, this is a test message6
consumerTwo收到訊息: hello.I am producer, this is a test message6
consumerOne收到訊息: hello.I am producer, this is a test message7
consumerTwo收到訊息: hello.I am producer, this is a test message7
consumerOne收到訊息: hello.I am producer, this is a test message8
consumerTwo收到訊息: hello.I am producer, this is a test message8
consumerOne收到訊息: hello.I am producer, this is a test message9
我們簡單總結一下使用MQ的過程:
1.建立與MQ的連結
2.建立訊息的目的地或者來原地即Destination
3.傳送訊息或者制定對應的MessageListener
上述就是關於MQ兩種訊息模型的簡單應用,至於具體的細節。如在消費者監聽訊息時有哪些Listener型別,生產者傳送訊息時有哪些Message型別。生成Session時引數1表示是否開啟事務,至於事務的處理,訊息的持久化等等。後面慢慢介紹。