ActiveMQ生產者消費者編碼
阿新 • • 發佈:2020-08-11
匯入依賴
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.16.0</version> </dependency> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>4.0</version> </dependency>
一個簡單的生產者
public class JmsProduce { // linux 上部署的activemq 的 IP 地址 + activemq 的埠號 public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws Exception{ // 1 按照給定的url建立連線工產,這個構造器採用預設的使用者名稱密碼 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 2 通過連線工廠連線 connection 和 啟動 javax.jms.Connection connection = activeMQConnectionFactory.createConnection(); // 啟動 connection.start(); // 3 建立回話 session // 兩個引數,第一個事務, 第二個簽收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4 建立目的地 (兩種 : 佇列/主題 這裡用佇列) Queue queue = session.createQueue(QUEUE_NAME); // 5 建立訊息的生產者 MessageProducer messageProducer = session.createProducer(queue); // 6 通過messageProducer 生產 3 條 訊息傳送到訊息佇列中 for (int i = 1; i < 4 ; i++) { // 7 建立字訊息 TextMessage textMessage = session.createTextMessage("msg--" + i); // 8 通過messageProducer釋出訊息 messageProducer.send(textMessage); } // 9 關閉資源 messageProducer.close(); session.close(); connection.close(); System.out.println(" **** 訊息傳送到MQ完成 ****"); } }
在頁面上顯示
與之對應的訊息消費者
public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616"; public static final String QUEUE_NAME = "queue01"; // 1對1 的佇列 public static void main(String[] args) throws Exception{ // 1 按照給定的url建立連線工程,這個構造器採用預設的使用者名稱密碼 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 2 通過連線工廠連線 connection 和 啟動 javax.jms.Connection connection = activeMQConnectionFactory.createConnection(); // 啟動 connection.start(); // 3 建立回話 session // 兩個引數,第一個事務, 第二個簽收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4 建立目的地 (兩種 : 佇列/主題 這裡用佇列) Queue queue = session.createQueue(QUEUE_NAME); // 5 建立訊息的消費者 MessageConsumer messageConsumer = session.createConsumer(queue); while(true){ // 這裡是 TextMessage 是因為訊息傳送者是 TextMessage , 接受處理的 // 也應該是這個型別的訊息 //receive()方法 帶參receive(4000L)超時4秒就會走 TextMessage message = (TextMessage)messageConsumer.receive(); if (null != message){ System.out.println("****消費者的訊息:"+message.getText()); }else { break; } } messageConsumer.close(); session.close(); connection.close(); } }
這個代表有一個訊息消費者處理訊息,並且處理了三條訊息
先啟動兩個消費者,再生產6條訊息,請問消費情況如何?
一人一半
activemq 好像自帶負載均衡,當先啟動兩個佇列(Queue)的消費者時,在啟動生產者發出訊息,此時的訊息平均的被兩個消費者消費。 並且消費者不會消費已經被消費的訊息(即為已經出隊的訊息)