消息中間件JMS
一.什麽是消息中間件
消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通信來進行分布式系統的集成。通過提供消息傳遞和消息排隊模型,它可以在分布式環境下擴展進程間的通信。
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規範的 JMS Provider實現。
對於消息的傳遞有兩種類型:
一種是點對點的,即一個生產者和一個消費者一一對應;
另一種是發布/ 訂閱模式,即一個生產者產生消息並進行發送後,可以由多個消費者進行接收。
二.ActiveMQ 下載與安裝
1.官方網站下載:http://activemq.apache.org/
2.centos上安裝ActiveMQ
1)將apache-activemq-5.12.0-bin.tar.gz 上傳至服務器
2)解壓
tar zxvf apache-activemq-5.12.0-bin.tar.gz
3)為apache-activemq-5.12.0目錄賦權
chmod 777 apache-activemq-5.12.0
4)進入apache-activemq-5.12.0\bin目錄,賦與執行權限
chmod 755 activemq
3.啟動
./activemq start
啟動成功出現如下信息
4.打開瀏覽器輸入服務器地址
http://192.168.56.102:8161
即可訪問管理界面
點擊Manage ActiveMQ broker登錄,登錄名和密碼均為admin
5.點對點消息列表Queues
Number Of Pending Messages :等待消費的消息 這個是當前未出隊列的數量。
Number Of Consumers :消費者 這個是消費者端的消費者數量
Messages Enqueued :進入隊列的消息 進入隊列的總數量,包括出隊列的。
Messages Dequeued :出了隊列的消息 可以理解為是消費這消費掉的數量。
三.測試案例
1.點對點模式
1.1消息生產者
1)創建Maven工程,引入依賴
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency>
2)創建類QueueProducer main方法代碼如下:
//1.創建連接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616"); //2.獲取連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.獲取session (參數1:是否啟動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建隊列對象 Queue queue = session.createQueue("test-queue"); //6.創建消息生產者 MessageProducer producer = session.createProducer(queue); //7.創建消息 TextMessage textMessage = session.createTextMessage("歡迎來到ActiveMQ世界"); //8.發送消息 producer.send(textMessage); //9.關閉資源 producer.close(); session.close(); connection.close();
上述代碼中第4步創建session 的兩個參數:
第2個參數 消息的確認模式
- AUTO_ACKNOWLEDGE = 1 自動確認
- CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
- DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
- SESSION_TRANSACTED = 0 事務提交並確認
運行後通過ActiveMQ管理界面查詢
1.2 消息消費者
創建類QueueConsumer ,main方法代碼如下:
//1.創建連接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616"); //2.獲取連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.獲取session (參數1:是否啟動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建隊列對象 Queue queue = session.createQueue("test-queue"); //6.創建消息消費 MessageConsumer consumer = session.createConsumer(queue); //7.監聽消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待鍵盤輸入 System.in.read(); //9.關閉資源 consumer.close(); session.close(); connection.close();
執行後看到控制臺輸出:歡迎來到神奇的ActiveMq世界
1.3 運行測試
同時開啟2個以上的消費者,再次運行生產者,觀察每個消費者控制臺的輸出,會發現只有一個消費者會接收到消息。
2.發布訂閱模式
2.1消息生產者
創建類TopicProducer ,main方法代碼如下:
//1.創建連接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616"); //2.獲取連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.獲取session (參數1:是否啟動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建主題對象 Topic topic = session.createTopic("test-topic"); //6.創建消息生產者 MessageProducer producer = session.createProducer(topic); //7.創建消息 TextMessage textMessage = session.createTextMessage("歡迎來到神奇的ActiveMq世界"); //8.發送消息 producer.send(textMessage); //9.關閉資源 producer.close(); session.close(); connection.close();
2.2消息消費者
創建類TopicConsumer ,main方法代碼如下:
//1.創建連接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616"); //2.獲取連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.獲取session (參數1:是否啟動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建主題對象 //Queue queue = session.createQueue("test-queue"); Topic topic = session.createTopic("test-topic"); //6.創建消息消費 MessageConsumer consumer = session.createConsumer(topic); //7.監聽消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待鍵盤輸入 System.in.read(); //9.關閉資源 consumer.close(); session.close(); connection.close();
2.3 運行測試
同時開啟2個以上的消費者,再次運行生產者,觀察每個消費者控制臺的輸出,會發現每個消費者會接收到消息。
消息中間件JMS