訊息中介軟體 與 JMS的原生寫法
阿新 • • 發佈:2018-11-16
第一部分: 點對點Queue佇列模式
queque模式特點:訊息持久化,只要消費者上線就可以消費
原生的生產者步驟總結:
1 new一個ActiveMQConnectionFactory工廠跟安裝有訊息中介軟體的連線上
2 通過連線物件獲取session
3 通過session封裝目的地
4 通過session封裝訊息
5 關閉資源持久化
原生消費者步驟總結:
前面的步驟一模一樣,只是他拿到session後,用session1 封裝目的地 ,
2 通過session獲取監控物件MessageListener拿到message
第一步:導包
匯入訊息中介軟體的包
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency>
第二步:直接擼程式碼
生產者productor
package queuejms; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class QueueProduct { public static void main(String[] args) throws JMSException { //1 建立工廠 ConnectionFactory factory=new ActiveMQConnectionFactory("tcp://192.168.5.111:61616"); //2 獲取連線物件 Connection connection = factory.createConnection(); //3 開啟連線 connection.start(); //4 根據連線物件獲取session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5 根據ssession建立佇列物件,封裝目的地名稱 Queue queue = session.createQueue("queue-product-demo"); //6 根據session建立生產者 MessageProducer producer = session.createProducer(queue); //7 建立併發送訊息 TextMessage textMessage = session.createTextMessage("我的第一個jms,訊息中介軟體"); producer.send(textMessage); //關閉資源 session.close(); connection.close(); } }
消費者consumer
package queuejms; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; public class QueueConsumer { public static void main(String[] args) throws JMSException { //1 建立訊息中介軟體工廠activeMQ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.5.111:61616"); //2 根據工廠獲取連線 Connection connection = factory.createConnection(); //3 開啟連線 connection.start(); //4 根據連線建立session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5 建立佇列物件 Queue queue = session.createQueue("queue-product-demo"); //6 建立消費者 MessageConsumer consumer = session.createConsumer(queue); //7 監聽訊息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { try { TextMessage textMessage= (TextMessage) message; System.out.println("接收到了訊息:"+textMessage.getText()); } catch (Exception e) { e.printStackTrace(); } } }); try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } //關閉資源 session.close(); connection.close(); } }
------------------------------------------------------------------------------------------------------------------------------------------------------------
第二部分: 點對點Topic釋出和訂閱模式
導包後直接擼程式碼
生產者:productor
package topicjms; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TopicProduct { public static void main(String[] args) throws JMSException { //建立工廠 ConnectionFactory factory=new ActiveMQConnectionFactory("tcp://192.168.5.111:61616"); //獲取連線物件 Connection connection = factory.createConnection(); //開啟連線 connection.start(); //根據連線物件獲取session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //根據ssession建立佇列物件,封裝目的地名稱 Topic topic = session.createTopic("topic-product-demo"); //根據session建立生產者 MessageProducer producer = session.createProducer(topic); //建立併發送訊息 TextMessage textMessage = session.createTextMessage("我的第一個jms,訊息中介軟體,使用topic方式傳送"); producer.send(textMessage); //關閉資源 session.close(); connection.close(); } } 消費者consumer
package topicjms; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; public class TopicConsumer { public static void main(String[] args) throws JMSException { //建立訊息中介軟體工廠activeMQ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.5.111:61616"); //根據工廠獲取連線 Connection connection = factory.createConnection(); //開啟連線 connection.start(); //根據連線建立session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立佇列物件 Topic topic = session.createTopic("topic-product-demo"); //建立消費者 MessageConsumer consumer = session.createConsumer(topic); //監聽訊息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { try { TextMessage textMessage= (TextMessage) message; System.out.println("接收到了訊息:"+textMessage.getText()); } catch (Exception e) { e.printStackTrace(); } } }); try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } //關閉資源 session.close(); connection.close(); } }