1. 程式人生 > >Java中介軟體JMS之ActiveMQ入門

Java中介軟體JMS之ActiveMQ入門

1.ActiveMQ概述

企業訊息軟體從80年代起就存在,它不只是一種應用間訊息傳遞風格,也是一種整合風格。因此,訊息傳遞可以滿足應用間的通知和互相操作。但是開源的解決方案是到最近10年才出現的。Apache ActiveMQ就是其中一種。它使應用間能以非同步,鬆耦合方式交流。ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。

 ‍   ActiveMQ是Apache軟體基金下的一個開源軟體,它遵循JMS規範(Java Message Service),是訊息驅動中介軟體軟體(MOM)。它為企業訊息傳遞提供高可用,出色效能,可擴充套件,穩定和安全保障。ActiveMQ使用Apache許可協議。因此,任何人都可以使用和修改它而不必反饋任何改變。這對於商業上將ActiveMQ用在重要用途的人尤為關鍵。MOM的工作是在分散式的各應用之間排程事件和訊息,使之到達指定的接收者。所以高可用,高效能,高可擴充套件性尤為關鍵。

2.ActiveMQ特性

    ⒈支援多種語言客戶端,如:Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協議有 OpenWire,Stomp REST,WS Notification,XMPP,AMQP。

    ⒉ 完全支援JMS1.1和J2EE1.4規範,它們包括同步和非同步訊息傳遞,一次和只有一次的訊息傳遞,對於預訂者的持久訊息等等。依附於JMS規範意味著,不論JMS訊息提供者是誰,同樣的基本特性(持久化,XA訊息,事務)都是有效的。

    ⒊ 對Spring的支援,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去。

    ⒋ 通過了常見J2EE伺服器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何相容J2EE 1.4 商業伺服器上。

    ⒌ ActiveMQ提供各種連線選擇,包括HTTP,HTTPS,IP多點傳送,SSL,STOMP,TCP,UDP,XMPP等。大量的連線協議支援使之具有更好的靈活性。很多現有的系統使用一種特定協議並且不能改變,所以一個支援多種協議的訊息平臺降低了使用的門檻。雖然連線很重要,但是和其他容器整合也同樣重要。

    6.ActiveMQ提供多種永續性方案可供選擇,也可以完全按自己需求定製驗證和授權。例如,ActiveMQ通過KahaDB提供自己的超快速訊息持久方案(ultra-fast message persistence),但也支援標準的JDBC方案。ActiveMQ可以通過配置檔案提供簡單的驗證和授權,也提供標準的JAAS登陸模組。

    7.ActiveMQ是為開發者設計的。它並不需要專門的管理工具,因為它提供各種易用且強大的管理特性。有很多方法去監控ActiveMQ的各個方面,可以通過JMX使用JConsole或ActiveMQ web console;可以執行ActiveMQ訊息報告;可以用命令列指令碼;可以通過日誌。

    8.代理器叢集(Broker clustering)----為了利於擴充套件,多個ActiveMQ broker能夠聯合工作。這個方式就是network of brokers並且能支援多種拓撲結構;支援客戶端-伺服器,點對點。

    9.支援Ajax, 支援與Axis的整合

3.ActiveMQ優勢

    1.與OpenJMS、JbossMQ等開源jms provider相比,ActiveMQ有Apache的支援,持續發展的優勢明顯。

    2.速度很快,JBossMQ的十倍(沒有測試)

    3.提高系統資源的利用率,主要是任務的派發不是24小時平均的,而是高峰時期任務量很多,比如1秒1000多個,有的時候很低,比如十幾秒鐘才來一個。應用服務通過JMS佇列一個一個的取任務,做完一個再領一個,使系統資源的運用趨於平均。而JMS,比如JMS接收訊息的效率是很高的,比如ActiveMQ,在賽揚(2.40GHz)機器上能夠達到2000/s,訊息大小為1-2k。好一些的伺服器可以達到2萬以上/秒。

4.ActiveMQ安裝

目錄如下:

   +bin (windows下面的bat和unix/linux下面的sh)

   +conf (activeMQ配置目錄,包含最基本的activeMQ配置檔案)

   +data (預設是空的)

   +docs (只有index.html)

   +example (幾個例子)

   +lib (activemMQ使用到的lib)

   +webapps(後臺管理頁面)

+webapps-demo(後臺管理訊息傳送頁面)

   +activemq-all-5.8.0.jar (java開發的jar包)

   -LICENSE.txt

   -NOTICE.txt

   -README.txt

   -user-guide.html

你可以使用bin\activemq.bat(activemq)啟動

注意:

⒈ 這個僅僅是最基礎的ActiveMQ的配置,很多地方都沒有配置因此不要直接使用這個配置用於生存環境。

⒉ 有的時候由於埠被佔用,導致ActiveMQ錯誤,ActiveMQ可能需要以下埠1099(JMX),61616(預設的TransportConnector)。

⒊ 如果沒有物理網絡卡,或者MS的LoopBackAdpater Multicast會報一個錯誤

5.執行附帶的示例程式 

   1、Queue訊息示例: 

     * 啟動Queue訊息消費者 

       cd example 

ant consumer 

     * 啟動Queue訊息生產者 

       cd example 

       ant producer 
     簡要說明:生產者(producer)發訊息,消費者(consumer)接訊息,傳送/接收2000個訊息後自動關閉 

   2、Topic訊息示例: 

     * 啟動Topic訊息消費者 

       cd example 

       ant topic-listener 

     * 啟動Topic訊息生產者 

       cd example 

       ant topic-publisher 
     簡要說明:重複10輪,publisher每輪發送2000個訊息,並等待獲取listener的處理結果報告,然後進入下一輪      傳送,最後

統計全域性傳送時間。 

   3、Queue訊息和Topic訊息傳送之後,可以通過後臺點選Queues和Topics檢視傳送訊息具體資訊。

 

6.ActiveMQ類別及開發流程

   1)、Point-to-Point (點對點)訊息模式開發流程 :
       1、生產者(producer)開發流程(ProducerTool.java): 

         1.1 建立Connection: 根據url,user和password建立一個jms Connection。 

         1.2 建立Session: 在connection的基礎上建立一個session,同時設定是否支援事務和ACKNOWLEDGE標識。 

         1.3 建立Destination物件: 需指定其對應的主題(subject)名稱,producer和consumer將根據subject來發送/接收對應的訊息。 

         1.4 建立MessageProducer: 根據Destination建立MessageProducer物件,同時設定其持久模式。 

         1.5 傳送訊息到佇列(Queue): 封裝TextMessage訊息,使用MessageProducer的send方法將訊息傳送出去。 

       2、消費者(consumer)開發流程(ConsumerTool.java): 

         2.1 實現MessageListener介面: 消費者類必須實現MessageListener介面,然後在onMessage()方法中監聽訊息的到達並處理。 

         2.2 建立Connection: 根據url,user和password建立一個jms Connection,如果是durable模式,還需要給connection設定一個clientId。 

         2.3 建立Session和Destination: 與ProducerTool.java中的流程類似,不再贅述。 

         2.4 建立replyProducer【可選】:可以用來將訊息處理結果傳送給producer。 

         2.5 建立MessageConsumer:  根據Destination建立MessageConsumer物件。 

                          2.6 消費message:  在onMessage()方法中接收producer傳送過來的訊息進行處理,並可以通過replyProducer反饋資訊給producer 

  1. if (message.getJMSReplyTo() != null) {    
  2.   replyProducer.send(message.getJMSReplyTo(),     
  3.  session.createTextMessage("Reply: " + message.getJMSMessageID()));  

2)、Publisher/Subscriber(釋出/訂閱者)訊息模式開發流程 

       1、訂閱者(Subscriber)開發流程(TopicListener.java): 

         1.1 實現MessageListener介面: 在onMessage()方法中監聽釋出者發出的訊息佇列,並做相應處理。 

         1.2 建立Connection: 根據url,user和password建立一個jms Connection。 

         1.3 建立Session: 在connection的基礎上建立一個session,同時設定是否支援事務和ACKNOWLEDGE標識。 

         1.4 建立Topic:  建立2個Topic, topictest.messages用於接收發布者發出的訊息,topictest.control 用於向釋出者傳送訊息,實現雙方的互動。 

         1.5 建立consumer和producer物件:根據topictest.messages建立consumer,根據topictest.control建立 producer。 

         1.6 接收處理訊息:在onMessage()方法中,對收到的訊息進行處理,可直接簡單在本地顯示消息,或者根 據訊息內容不同處理對應的業務邏輯(比如:資料庫更新、檔案操作等等),並且可以使用producer物件將處理結果返回給釋出者。 

       2、釋出者(Publisher)開發流程(TopicPublisher.java):

         2.1 實現MessageListener介面:在onMessage()方法中接收訂閱者的反饋訊息。 

         2.2 建立Connection: 根據url,user和password建立一個jms Connection。 

                           2.3 建立Session: 在connection的基礎上建立一個session,同時設定是否支援事務和ACKNOWLEDGE標識。 

         2.4 建立Topic: 建立2個Topic,topictest.messages用於向訂閱者釋出訊息,topictest.control用於接 收訂閱者反饋的訊息。這2個topic與訂閱者開發流程中的topic是一一對應的。 

         2.5 建立consumer和producer物件: 根據topictest.messages建立publisher; 根據topictest.control 建立consumer,同時監聽訂閱者反饋的訊息。

         2.6 給所有訂閱者傳送訊息,並接收反饋訊息:  示例程式碼中,一共重複10輪操作。 每輪先向所有訂閱者 傳送2000個訊息; 然後堵塞執行緒,開始等待; 最後通過onMessage()方法,接收到訂閱者反饋的“REPORT”類資訊後,才print反饋資訊並解除執行緒堵塞,進入下一輪。 
             注:可同時執行多個訂閱者測試檢視此模式效果 

7.Eclipse程式碼開發

   1.建立一個Web Probject 專案,將activemq-all-5.8.0.jar放在lib裡面

   2.Queue(點對點)方式:生產者

  1. package jms;  
  2. import javax.jms.Connection;  
  3. import javax.jms.ConnectionFactory;  
  4. import javax.jms.DeliveryMode;  
  5. import javax.jms.Destination;  
  6. import javax.jms.MessageProducer;  
  7. import javax.jms.Queue;  
  8. import javax.jms.Session;  
  9. import javax.jms.TextMessage;  
  10. import org.apache.activemq.ActiveMQConnection;  
  11. import org.apache.activemq.ActiveMQConnectionFactory;  
  12. //Queue(點對點)方式  生存者Producer
  13. publicclass QueueProducer {  
  14.     privatestatic String user = ActiveMQConnection.DEFAULT_USER;  
  15.     privatestatic String password =ActiveMQConnection.DEFAULT_PASSWORD;  
  16.     privatestatic String url =  "tcp://localhost:61616";  
  17.     publicstaticvoid main(String[] args)throws Exception {  
  18.          // ConnectionFactory :連線工廠,JMS 用它建立連線
  19.         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
  20.         // Connection :JMS 客戶端到JMS Provider 的連線
  21.         Connection connection = connectionFactory.createConnection();  
  22.         // Connection 啟動
  23.         connection.start();  
  24.         System.out.println("Connection is start...");  
  25.         // Session: 一個傳送或接收訊息的執行緒
  26.         Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);  
  27.         // Queue :訊息的目的地;訊息傳送給誰.
  28.         Queue  destination = session.createQueue("example.A");  
  29.         // MessageProducer:訊息傳送者
  30.         MessageProducer producer = session.createProducer(destination);  
  31.         // 設定不持久化,此處學習,實際根據專案決定
  32.         producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
  33.          // 構造訊息,此處寫死,專案就是引數,或者方法獲取
  34.         sendMessage(session, producer);  
  35.         session.commit();  
  36.         connection.close();  
  37.         System.out.println("send text ok.");  
  38.     }  
  39.     publicstaticvoid sendMessage(Session session, MessageProducer producer)  
  40.             throws Exception {  
  41.         for (int i = 1; i <= 100; i++) {//有限制,達到1000就不行
  42.             TextMessage message = session.createTextMessage("ActiveMq 傳送的訊息" + i);  
  43.             // 傳送訊息到目的地方
  44.             System.out.println("傳送訊息:" + "ActiveMq 傳送的訊息" + i);  
  45.             producer.send(message);  
  46.         }  
  47.     }  
  48. }  

  3.Queue(點對點)方式:消費者

  1. package jms;  
  2. import javax.jms.Connection;  
  3. import javax.jms.ConnectionFactory;  
  4. import javax.jms.Destination;  
  5. import javax.jms.JMSException;  
  6. import javax.jms.Message;  
  7. import javax.jms.MessageConsumer;  
  8. import javax.jms.MessageListener;  
  9. import javax.jms.Queue;  
  10. import javax.jms.Session;  
  11. import javax.jms.TextMessage;  
  12. import org.apache.activemq.ActiveMQConnection;  
  13. import org.apache.activemq.ActiveMQConnectionFactory;  
  14. //Queue(點對點)方式  消費這Consumer
  15. publicclass QueueConsumer {  
  16.     privatestatic String user = ActiveMQConnection.DEFAULT_USER;  
  17.     privatestatic String password =ActiveMQConnection.DEFAULT_PASSWORD;  
  18.     privatestatic String url = "tcp://localhost:61616";  
  19.     publicstaticvoid main(String[] args) throws Exception{  
  20.         // ConnectionFactory :連線工廠,JMS 用它建立連線
  21.         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
  22.         // Connection :JMS 客戶端到JMS Provider 的連線
  23.         Connection connection = connectionFactory.createConnection();  
  24.         connection.start();  
  25.         // Session: 一個傳送或接收訊息的執行緒
  26.         final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
  27.         // Destination :訊息的目的地;訊息傳送給誰.
  28.         Queue destination=session.createQueue("example.A");  
  29.         // 消費者,訊息接收者
  30.         MessageConsumer consumer = session.createConsumer(destination);  
  31.         consumer.setMessageListener(new MessageListener(){//有事務限制
  32.             @Override
  33.             publicvoid onMessage(Message message) {  
  34.                 try {  
  35.                     TextMessage textMessage=(TextMessage)message;  
  36.                     System.out.println(textMessage.getText());  
  37.                 } catch (JMSException e1) {  
  38.                     e1.printStackTrace();  
  39.                 }  
  40.                 try {  
  41.                     session.commit();  
  42.                 } catch (JMSException e) {  
  43.                     e.printStackTrace();  
  44.                 }  
  45.             }  
  46.         });  
  47. /*  另外一種接受方式 
  48. 相關推薦

    Java中介軟體JMSActiveMQ入門

    1.ActiveMQ概述 企業訊息軟體從80年代起就存在,它不只是一種應用間訊息傳遞風格,也是一種整合風格。因此,訊息傳遞可以滿足應用間的通知和互相操作。但是開源的解決方案是到最近10年才出現的。Apache ActiveMQ就是其中一種。它使應用間能以非同步,鬆耦合方式交流。ActiveMQ 是Apa

    Java中介軟體JMS(四)ActiveMQ整合spring之類轉換(三)

    前幾章都是直接傳送MapMessage型別的資料,拿前面的例子來講,如果生產者傳送的是TextMessage,消費者也是必須TextMessage;如果我們自己要傳送的資料不是TextMessage型別,而消費者還是TextMessage的,那該怎麼辦?難道每次接受後都要增

    Java中介軟體JMS(五)JMS入門

      3).JBoss 是 JBoss 公司開發的一個免費開源的應用伺服器,它提供了 EJB 執行的環境,並能夠結合 EJB 進行 JMS 訊息的收取,支援點到點模型和釋出/訂閱模型。  4).ActiveMQ 是一個基於 Apache 2.0 Licenced 釋出的開放原始碼的 JMS 產品,它能夠提供點到

    Java中介軟體JMS(三)ActiveMQ整合spring監聽器(二)

    對於讓spring管理監聽的實現方式有兩種方法,一種是自己寫監聽器,然後交給spring的監聽介面卡管理,再由監聽容器管理監聽介面卡,另一種是寫一個實現MessageListener介面的類。第一種在第一章涉及到,但是沒有交給spring託管.其實實現的方法很簡單,在j2e

    Java中介軟體JMS(四)ActiveMQ整合spring之類轉換

    前幾章都是直接傳送MapMessage型別的資料,拿前面的例子來講,如果生產者傳送的是TextMessage,消費者也是必須TextMessage;如果我們自己要傳送的資料不是TextMessage型別,而消費者還是TextMessage的,那該怎麼辦?難道每次接受後都要增加一個轉換方法麼?其實sprin

    訊息中介軟體介紹 ActiveMQ的安裝

    訊息中介軟體簡介:https://blog.csdn.net/leexide/article/details/80035462 JMS其實就是訊息中介軟體的java訊息服務   訊息中介軟體的安裝 安裝非常簡單,直接上傳到linux系統中,然後解壓,開啟它裡面bin目錄裡面

    01-訊息中介軟體概述和ActiveMq入門

    1.mq解決的問題 系統非同步處理 應用解耦 流量削峰 日誌處理 訊息通訊 2.訊息中介軟體的2中模型 2.1 Point-to-Point(P2P) / 點對點 / 類比:送快遞 特點: + 一個消費生產者必須有一個訊息消費者。一對一的關係 + 一個訊息傳送到queue

    activemq訊息中介軟體--JMS概述(1)

    1 JMS概述 目前現在很多的RPC中介軟體技術都有如下問題: (1)同步通訊,客戶端發出呼叫請求,必須等待服務端處理完成以後返回結果才能繼續執行。 (2)客戶和服務物件的生命週期緊密耦合,客戶程序和服務程序都必須正常進行,如果由於服務物件的崩潰和網路故障導致客戶請求不可達,客戶收到

    JMS訊息中介軟體原理及ActiveMQ在企業中的應用(接上篇)

    程式碼實現:傳送訊息---》接受訊息---》伺服器配置 //1 傳送訊息(接受回覆訊息) public class SenderMessageService { //釋出指定訊息到指定地址(在釋出之前,建議將訊息儲存到資料庫)  public void publish(St

    Java訊息中介軟體學習九 -- ActiveMQ與HA架構(master/slave)

    HA(高可用性)幾乎在所有的架構中都需要有一定的保證 ,在生產環境中,我們也需要面對broker失效、網路故障等各種問題,ActiveMQ也不例外。activeMQ作為消費分發和儲存系統,它的HA模型只有master-slave,我們通過broker節點“訊息互

    JAVA訊息佇列ActiveMQ入門

    訊息中介軟體前言本文只適合初級入門,闡述了訊息中介軟體的基礎概念,和部分實踐,理論性比較強。為什麼要使用訊息中介軟體有一家專業做保險的大公司,他有一套保險系統,記錄著保險銷售的所有資訊,他在全國有38家分公司,每一家分公司都有自己的保險系統,總公司與分公司之間有著保單資料共享

    《大型網站系統與JAVA中介軟體實踐》 第六章 訊息中介軟體

        如何保證一致性                     &

    《大型網站系統與JAVA中介軟體實踐》 第五章 資料訪問層

             兩階段提交                 &nbs

    《大型網站系統與JAVA中介軟體實踐》 第一章

     主要內容  1、將單機發展為分散式 分散式也可以看為大型單機 控制儲存和計算單元變得更大了 2、 NIO BIO AIO 3、 互動呼叫常見方式: 名稱 、規則 、master、 4、分散式的難點     1)、單點故障 &nb

    訊息中介軟體JMS

    一.什麼是訊息中介軟體      訊息中介軟體利用高效可靠的訊息傳遞機制進行平臺無關的資料交流,並基於資料通訊來進行分散式系統的整合。通過提供訊息傳遞和訊息排隊模型,它可以在分散式環境下擴充套件程序間的通訊。   ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。Acti

    訊息中介軟體kafka與activeMQ、rabbitMQ、zeroMQ、rocketMQ的比較

    一、kafka 1、不完全符合jms規範,注重吞吐量,類似udp 和 tcp 2、一般做大資料吞吐的管道 我們現在的用途就是負責在各個idc之間通訊 3、量大對資料不是百分之百保證的,會有資料丟失,不是百分百送達(amq和rmq等有重發機制,而kafka沒有);在吞吐量有提升 ,在這方面