1. 程式人生 > >訊息佇列學習基礎

訊息佇列學習基礎

什麼是MOM

MOM 就是面向訊息中介軟體(Message-oriented middleware),是用於以分散式應用或系統中的非同步、鬆耦合、可靠、可擴充套件和安全通訊的一類軟體。MOM 的總體思想是它作為訊息傳送器和訊息接收器之間的訊息中介,這種中介提供了一個全新水平的鬆耦合。

MOM思想就是A和B兩個應用程式不直接傳送訊息。之前A和B直接傳送訊息有很多效率問題,如A傳送之後B沒有及時接受,那麼A就一直再在那裡堵塞併發性不好,A必須等B接受完之後有返結果了A才可以結束。而MOM就是為了解決這樣的問題,不讓A與B之間互動,在A和B之間加一個訊息中介軟體,A把訊息放到訊息中間上,就可以走了,去做別的事情,B什麼時候來訊息中介軟體取訊息A不用知道也不用管。這樣就提高了效率提供併發性,等B去走後可以通過狀態,通知,回撥等方式通知A就可以。市面上實現這種思想的技術有很多,IBM(MQSEVICES)、Microsoft(MSMQ)以及BEA的MessageMQ等。處於百家爭鳴階段都是各自實現各自的,沒有統一實現標準。此時SUN為了實現統一標準就出現了JMS統一實現規範。JMS主要有2種訊息模型,點到點和釋出訂閱兩種。

什麼是訊息佇列

訊息佇列是在訊息的傳輸過程中儲存訊息的容器,用於接收訊息並以檔案的方式儲存,一個訊息佇列可以被一個也可以被多個消費者消費。

訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合、非同步訊息、流量削鋒等問題。實現高效能、高可用、可伸縮和最終一致性架構。是大型分散式系統不可缺少的中介軟體。

目前在生產環境,使用較多的訊息佇列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。

訊息佇列優點

  1. 將資料從一個應用程式傳到另一個應用程式,或者從軟體的一個模組傳送到另外一個模組
  2. 負責建立網路通訊的通道,進行資料的可靠傳送
  3. 保證資料不重發,不丟失
  4. 能夠實現跨平臺操作,能夠為不同作業系統上的軟體整合技工資料傳送服務

訊息佇列的應用場景

下面詳細介紹一下訊息佇列在實際應用中常用的使用場景。場景分為非同步處理、應用解耦、流量削鋒和訊息通訊四個場景。

非同步處理

場景說明 使用者註冊後,需要傳送註冊郵件和傳送註冊資訊,傳統的做法有兩種:序列方式並行方式

序列方式

將註冊資訊寫入資料庫成功後,傳送註冊郵件,然後傳送註冊簡訊,而所有任務執行完成後,返回資訊給客戶端


並行方式

將註冊資訊寫入資料庫成功後,同時進行傳送註冊郵件和傳送註冊簡訊的操作。而所有任務執行完成後,返回資訊給客戶端。同序列方式相比,並行方式可以提高執行效率,減少執行時間。

上面的比較可以發現,假設三個操作均需要50ms的執行時間,排除網路因素,則最終執行完成,序列方式需要150ms,而並行方式需要100ms。

因為cpu在單位時間內處理的請求數量是一致的,假設:CPU每1秒吞吐量是100此,則序列方式1秒內可執行的請求量為1000/150,不到7次;並行方式1秒內可執行的請求量為1000/100,為10次。

由上可以看出,傳統序列和並行的方式會受到系統性能的侷限,那麼如何解決這個問題?
我們需要引入訊息佇列,將不是必須的業務邏輯,非同步進行處理,由此改造出來的流程為


根據上述的流程,使用者的響應的時間基本相當於將資料寫入資料庫的時間,傳送註冊郵件,傳送註冊簡訊的訊息在寫入訊息佇列後,即可返回執行結果,寫入訊息佇列的時間很快,幾乎可以忽略,也有此可以將系統吞吐量提升至20QPS,比序列方式提升近3倍,比並行方式提升2倍。

應用解耦

場景說明 使用者下單後,訂單系統需要通知庫存系統。

傳統的做法為:訂單系統呼叫庫存系統的介面。如下圖所示:


傳統方式具有如下缺點:
  1. 假設庫存系統訪問失敗,則訂單減少庫存失敗,導致訂單建立失敗
  2. 訂單系統同庫存系統過度耦合

如何解決上述的缺點呢?需要引入訊息佇列,引入訊息佇列後的架構如下圖所示:


引入訊息佇列,實現應用解耦
  • 訂單系統:使用者下單後,訂單系統進行資料持久化處理,然後將訊息寫入訊息佇列,返回訂單建立成功
  • 庫存系統:使用拉/推的方式,獲取下單資訊,庫存系統根據訂單資訊,進行庫存操作。

假如在下單時庫存系統不能正常使用。也不影響正常下單,因為下單後,訂單系統寫入訊息佇列就不再關心其後續操作了。由此實現了訂單系統與庫存系統的應用解耦。

流量削鋒

流量削峰 也是訊息對列中的常用場景,一般在秒殺或團搶活動中使用廣泛。

應用場景 秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。為解決這個問題,一般需要在應用前端加入訊息佇列。

  1. 可以控制參與活動的人數;
  2. 可以緩解短時間內高流量對應用的巨大壓力;

流量削鋒處理方式系統圖如下:


流量削鋒方式系統圖
  1. 伺服器在接收到使用者請求後,首先寫入訊息佇列。這時如果訊息佇列中訊息數量超過最大數量,則直接拒絕使用者請求或返回跳轉到錯誤頁面;
  2. 秒殺業務根據秒殺規則讀取訊息佇列中的請求資訊,進行後續處理。

日誌處理

日誌處理是指將訊息佇列用在日誌處理中,比如Kafka的應用,解決大量日誌傳輸的問題。

日誌處理是指將訊息佇列用在日誌處理中,比如Kafka的應用,解決大量日誌傳輸的問題。架構簡化如下:


訊息佇列應用於日誌處理的架構
  • 日誌採集客戶端:負責日誌資料採集,定時寫受寫入Kafka佇列;
  • Kafka訊息佇列:負責日誌資料的接收,儲存和轉發;
  • 日誌處理應用:訂閱並消費kafka佇列中的日誌資料;

這種架構在實際開發中的應用,可以參照案例:新浪技術分享:我們如何扛下32億條實時日誌的分析處理

服務的技術架構設計

  1. Kafka:接收使用者日誌的訊息佇列。
  2. Logstash:做日誌解析,統一成JSON輸出給Elasticsearch。
  3. Elasticsearch:實時日誌分析服務的核心技術,一個schemaless,實時的資料儲存服務,通過index組織資料,兼具強大的搜尋和統計功能。
  4. Kibana:基於Elasticsearch的資料視覺化元件,超強的資料視覺化能力是眾多公司選擇ELK stack的重要原因。

訊息通訊

訊息通訊是指,訊息佇列一般都內建了高效的通訊機制,因此也可以用在純的訊息通訊。比如實現點對點訊息佇列、聊天室等。

點對點通訊

點對點通訊架構設計

在點對點通訊架構設計中,客戶端A和客戶端B共用一個訊息佇列,即可實現訊息通訊功能。

聊天室通訊

聊天室通訊架構設計

客戶端A、客戶端B、直至客戶端N訂閱同一訊息佇列,進行訊息的釋出與接收,即可實現聊天通訊方案架構設計。

JMS訊息服務

講訊息佇列就不得不提JMS。JMS(Java Message Service,Java訊息服務) JMS 叫做 Java 訊息服務(Java Message Service),是 Java 平臺上有關面向 MOM 的技術規範,旨在通過提供標準的產生、傳送、接收和處理訊息的 API 簡化企業應用的開發,類似於 JDBC 和關係型資料庫通訊方式的抽象。

API是一個訊息服務的標準/規範,允許應用程式元件基於JavaEE平臺建立,傳送,接收和讀取訊息。他是分散式通訊耦合度更低,訊息服務更加可靠以及非同步性。

在EJB架構中,有訊息bean可以無縫的與JM訊息服務整合。在J2EE架構模式中,有訊息服務者模式,用於實現訊息與應用直接的解耦。


常用概念

  • Provider:純 Java 語言編寫的 JMS 介面實現(比如 ActiveMQ 就是)
  • Domains:訊息傳遞方式,包括點對點(P2P)、釋出/訂閱(Pub/Sub)兩種
  • Connection factory:客戶端使用連線工廠來建立與 JMS provider 的連線
  • Destination:訊息被定址、傳送以及接收的物件

訊息模型

在JMS標準中,有兩種訊息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)

P2P 模式


P2P(點對點)模式包含三個角色:訊息佇列(Queue),傳送者(Sender),接收者(Receiver)。每個訊息都被髮送到一個特定的佇列,接收者從佇列中獲取訊息。佇列保留著訊息,知道他們被消費或者超時。

P2P 訊息域使用 queue 作為 Destination,訊息可以被同步或非同步的傳送和接收,每個訊息只會給一個 Consumer 傳送一次。Consumer 可以使用 MessageConsumer.receive() 同步地接收訊息,也可以通過使用MessageConsumer.setMessageListener() 註冊一個 MessageListener 實現非同步接收。

多個 Consumer 可以註冊到同一個 queue 上,但一個訊息只能被一個 Consumer 所接收,然後由該 Consumer 來確認訊息。並且在這種情況下,Provider 對所有註冊的 Consumer 以輪詢的方式傳送訊息。


P2P的特點

  1. 每個訊息只有一個消費者(Consumer)(即一旦被消費,訊息就不再在訊息佇列中,其他的消費者就不能得到這條訊息了。)
  2. 傳送者和接收者質檢在時間上沒有依賴性,也就是說當傳送者傳送了訊息之後,不管接收者有沒有正在執行,他不會影響到訊息傳送到佇列。
  3. 消費者必須確認對訊息的接收

    收到訊息後消費者必須確認訊息已被接收,否則JMS服務提供者會認為該訊息沒有被接收,那麼這條訊息仍然可以被其他人接收。程式可以自動進行確認,不需要人工干預。

  4. 非持久的訊息最多隻傳送一次

    非持久的訊息最多隻傳送一次,表示訊息有可能未被髮送,造成未被髮送的原因可能有:

    1、 JMS服務提供者出現宕機等情況,造成非持久資訊的丟失

    2、 佇列中的訊息過期,未被接收

  5.  持久的訊息嚴格傳送一次

    我們可以將比較重要的訊息設定為持久化的訊息,持久化後的訊息不會因為JMS服務提供者的故障或者其他原因造成訊息丟失。

如果希望傳送的每個訊息都會被成功處理的話,那麼需要p2p 模式

Pub/Sub模式



包含三個角色:主題(Topic),釋出者(Publisher),訂閱者(Subscriber)。多個釋出者將訊息傳送到Topic,系統將這些訊息傳遞給多個訂閱者。

Pub/Sub(釋出/訂閱,Publish/Subscribe)訊息域使用 topic 作為 Destination,釋出者向 topic 傳送訊息,訂閱者註冊接收來自 topic 的訊息。傳送到 topic 的任何訊息都將自動傳遞給所有訂閱者。接收方式(同步和非同步)與 P2P 域相同。

除非顯式指定,否則 topic 不會為訂閱者保留訊息。當然,這可以通過持久化(Durable)訂閱來實現訊息的儲存。這種情況下,當訂閱者與 Provider 斷開時,Provider 會為它儲存訊息。當持久化訂閱者重新連線時,將會受到所有的斷連期間未消費的訊息。

Pub/Sub的特點

  • 每個訊息都可以有多個(0,1,……)訂閱者,每條訊息可以有多個消費者,如果報紙和雜誌一樣,誰訂閱了誰都可以獲得。

  • 釋出者和訂閱者之間有時間上的依賴性。訂閱者只能消費他們訂閱之後出版的訊息,針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者之後,才能消費釋出者的訊息。這就要求訂閱者必須先訂閱,生產者再發布。即訂閱者必須先執行,再等待生產者的執行,這和點對點型別有所差異。
  • 為了消費訊息,訂閱者必須保持執行的狀態。即訂閱者必須保持活動狀態等待發布者釋出的訊息,如果訂閱者在釋出者釋出訊息之後才執行,則不能獲得先前釋出者釋出的訊息。

為了緩和這樣嚴格的時間相關性,JMS允許訂閱者建立一個可持久化的訂閱。這樣,即使訂閱者沒有被啟用(執行),它也能接收到釋出者的訊息。
如果希望傳送的訊息可以不被做任何處理、或者只被一個訊息者處理、或者可以被多個消費者處理的話,那麼可以採用Pub/Sub模型。

訊息消費

在JMS中,訊息的產生和消費都是非同步的。對於消費來說,JMS的訊息者可以通過兩種方式來小消費訊息。

  1. 同步
    訂閱者或接收者通過receive方法來接受訊息,receive在接收到訊息之前(或超時之前)將一直阻塞。
  2. 非同步
    訂閱者或接收者亦可以註冊未一個訊息監聽器。當訊息到達之後,系統自動呼叫監聽器的onMessage的方法。
JDNI:Java命名和目錄介面,是一種標準的Java命名系統介面。可以在網路上查詢和訪問服務。通過指定一個資源名稱,該名稱對應於資料庫或者命名服務中的一個記錄,同時返回資源連線建立所必需的資訊。

JNDI在JMS中起到查詢二號訪問傳送目標或訊息來源的作用。

JMS程式設計

JMS通用步驟

  • 獲取連線工廠
  • 使用連線工廠建立連線
  • 啟動連線
  • 從連線建立會話
  • 獲取 Destination
  • 建立 Producer,或
    • 建立 Producer
    • 建立 message
  • 建立 Consumer,或傳送或接收message傳送或接收 message
    • 建立 Consumer
    • 註冊訊息監聽器(可選)
  • 傳送或接收 message
  • 關閉資源(connection, session, producer, consumer 等)

JMS程式設計模型

1.ConnectionFactory

建立Connection物件的工廠,針對兩週不同的JMS訊息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查詢ConnectionFactory物件。

2.Destination

Destination的意思是訊息生產者的訊息傳送目標或著說訊息消費者的訊息來源。對於訊息生產者來說。他的Destination是某個佇列(queue)或者某個主題(Topic);對於訊息消費者來說,他的Destination也是某個佇列或主題(即訊息來源)。

所以,Destination實際上就是兩種型別的物件:Queue,Topic可以通過JNDI來查詢Destination

3.Connection

Connection表示在客戶端和JMS系統之間建立的連結(對TCP/IP Socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種型別:QueueConnection和TopicConnection。

4.Session

Session是操作訊息的介面。可以通過session建立生產者、消費者、訊息等。Session提供了事務的功能。當需要使用session傳送/接收多個訊息時,可以將這些傳送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。

5.訊息的生產者

訊息生產者由Session建立,並用於將訊息傳送到Destination。同樣,訊息生產者分兩種型別:QueueSender和TopicPublisher。可以呼叫訊息生產者的方法(send或publish方法)傳送訊息。

6.訊息消費者

訊息消費者由Session建立,用於接收被髮送到Destination的訊息。兩種型別:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來建立。當然,也可以session的creatDurableSubscriber方法來建立持久化的訂閱者。

7. MessageListener

訊息監聽器。如果註冊了訊息監聽器,一旦訊息到達,將自動呼叫監聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。

深入學習JMS對掌握JAVA架構、EJB架構有很好的幫助,訊息中介軟體也是大型分散式系統必須的元件。本次分享主要做全域性性介紹,具體的深入需要大家學習,實踐,總結,領會。

JMS程式設計實戰

這裡拿ActiveMQ 舉例

public class JMSDemo {
        ConnectionFactory connectionFactory;
        Connection connection;
        Session session;
        Destination destination;
        MessageProducer producer;
        MessageConsumer consumer;
        Message message;
        boolean useTransaction = false;
        try {
                Context ctx = new InitialContext();
                connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactoryName");
                //使用ActiveMQ時:connectionFactory = new ActiveMQConnectionFactory(user, password, getOptimizeBrokerUrl(broker));
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("TEST.QUEUE");
                //生產者傳送訊息
                producer = session.createProducer(destination);
                message = session.createTextMessage("this is a test");

                //消費者同步接收
                consumer = session.createConsumer(destination);
                message = (TextMessage) consumer.receive(1000);
                System.out.println("Received message: " + message);
                //消費者非同步接收
                consumer.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                                if (message != null) {
                                        doMessageEvent(message);
                                }
                        }
                });
        } catch (JMSException e) {
                ...
        } finally {
                producer.close();
                session.close();
                connection.close();
        }
}複製程式碼