JMS學習四(ActiveMQ消息過濾)
消息的過期、消息的選擇器和消息的優先級。
一、消息的過期
允許消息過期 。默認情況下,消息永不會過期。如果消息在特定周期內失去意義,那麽可以設置過期時間。
有兩種方法設置消息的過期時間,時間單位為毫秒:
1.使用消息生產者的setTimeToLive 方法為所有的消息設置過期時間。
2.使用消息生產者的send 方法為每一條消息設置過期時間。
消息過期時間,send 方法中的 timeToLive 值加上發送時刻的 GMT 時間值。如果 timeToLive 值等於零,則 JMSExpiration 被設為零, 表示該消息永不過期。
3、消息服務器接收到消息後,在指定的時間後,會從隊列中移除指定的消息,超時被移除的消息不會發送給消費者。
4、使用消息生產者的setTimeToLive(long time ) 方法來給所有的消息設置過期時間:
// 消息生產者 MessageProducer producer = null; producer = session.createProducer(queue); // 消息是否為持久性的,如果不設置默認是持久化的。 producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息過期設置 producer.setTimeToLive(1000);
5、使用消息生產者的send()方法來設置消息的過期時間
//message發送的消息,deliveryMode是否持久化,priority優先級,timeToLive 消息過期時間//producer.send(message, deliveryMode, priority, timeToLive); producer.send(message, DeliveryMode.PERSISTENT, 4, 1000);
這裏在插一段吧,上面設置消息過期的都是消息生產者這方的來設置的,也就是如果不滿足條件則消息服務器會把消息從消息隊列中刪除,但是我們也可以在消息消費端來設置接受時間(僅限於同步接受)
Message message = consumer.receive(2);
就是在接受的時候添加等待時間(單位是毫秒)如果在指定的時間內獲取不到消息則不會再等了。如果不設置等待時間則一直等待直到接收到消息或超時為止。
二、消息的選擇器
不管是在消息發送端設置消息過期時間還是在接收端設置等待時間,都是對不滿足的消息有過濾的作用,那消息選擇器就是為過濾消息而生的下面來看看消息選擇器:
ActiveMQ提供了一種機制,使用它,消息服務可根據消息選擇器中的標準來執行消息過濾。生產者可在消息中放入應用程序特有的屬性,而消費者可使用基於這些屬性的選擇標準來表明對消息是否感興趣。這就簡化了客戶端的工作,並避免了向不需要這些消息的消費者傳送消息的開銷。然而,它也使得處理選擇標準的消息服務增加了一些額外開銷。 消息選擇器是用於MessageConsumer的過濾器,可以用來過濾傳入消息的屬性和消息頭部分(但不過濾消息體),並確定是否將實際消費該消息。消息選擇器是一些字符串,它們基於某種語法,而這種語法是SQL-92的子集。可以將消息選擇器作為MessageConsumer 創建的一部分。
1、消息生產者:
package mqtest3; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { // 單例模式 // 1、連接工廠 private ConnectionFactory connectionFactory; // 2、連接對象 private Connection connection; // 3、Session對象 private Session session; // 4、生產者 private MessageProducer messageProducer; public Producer() { try { this.connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616"); this.connection = connectionFactory.createConnection(); this.connection.start(); // 設置自動簽收模式 this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); this.messageProducer = this.session.createProducer(null); } catch (JMSException e) { throw new RuntimeException(e); } } public Session getSession() { return this.session; } public void send1(/* String QueueName, Message message */) { try { Destination destination = this.session.createQueue("first"); MapMessage msg1 = this.session.createMapMessage(); msg1.setString("name", "張三"); msg1.setInt("age", 20); // 設置用於消息過濾器的條件 msg1.setStringProperty("name", "張三"); msg1.setIntProperty("age", 20); msg1.setStringProperty("color", "bule"); MapMessage msg2 = this.session.createMapMessage(); msg2.setString("name", "李四"); msg2.setInt("age", 25); // 設置用於消息過濾器的條件 msg2.setStringProperty("name", "李四"); msg2.setIntProperty("age", 25); msg2.setStringProperty("color", "white"); MapMessage msg3 = this.session.createMapMessage(); msg3.setString("name", "趙六"); msg3.setInt("age", 30); // 設置用於消息過濾器的條件 msg3.setStringProperty("name", "趙六"); msg3.setIntProperty("age", 30); msg3.setStringProperty("color", "black"); // 發送消息 this.messageProducer.send(destination, msg1, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10); this.messageProducer.send(destination, msg2, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10); this.messageProducer.send(destination, msg3, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10); } catch (JMSException e) { throw new RuntimeException(e); } } public void send2() { try { Destination destination = this.session.createQueue("first"); TextMessage message = this.session.createTextMessage("我是一個字符串"); message.setIntProperty("age", 25); // 發送消息 this.messageProducer.send(destination, message, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10); } catch (JMSException e) { throw new RuntimeException(e); } } public static void main(String[] args) { Producer producer = new Producer(); producer.send1(); // producer.send2(); } }
2、消息消費者:
package mqtest3; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Conmuser { // 單例模式 // 1、連接工廠 private ConnectionFactory connectionFactory; // 2、連接對象 private Connection connection; // 3、Session對象 private Session session; // 4、生產者 private MessageConsumer messageConsumer; // 5、目的地址 private Destination destination; // 消息選擇器 public final String SELECTOR_1 = "age > 25"; public final String SELECTOR_2 = " age > 20 and color=‘black‘"; public Conmuser() { try { this.connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616"); this.connection = connectionFactory.createConnection(); this.connection.start(); // 設置自動簽收模式 this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); this.destination = this.session.createQueue("first"); // 在構造消費者的時候,指定了 消息選擇器 // 有選擇性的消費消息 this.messageConsumer = this.session.createConsumer(destination, SELECTOR_1); } catch (JMSException e) { throw new RuntimeException(e); } } public Session getSession() { return this.session; } // 用於監聽消息隊列的消息 class MyLister implements MessageListener { @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage ret = (TextMessage) message; System.out.println("results;" + ret.getText()); } if (message instanceof MapMessage) { MapMessage ret = (MapMessage) message; System.out.println(ret.toString()); System.out.println(ret.getString("name")); System.out.println(ret.getInt("age")); } } catch (JMSException e) { throw new RuntimeException(e); } } } // 用於異步監聽消息 public void receiver() { try { this.messageConsumer.setMessageListener(new MyLister()); } catch (JMSException e) { throw new RuntimeException(e); } } public static void main(String[] args) { Conmuser conmuser = new Conmuser(); conmuser.receiver(); } }
上面的demo是對MapMessage和TextMessage兩種消息的過濾條件的設置和消費,過濾條件的設置使在消息的屬性中設置,而消費消息的時候直接是在session創建MessageConsumer時傳入的參數即過濾條件(過濾條件的寫法和SQL的寫法是很像的)
在寫過濾條件的時候要註意設置的是什麽類型的條件即: int 、string 如果是int 則加引號而如果是String則要加哦!!!
三、消息的優先級
通常,可以確保將單個會話向目標發送的所有消息按其發送順序傳送至消費者。然而,如果為這些消息分配了不同的優先級,消息傳送系統將首先嘗試傳送優先級較高的消息。
有兩種方法設置消息的優先級:
1.使用 setPriority 方法,這樣所有的消息都采用此傳送模式;
2.使用 send 方法為每一條消息設置傳送模式;
消息優先級從 0-9 十個級別,0-4 是普通消息,5-9 是加急消息。如果不指定優先級,則默認為 4。JMS 不要求嚴格按照這十個優先級發送消息,但必須保證加急消息要先於普通消息到達。
JMS學習四(ActiveMQ消息過濾)