1. 程式人生 > >RabbitMQ訊息推送總結

RabbitMQ訊息推送總結

藉助於 RabbitMQ 的 Web STOMP 外掛,實現瀏覽器與服務端的全雙工通訊。從本質上說,RabbitMQ 的 Web STOMP 外掛也是利用 WebSocket 對 STOMP 協議進行了一次橋接,從而實現瀏覽器與服務端的雙向通訊。

安裝 RabbitMQ 服務

mac安裝rabbitmq

brew update
brew install rabbitmq

耐心等待,安裝完成後需要將/usr/local/sbin新增到$PATH,可以將下面這兩行加到~/.bash_profile:

# RabbitMQ Config
export PATH=$PATH:/usr/local/sbin

編輯完後:wq儲存退出,使環境變數立即生效。

source ~/.bash_profile

啟動rabbitmq

rabbitmq-server

啟用相關外掛

rabbitmq-plugins enable rabbitmq_management rabbitmq_web_stomp rabbitmq_stomp

重啟 RabbitMQ 服務

service rabbitmq-server restart

驗證是否安裝成功

通過 Web 瀏覽器來檢視 RabbitMQ 的執行狀態,瀏覽器中輸入 http://{server_ip}:15672,用 guest/guest 預設的使用者和密碼登入後即可檢視 RabbitMQ 的執行狀態。

基於 RabbitMQ 的實時訊息推送

RabbitMQ 有很多第三方外掛,可以在 AMQP 協議基礎上做出許多擴充套件的應用。Web STOMP 外掛就是基於 AMQP 之上的 STOMP 文字協議外掛,利用 WebSocket 能夠輕鬆實現瀏覽器和伺服器之間的實時訊息傳遞。

Paste_Image.png

訊息傳送者

Java 作為 RabbitMQ 客戶端訊息傳送者,Web 瀏覽器作為訊息消費者。

package com.ibm.cdl.itaas.stomp;

import java.io.IOException;
import
java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Program { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("rabbitmq", "fanout"); String routingKey = "rabbitmq_routingkey"; String message = "{\"name\":\"推送測試資料!\"}"; channel.basicPublish("rabbitmq", routingKey,null, message.getBytes()); System.out.println("[x] Sent Message:"+message); channel.close(); connection.close(); } }

這裡我們利用 RabbitMQ 官方提供的 Java Client Library 來實現訊息的傳送

Paste_Image.png

exchange 接收到訊息後,會根據訊息的 key 和已經設定的 binding 進行訊息路由,最終投遞到一個或多個佇列裡進行訊息處理。RabbitMQ 預置了一些 exchange,如果客戶端未宣告 exchange 時,RabbitMQ 會根據 exchange 型別使用預設的 exchange。

預置exchange名稱

Paste_Image.png

Exchange 型別

1.Direct exchange
Direct exchange 完全根據 key 進行投遞,只有 key 與繫結時的 routing key 完全一致的訊息才會收到訊息,參考官網提供的圖 4 更直觀地瞭解 Direct exchange。

Paste_Image.png

2.Fanount exchange
Fanount 完全不關心 key,直接採取廣播的方式進行訊息投遞,與該交換機繫結的所有佇列都會收到訊息

Paste_Image.png

3.Topic exchange
Topic exchange 會根據 key 進行模式匹配然後進行投遞,與設定的 routing key 匹配上的佇列才能收到訊息。

4.Headers exchange
Header exchange 使用訊息頭代替 routing key 作為關鍵字進行路由,不過在實際應用過程中這種型別的 exchange 使用較少。

訊息持久化

RabbitMQ 支援訊息的持久化,即將訊息資料持久化到磁碟上,如果訊息伺服器中途斷開,下次開啟會將持久化的訊息重新發送,訊息佇列持久化需要保證 exchange(指定 durable=1)、queue(指定 durable=1)和訊息(delivery_mode=2)3 個部分都是持久化。出於資料安全考慮,一般訊息都會進行持久化。

訊息接收者

html程式碼

<!DOCTYPE html>
<html><head>
  <script src="jquery/jquery-1.9.1.min.js"></script>
  <script src="rabbitmq/sockjs-0.3.js"></script>
  <script src="rabbitmq/stomp.js"></script>

  <style>
      .box {
          width: 440px;
          float: left;
          margin: 0 20px 0 20px;
      }

      .box div, .box input {
          border: 1px solid;
          -moz-border-radius: 4px;
          border-radius: 4px;
          width: 100%;
          padding: 5px;
          margin: 3px 0 10px 0;
      }

      .box div {
          border-color: grey;
          height: 300px;
          overflow: auto;
      }

      div code {
          display: block;
      }

      #first div code {
          -moz-border-radius: 2px;
          border-radius: 2px;
          border: 1px solid #eee;
          margin-bottom: 5px;
      }

      #second div {
          font-size: 0.8em;
      }
  </style>
  <title>RabbitMQ Web STOMP Examples : Echo Server</title>
  <link href="main.css" rel="stylesheet" type="text/css"/>
</head><body lang="en">
    <h1><a href="index.html">RabbitMQ Web STOMP Examples</a> > Echo Server</h1>

    <div id="first" class="box">
      <h2>Received</h2>
      <div></div>
      <form><input autocomplete="off" value="Type here..."></input></form>
    </div>

    <div id="second" class="box">
      <h2>Logs</h2>
      <div></div>
    </div>

    <script>
        var has_had_focus = false;
        var pipe = function(el_name, send) {
            var div  = $(el_name + ' div');
            var inp  = $(el_name + ' input');
            var form = $(el_name + ' form');

            var print = function(m, p) {
                p = (p === undefined) ? '' : JSON.stringify(p);
                div.append($("<code>").text(m + ' ' + p));
                div.scrollTop(div.scrollTop() + 10000);
            };

            if (send) {
                form.submit(function() {
                    send(inp.val());
                    inp.val('');
                    return false;
                });
            }
            return print;
        };
        
        var print_first = pipe('#first', function(data) {
//            client.send('/topic/test', {"content-type":"text/plain"}, data);
        });

// Stomp.js boilerplate
if (location.search == '?ws') {
    var ws = new WebSocket('ws://127.0.0.1:15674/ws');
} else {
    var ws = new SockJS('http://127.0.0.1:15674/stomp');
}

// Init Client
var client = Stomp.over(ws);

// SockJS does not support heart-beat: disable heart-beats
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
client.debug = pipe('#second');

// Declare on_connect
var on_connect = function(x) {
    client.subscribe("/exchange/rabbitmq/rabbitmq_routingkey", function(d) {
        print_first(d.body);
    });
};

// Declare on_error
var on_error =  function() {
  console.log('error');
};

// Conect to RabbitMQ
client.connect('guest', 'guest', on_connect, on_error, '/');

      $('#first input').focus(function() {
          if (!has_had_focus) {
              has_had_focus = true;
              $(this).val("");
          }
      });
    </script>
</body>
</html>

測試stomp外掛是否好用

Paste_Image.png

重要JavaScript程式碼

// Stomp.js boilerplate
if (location.search == '?ws') {
 var ws = new WebSocket('ws://192.168.1.102:15674/ws');
} else {
 var ws = new SockJS('http://192.168.1.102:15674/stomp');
}
// Init Client
var client = Stomp.over(ws);
// SockJS does not support heart-beat: disable heart-beats
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
// Declare on_connect
var on_connect = function(x) {
 client.subscribe("/exchange/rabbitmq/rabbitmq_routingkey", function(d) {
 print_first(d.body);
 });
};
// Declare on_error
var on_error = function() {
 console.log('error');
};
// Conect to RabbitMQ
client.connect('guest', 'guest', on_connect, on_error, '/');

RabbitMQ Web STOMP 外掛可以理解為 HTML5 WebSocket 與 STOMP 協議間的橋接,目的也是為了讓瀏覽器能夠使用 RabbitMQ。當 RabbitMQ 訊息伺服器開啟了 STOMP 和 Web STOMP 外掛後,瀏覽器端就可以輕鬆地使用 WebSocket 或者 SockerJS 客戶端實現與 RabbitMQ 伺服器進行通訊。

RabbitMQ Web STOMP 是對 STOMP 協議的橋接,因此其語法也完全遵循 STOMP 協議。STOMP 是基於 frame 的協議,與 HTTP 的 frame 相似。一個 Frame 包含一個 command,一系列可選的 headers 和一個 body。STOMP client 的使用者代理可以充當兩個角色,當然也可能同時充當:作為生產者,通過 SEND frame 傳送訊息到伺服器;作為消費者,傳送 SUBCRIBE frame 到目的地並且通過 MESSAGE frame 從伺服器獲取訊息。

在 Web 頁面中利用 WebSocket 使用 STOMP 協議只需要下載 stomp.js 即可,考慮到老版本的瀏覽器不支援 WebSocket,SockJS 則提供了 WebSocket 的模擬支援。Web 頁面中使用 STOMP 協議詳見下列程式碼清單如下

// 初始化 ws 物件
if (location.search == '?ws') {
 var ws = new WebSocket('ws://192.168.1.102:15674/ws');
} else {
 var ws = new SockJS('http://192.168.1.102:15674/stomp');
}
// 建立連線
var client = Stomp.over(ws);
// SockJS does not support heart-beat: disable heart-beats
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
// 定義連線成功回撥函式
var on_connect = function(x) {
console.log('connect successfully');
 // 傳送訊息
client.send(destination,head,body);
 // 傳送訊息
client.subcribe(destination,callback);
 // 預設主動 ACK,手動 ACK
client.subcribe(destination,function(message){
 Message.ack();
},{ack:'client'});
// 事務支援
var tx = client.begin();
client.send(destination,head,body);
tx.commit();
};
// 定義連線失敗回撥函式
var on_error = function(error) {
 console.log(error.headers.message);
};
// 連線訊息伺服器
client.connect(login, password, on_connect, on_error, '/');

destination 在 RabbitMQ Web STOM 中進行了相關的定義,根據使用場景的不同,主要有以下 4 種:

  • 1./exchange/<exchangeName>
    對於 SUBCRIBE frame,destination 一般為/exchange/<exchangeName>/[/pattern] 的形式。該 destination 會建立一個唯一的、自動刪除的、名為<exchangeName>的 queue,並根據 pattern 將該 queue 繫結到所給的 exchange,實現對該佇列的訊息訂閱。
    對於 SEND frame,destination 一般為/exchange/<exchangeName>/[/routingKey] 的形式。這種情況下訊息就會被髮送到定義的 exchange 中,並且指定了 routingKey。

  • 2./queue/<queueName>
    對於 SUBCRIBE frame,destination 會定義<queueName>的共享 queue,並且實現對該佇列的訊息訂閱。
    對於 SEND frame,destination 只會在第一次傳送訊息的時候會定義<queueName>的共享 queue。該訊息會被髮送到預設的 exchange 中,routingKey 即為<queueName>。

  • 3./amq/queue/<queueName>
    這種情況下無論是 SUBCRIBE frame 還是 SEND frame 都不會產生 queue。但如果該 queue 不存在,SUBCRIBE frame 會報錯。
    對於 SUBCRIBE frame,destination 會實現對佇列<queueName>的訊息訂閱。
    對於 SEND frame,訊息會通過預設的 exhcange 直接被髮送到佇列<queueName>中。

  • 4./topic/<topicName>
    對於 SUBCRIBE frame,destination 創建出自動刪除的、非持久的 queue 並根據 routingkey 為<topicName>繫結到 amq.topic exchange 上,同時實現對該 queue 的訂閱。
    對於 SEND frame,訊息會被髮送到 amq.topic exchange 中,routingKey 為<topicName>。
    執行java類

Paste_Image.png

瀏覽器訪問效果

Paste_Image.png

WebSocket 作為 HTML5 提供的新一代客戶端-伺服器非同步通訊方法,能夠輕鬆完成前端與後臺的雙向通訊。RabbitMQ 服務提供了一個 STOMP 外掛,能夠實現與 WebSocket 的橋接,這樣既能夠實現訊息的主動推送,同時也能夠實現訊息的非同步處理。在傳統的 Web 開發中存在許多狀態變更實時性的需求,比如資源被佔用後需要廣播它的實時狀態,利用本文提出的解決方案,可以方便將其推送到所有監聽的客戶端。因此在新 J2EE 開發專案中,建議使用本文提出的方案替代原來 ajax 輪詢方法重新整理狀態。

實現伺服器端推送的幾種方式

Web 應用都是基於 HTTP 協議的請求/響應模式,無法像 TCP 協議那樣保持長連線,因此 Web 應用就很難像手機那樣實現實時的訊息推送。就目前來看,Web 應用的訊息推送方式主要有以下幾種

  • Ajax 短輪詢
    Ajax 輪詢主要通過頁面端的 JS 定時非同步重新整理任務來實現資料的載入,但這種方式實時效果較差,而且對服務端的壓力也較大。

  • 長輪詢
    長輪詢主要也是通過 Ajax 機制,但區別於傳統的 Ajax 應用,長輪詢的伺服器端會在沒有資料時阻塞請求直到有新的資料產生或者請求超時才返回,之後客戶端再重新建立連接獲取資料,具體實現方式見圖 1 所示。但長輪詢服務端會長時間地佔用資源,如果訊息頻繁傳送的話會給服務端帶來較大的壓力。

  • WebSocket 雙向通訊
    WebSocket 是 HTML5 中一種新的通訊協議,能夠實現瀏覽器與伺服器之間全雙工通訊。如果瀏覽器和服務端都支援 WebSocket 協議的話,該方式實現的訊息推送無疑是最高效、簡潔的。並且最新版本的 IE、Firefox、Chrome 等瀏覽器都已經支援 WebSocket 協議,Apache Tomcat 7.0.27 以後的版本也開始支援 WebSocket。