MessageConsumer的訊息選擇器
阿新 • • 發佈:2018-12-24
Producer:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author jeffSheng * 2018年7月3日 */ public class Producer { //1連線工廠 private ConnectionFactory connectionFactory; //2連線物件 private Connection connection; //3Session物件 private Session session; //4生產者 private MessageProducer messageProducer; public Producer(){ try { this.connectionFactory = new ActiveMQConnectionFactory( "admin", "1234", "tcp://localhost:61616"); this.connection = connectionFactory.createConnection(); this.connection.start(); this.session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); this.messageProducer = session.createProducer(null); } catch (JMSException e) { e.printStackTrace(); } } public Session getSession(){ return this.session; } public static void main(String[] args) throws Exception { Producer producer = new Producer(); producer.sender1(); } public void sender1() throws Exception{ Destination destination = this.session.createQueue("first"); MapMessage msg1 = this.session.createMapMessage(); msg1.setString("name", "張三"); msg1.setString("age", "23"); msg1.setStringProperty("color", "blue"); msg1.setIntProperty("sal", 2200); //int id = 1; //msg1.setInt("id", id); //String receiver = id % 2 == 0 ? "A" : "B"; //msg1.setStringProperty("receiver", receiver); MapMessage msg2 = this.session.createMapMessage(); msg2.setString("name", "李四"); msg2.setString("age", "26"); msg2.setStringProperty("color", "red"); msg2.setIntProperty("sal", 1300); //id = 2; //msg1.setInt("id", id); //receiver = id % 2 == 0 ? "A" : "B"; //msg2.setStringProperty("receiver", receiver); MapMessage msg3 = this.session.createMapMessage(); msg3.setString("name", "王五"); msg3.setString("age", "28"); msg3.setStringProperty("color", "green"); msg3.setIntProperty("sal", 1500); //id = 3; //msg3.setInt("id", id); //receiver = id % 2 == 0 ? "A" : "B"; //msg3.setStringProperty("receiver", receiver); MapMessage msg4 = this.session.createMapMessage(); msg4.setString("name", "趙六"); msg4.setString("age", "30"); msg4.setStringProperty("color", "blue"); msg4.setIntProperty("sal", 1800); //id = 4; //msg4.setInt("id", id); //receiver = id % 2 == 0 ? "A" : "B"; //msg4.setStringProperty("receiver", receiver); this.messageProducer.send(destination,msg1,DeliveryMode.NON_PERSISTENT,2,1000*60*10); this.messageProducer.send(destination,msg2,DeliveryMode.NON_PERSISTENT,3,1000*60*10); this.messageProducer.send(destination,msg3,DeliveryMode.NON_PERSISTENT,6,1000*60*10); this.messageProducer.send(destination,msg4,DeliveryMode.NON_PERSISTENT,9,1000*60*10); if(connection!=null){ connection.close(); } } }
Consumer:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer { public final String SELECTOR_0 = "age > 25"; public final String SELECTOR_1 = "color = 'blue'"; public final String SELECTOR_2 = "color = 'blue' and sal > 2000"; public final String SELECTOR_3 = "receiver = 'A'"; // 1.連線工廠 private ConnectionFactory connectionFactory; // 2.連線物件 private Connection connection; // Session物件 private Session session; // 4.消費者 private MessageConsumer messageConsumer; // 5.目標地址 private Destination destination; //構造器中初始化四個物件 public Consumer() { this.connectionFactory = new ActiveMQConnectionFactory( "admin", "1234", "tcp://localhost:61616" ); try { this.connection = this.connectionFactory.createConnection(); this.connection.start(); this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); this.destination = this.session.createQueue("first"); //消費者 this.messageConsumer = this.session.createConsumer(this.destination); } catch (JMSException e) { e.printStackTrace(); } } public void receiver() { try { this.messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(message instanceof MapMessage){ MapMessage ret = (MapMessage) message; try { //System.out.println(ret); //System.out.println(ret.toString()); System.out.println(ret.getString("name")+":"+ret.getString("age")); //System.out.println(ret.getStringProperty("name") + ":" + ret.getStringProperty("age")); } catch (Exception e) { e.printStackTrace(); } } } }); } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) { Consumer consumer = new Consumer(); consumer.receiver(); } }