訊息中介軟體JMS
一.什麼是訊息中介軟體
訊息中介軟體利用高效可靠的訊息傳遞機制進行平臺無關的資料交流,並基於資料通訊來進行分散式系統的整合。通過提供訊息傳遞和訊息排隊模型,它可以在分散式環境下擴充套件程序間的通訊。
ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現。
對於訊息的傳遞有兩種型別:
一種是點對點的,即一個生產者和一個消費者一一對應;
另一種是釋出/ 訂閱模式,即一個生產者產生訊息並進行傳送後,可以由多個消費者進行接收。
二.ActiveMQ下載與安裝
1.官方網站下載:http://activemq.apache.org/
2.centos上安裝ActiveMQ
1)將apache-activemq-5.12.0-bin.tar.gz 上傳至伺服器
2)解壓
tar zxvf apache-activemq-5.12.0-bin.tar.gz
3)為apache-activemq-5.12.0目錄賦權
chmod 777 apache-activemq-5.12.0
4)進入apache-activemq-5.12.0\bin目錄,賦與執行許可權
chmod 755 activemq
3.啟動
./activemq start
啟動成功出現如下資訊
4.開啟瀏覽器輸入伺服器地址
http://192.168.56.102:8161
即可訪問管理介面
點選Manage ActiveMQ broker登入,登入名和密碼均為admin
5.點對點訊息列表Queues
Number Of Pending Messages :等待消費的訊息 這個是當前未出佇列的數量。
Number Of Consumers :消費者 這個是消費者端的消費者數量
Messages Enqueued :進入佇列的訊息 進入佇列的總數量,包括出佇列的。
Messages Dequeued :出了佇列的訊息 可以理解為是消費這消費掉的數量。
三.測試案例
1.點對點模式
1.1訊息生產者
1)建立Maven工程,引入依賴
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency>
2)建立類QueueProducer main方法程式碼如下:
//1.建立連線工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616"); //2.獲取連線 Connection connection = connectionFactory.createConnection(); //3.啟動連線 connection.start(); //4.獲取session (引數1:是否啟動事務,引數2:訊息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立佇列物件 Queue queue = session.createQueue("test-queue"); //6.建立訊息生產者 MessageProducer producer = session.createProducer(queue); //7.建立訊息 TextMessage textMessage = session.createTextMessage("歡迎來到ActiveMQ世界"); //8.傳送訊息 producer.send(textMessage); //9.關閉資源 producer.close(); session.close(); connection.close();
上述程式碼中第4步建立session 的兩個引數:
第2個引數 訊息的確認模式
- AUTO_ACKNOWLEDGE = 1 自動確認
- CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
- DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
- SESSION_TRANSACTED = 0 事務提交併確認
執行後通過ActiveMQ管理介面查詢
1.2 訊息消費者
建立類QueueConsumer ,main方法程式碼如下:
//1.建立連線工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616"); //2.獲取連線 Connection connection = connectionFactory.createConnection(); //3.啟動連線 connection.start(); //4.獲取session (引數1:是否啟動事務,引數2:訊息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立佇列物件 Queue queue = session.createQueue("test-queue"); //6.建立訊息消費 MessageConsumer consumer = session.createConsumer(queue); //7.監聽訊息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到訊息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待鍵盤輸入 System.in.read(); //9.關閉資源 consumer.close(); session.close(); connection.close();
執行後看到控制檯輸出:歡迎來到神奇的ActiveMq世界
1.3 執行測試
同時開啟2個以上的消費者,再次執行生產者,觀察每個消費者控制檯的輸出,會發現只有一個消費者會接收到訊息。
2.釋出訂閱模式
2.1訊息生產者
建立類TopicProducer ,main方法程式碼如下:
//1.建立連線工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616"); //2.獲取連線 Connection connection = connectionFactory.createConnection(); //3.啟動連線 connection.start(); //4.獲取session (引數1:是否啟動事務,引數2:訊息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立主題物件 Topic topic = session.createTopic("test-topic"); //6.建立訊息生產者 MessageProducer producer = session.createProducer(topic); //7.建立訊息 TextMessage textMessage = session.createTextMessage("歡迎來到神奇的ActiveMq世界"); //8.傳送訊息 producer.send(textMessage); //9.關閉資源 producer.close(); session.close(); connection.close();
2.2訊息消費者
建立類TopicConsumer ,main方法程式碼如下:
//1.建立連線工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616"); //2.獲取連線 Connection connection = connectionFactory.createConnection(); //3.啟動連線 connection.start(); //4.獲取session (引數1:是否啟動事務,引數2:訊息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立主題物件 //Queue queue = session.createQueue("test-queue"); Topic topic = session.createTopic("test-topic"); //6.建立訊息消費 MessageConsumer consumer = session.createConsumer(topic); //7.監聽訊息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到訊息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待鍵盤輸入 System.in.read(); //9.關閉資源 consumer.close(); session.close(); connection.close();
2.3 執行測試
同時開啟2個以上的消費者,再次執行生產者,觀察每個消費者控制檯的輸出,會發現每個消費者會接收到訊息。