ActiveMQ5.15 - 入門 demo
阿新 • • 發佈:2018-11-11
第一步:先下載activeMQ http://activemq.apache.org/ 我下載的是5.12,最新
匯入pom依賴:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.3</version> </dependency>
第二部:解壓檔案apache-activemq-5.15.3-bin.zip 後啟動apache-activemq-5.15.3\bin\win64\activemq.bat
第三部:訪問http://localhost:8161/admin/ 提示登陸:輸入使用者:admin,密碼:admin ,檢視你的訊息
接下來說重點,生產者producer和消費者consumer,producer生成訊息放到指定的佇列中,consumer從指定的佇列中消費訊息。
程式碼:
生產者:
package com.test; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { static String NAME = ActiveMQConnection.DEFAULT_USER;//admin static String PSWD = ActiveMQConnection.DEFAULT_PASSWORD;//admin static String URL = "tcp://localhost:61616"; static ConnectionFactory connectionFactory = null; static Connection connection = null; static Session session; static Queue queue; static MessageProducer producer; static Integer index = 0; public static void main(String[] args) { //連結工廠,用來建立連結 connectionFactory = new ActiveMQConnectionFactory(Producer.NAME, Producer.PSWD, Producer.URL); try { //建立連結 connection = connectionFactory.createConnection(); //啟動連結 connection.start(); //建立會話 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //建立列隊 queue = session.createQueue("cmjQueue"); //建立生產者,將資訊生產到queue中 producer = session.createProducer(queue); //設定生成資訊的策略.不序列化,實際看自己的專案業務情況而定,這裡只是學習 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); while(true){ index++; //建立資訊 MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("name", "曹明傑"+index); mapMessage.setString("age", "23"); //傳送資訊 System.out.println("訊息已傳送"); producer.send(mapMessage); Thread.sleep(3000); session.commit();//提交,資訊真正的傳送 } } catch (Exception e) { // TODO: handle exception }finally { try { if (null != connection) connection.close(); //關閉 } catch (Throwable ignore){ } } } }
消費者:
package com.test; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer { static String NAME = ActiveMQConnection.DEFAULT_USER; static String PSWD = ActiveMQConnection.DEFAULT_PASSWORD; static String URL = "tcp://localhost:61616"; static ConnectionFactory connectionFactory = null; static Connection connection = null; static Session session; static Queue queue; static MessageConsumer consumer; public static void main(String[] args) { //建立連結 connectionFactory = new ActiveMQConnectionFactory(NAME, PSWD, URL); try { //建立連結 connection = connectionFactory.createConnection(); //啟動 connection.start(); //建立會話 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //建立目的地即queue queue = session.createQueue("cmjQueue"); //建立消費者 consumer = session.createConsumer(queue); System.out.println("開始消費"); while(true){ //等待3秒向訊息佇列中檢視是否有訊息,有就開始消費 MapMessage receive = (MapMessage)consumer.receive(3000); System.out.println("消費資訊為:"); if(receive!=null){ System.out.println(receive.getString("name")); System.out.println(receive.getString("age")); }else{ } } } catch (Exception e) { // TODO: handle exception } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore){} } } }
我們可以多起幾個執行緒進行消費。並檢視console輸出。
總結:
queue是一對一的,即一個訊息只能被一個consumer消費,但是一個queue可以關聯好多個消費者,這時消費者也要排隊,先來者進行消費然後是下一個消費者。queue中的訊息被consumer申請並消費(自動拉取),息不能被重複消費,
topic是廣播模式。即一對多,多個consumer同時進行消費,不過是topic主動的推送給消費者(自動推送),就好像我們訂閱的新聞資訊,自動推送給使用者檢視,資訊可以重複消費。即一個訊息傳送給多個consumer。