ActiveMQ Topic 釋出訂閱
阿新 • • 發佈:2018-12-24
Publish:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Publish { //1連線工廠 private ConnectionFactory connectionFactory; //2連線物件 private Connection connection; //3Session物件 private Session session; //4生產者 private MessageProducer messageProducer; public Publish(){ try { this.connectionFactory = new ActiveMQConnectionFactory( "admin", "1234", "tcp://localhost:61616"); this.connection = connectionFactory.createConnection(); this.connection.start(); this.session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); this.messageProducer = session.createProducer(null); } catch (JMSException e) { e.printStackTrace(); } } public Session getSession(){ return this.session; } public static void main(String[] args) throws Exception { Publish p = new Publish(); p.sender1(); } public void sender1() throws Exception{ Destination destination = this.session.createTopic("topic1"); TextMessage textMessage = session.createTextMessage("我是內容"); messageProducer.send(destination,textMessage); if(connection!=null){ connection.close(); } } }
Consumer1:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer1 { public final String SELECTOR_0 = "age > 25"; public final String SELECTOR_1 = "color = 'blue'"; public final String SELECTOR_2 = "color = 'blue' and sal > 2000"; public final String SELECTOR_3 = "receiver = 'A'"; // 1.連線工廠 private ConnectionFactory connectionFactory; // 2.連線物件 private Connection connection; // Session物件 private Session session; // 4.消費者 private MessageConsumer messageConsumer; // 5.目標地址 private Destination destination; //構造器中初始化四個物件 public Consumer1() { this.connectionFactory = new ActiveMQConnectionFactory( "admin", "1234", "tcp://localhost:61616" ); try { this.connection = this.connectionFactory.createConnection(); this.connection.start(); this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); this.destination = this.session.createTopic("topic1"); //消費者 this.messageConsumer = this.session.createConsumer(this.destination); } catch (JMSException e) { e.printStackTrace(); } } public void receiver() { try { this.messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(message instanceof TextMessage){ TextMessage m = (TextMessage) message; try { System.out.println("c1收到訊息:--------------------"); System.out.println(m.getText()); } catch (Exception e) { e.printStackTrace(); } } } }); } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) { Consumer1 consumer = new Consumer1(); consumer.receiver(); } }
Consumer2:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer2 { public final String SELECTOR_0 = "age > 25"; public final String SELECTOR_1 = "color = 'blue'"; public final String SELECTOR_2 = "color = 'blue' and sal > 2000"; public final String SELECTOR_3 = "receiver = 'A'"; // 1.連線工廠 private ConnectionFactory connectionFactory; // 2.連線物件 private Connection connection; // Session物件 private Session session; // 4.消費者 private MessageConsumer messageConsumer; // 5.目標地址 private Destination destination; //構造器中初始化四個物件 public Consumer2() { this.connectionFactory = new ActiveMQConnectionFactory( "admin", "1234", "tcp://localhost:61616" ); try { this.connection = this.connectionFactory.createConnection(); this.connection.start(); this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); this.destination = this.session.createTopic("topic1"); //消費者 this.messageConsumer = this.session.createConsumer(this.destination); } catch (JMSException e) { e.printStackTrace(); } } public void receiver() { try { this.messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(message instanceof TextMessage){ TextMessage m = (TextMessage) message; try { System.out.println("c2收到訊息:--------------------"); System.out.println(m.getText()); } catch (Exception e) { e.printStackTrace(); } } } }); } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) { Consumer2 consumer = new Consumer2(); consumer.receiver(); } }