ActiveMQ——2.訊息的接收方式以及選擇器
阿新 • • 發佈:2019-01-29
訊息的接收方式
概述
訊息有兩種接收方式:同步接收和非同步接收。 同步接收:主執行緒阻塞式等待下一個訊息的到來,可以設定timeout,超時則返回null。 非同步接收:主執行緒設定MessageListener,然後繼續做自己的事,子執行緒負責監聽。同步接收又稱為阻塞式接收;非同步接收又稱為事件驅動的接收。
API
同步接收,是在獲取MessageConsumer例項之後,呼叫以下的API:- receive():Message
獲取下一個訊息。這個呼叫將導致無限期的阻塞,直到有新的訊息產生。 - receive(long timeout):Message
獲取下一個訊息。這個呼叫可能導致一段時間的阻塞,直到超時或者有新的訊息產生。超時則返回null。 - receiveNoWait():Message
獲取下一個訊息。這個呼叫不會導致阻塞,如果沒有下一個訊息,直接返回null。
- setMessageListener(MessageListener):void
設定訊息監聽器
- onMessage(Message message):void
這是一個回撥方法,當有新的訊息產生,這個方法會被自動呼叫。
核心程式碼
同步接收
long timeout = 10 * 1000;
for (Message message = consumer.receive(timeout); message != null; message = consumer
.receive(timeout)) {
String text = ((TextMessage) message).getText();
System.out.println(String.format("receive a message:%s", text));
}
非同步接收
consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { try { String text = ((TextMessage) message).getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } });
訊息選擇器
概述
訊息選擇器使用類似於SQL語法,為Consumer指定基於Message屬性的篩選條件。傳送的時候,給訊息新增一些屬性;在接收的時候,根據屬性進行過濾。API
javax.jms.Message提供了一系列的方法,用於設定屬性:- setIntProperty(String name, int value):void
設定int型別的屬性 - setStringProperty(String name, String value):void
設定字串型別的屬性 - setObjectProperty(String name, Object value):void
設定物件型別的屬性
- createConsumer(Destination destination, String messageSelector):MessageConsumer
為指定的destination建立一個consumer。只有當訊息的屬性滿足選擇器的表示式時,才會被接收;選擇器為null或者空字串時,表示沒有選擇器。
選擇器語法
選擇器是字串型別,官方文件說,選擇器的語法類似於SQL的語法。以下僅對int、String型別舉例:// 假如有int型別的屬性order,而我們接收order>10的訊息
String messageSelector = "order > 10";
// 假如有String型別的屬性kind,而我們接收kind="06"的訊息
String messageSelector = "kind='06'";
核心程式碼
final int NUM = 3;
List<Message> result = new ArrayList<Message>(NUM);
try {
for (int i = 0; i < NUM; i++) {
TextMessage message = session.createTextMessage();
message.setText(String.format("This is the %dth message.",
i + 1));
message.setIntProperty("order", i + 1);
result.add(message);
}
} catch (JMSException e) {
e.printStackTrace();
return null;
}
return result;
接收訊息
String messageSelector = "order > 1";
consumer = session.createConsumer(destination, messageSelector);