1. 程式人生 > >瞭解一下RabbitMQ

瞭解一下RabbitMQ

RabbitMQ概述

RabbitMQ是遵從AMQP協議的 通訊協議都設計到報文互動,換句話說RabbitMQ就是AMQP協議的Erlang的實現。

AMQP說到底還是一個通訊協議從low-level層面舉例來說,AMQP本身是應用層的協議,其填充於TCP協議的資料部分。

從high-level層面來說,AMQP是通過協議命令進行互動的。命令類似HTTP中的方法(GET PUT POST DELETE等)。

通道(Channel)在AMQP是一個很重要的概念,大多數操作都是在通道這個層面展開的

我們完全可以用Connection就能完成通道的工作,為什麼還要引入通道?

試想:一個程式中有很多個執行緒需要從RabbitMQ中消費訊息,或者生產訊息,那麼必然需要建立很多個Connection,也就是多個TCP連線。

釋出訂閱模式

廣播模式 topic

  所謂廣播指的是一條訊息將被所有的消費者進行處理。

直連模式 director

  直連模式的特點主要就是routingkey的使用,如果現在該訊息就要求指定一個具備有指定Routingkey的操作者進行處理,那麼只需要兩個的Routingkey匹配即可。

  可以將Routingkey比喻一個唯一標記,這樣就可以將訊息準確的推送到消費者手中了。

主題模式 fanout

  主題模式類似於廣播模式與直連模式的整合操作,所有的消費者都可以接收到主題資訊,但是如果要想進行正確的處理,則一定需要有一個合適的Routingkey完成操作。

交換器相當於投遞包裹的郵箱,Routingkey相當於包裹的地址,BindingKey相當於包裹的目的地。

當填寫在包裹上的地址和要投遞的地址相匹配時,那麼這個包裹就會正確投遞到目的地,最後這個目的地的主人(佇列)可以保留這個包裹。

如果填寫地址出錯,郵遞員不能正確的投遞到目的地,包裹可能被退回給寄件人,也有可能被丟棄。

RabbitMQ官方文件和API都把Routingkey和BingdingKey都看做Routingkey下面程式碼中紅色部分 就都當Routingkey使用

訊息生產者

public class MessageProducer {
    private
static final String EXCHANGE_NAME ="com.sunkun.topic";//訊息佇列名稱 private static final String HOST="192.168.1.105"; private static final int PORT=5672; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory();//建立一個連線工廠 factory.setHost(HOST); factory.setPort(PORT); factory.setUsername("sunkun"); factory.setPassword("123456"); //factory.setVirtualHost(virtualHost) 使用虛擬主機的最大好處 可以區分不同使用者的操作空間 每一個虛擬主機有一個自己的空間管理 Connection conn = factory.newConnection();//定義一個新的RabbitMQ的連線 Channel channel = conn.createChannel();//建立一個通訊的通道 //定義該通道要使用的佇列名稱 此時佇列已經建立過了 //第一個引數 佇列名稱(這個佇列可能存在也可能不存在) //第二個引數 是否持久儲存 //第三個引數 此佇列是否為專用的佇列資訊 //第四個引數 是否允許自動刪除 //channel.queueDeclare(QUENE_NAME, true, false, true,null); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); long start = System.currentTimeMillis(); System.out.println("訊息開始"+start); for(int i=0;i<1000;i++){ String message = "sk - "+i; if(i%2==0){ //MessageProperties.PERSISTENT_TEXT_PLAIN 訊息持久化 channel.basicPublish(EXCHANGE_NAME, "sk1", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//進行訊息傳送 }else{ channel.basicPublish(EXCHANGE_NAME, "sk2", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//進行訊息傳送 } } long end = System.currentTimeMillis(); System.out.println("訊息花費時間"+(end-start)); channel.close(); } }

訊息消費者

public class MessageConsumer {
    private static final String EXCHANGE_NAME ="com.sunkun.topic";//訊息佇列名稱
    private static final String HOST="192.168.1.105";
    private static final int PORT=15672;
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();//建立一個連線工廠
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername("sunkun");
        factory.setPassword("123456");
        Connection conn = factory.newConnection();//定義一個新的RabbitMQ的連線
        Channel channel = conn.createChannel();//建立一個通訊的通道
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();//通過通道獲取一個佇列名稱
        channel.queueBind(queueName, EXCHANGE_NAME, "sk2");//進行繫結處理
        //在RabbitMQ裡面,所有的消費者資訊是通過一個回撥方法完成的
        Consumer consumer = new DefaultConsumer(channel){//需要複寫指定的方法實現訊息處理
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("消費者sk2:"+message);//可以啟動多個消費者
                super.handleDelivery(consumerTag, envelope, properties, body);
            }
        };
        channel.basicConsume(queueName,consumer);
    }
}

RabbitMQ如何保證訊息的可靠性

1)持久化

持久化可以提高RabbitMQ的可靠性,防止在異常情況(重啟,關閉,宕機)下的資料丟失。

持久化可分為三個部分:交換器的持久化,佇列的持久化和訊息的持久化。

交換器的持久化:是通過宣告交換器時將druable引數設定為true來實現的。如果交換器不設定持久化,那麼在RabbitMQ重啟之後相關的交換器元資料會丟失,不過訊息不會丟失,只是不能將訊息傳送到這個交換器中了。

對於一個長期使用的交換器來說,建議其置為持久化。(訊息不直接往佇列發,往exchange傳送 可以實現廣播模式)

佇列的持久化:是通過宣告佇列時將durable引數置為true實現的。如果佇列不設定持久化,那麼在RabbitMQ服務重啟之後,相關佇列的元資料會丟失,此時資料也會丟失。

訊息的持久化:因為佇列的持久化能保證其本身的元資料不會因為異常情況而丟失,但是不能保證內部儲存的訊息不會丟失。要確保訊息不會丟失,需求將其設定為持久化。

通過將訊息的投遞模式(MessageProperties.PERSISTENT_TEXT_PLAIN)即可實現訊息的持久化

2)叢集

本文主要講映象佇列

在持久化的訊息正確存入到RabbitMQ之後 還需要一段時間(雖然時間很短,但不可忽視)才能存入磁碟中,如果這段時間發生了宕機,訊息儲存還沒來得及落盤,那麼這些訊息將會丟失。

這裡可以引入RabbitMQ的映象佇列機制,相當於配置了副本,如果主節點(master)在此特殊時間內掛掉,可以自動切換到從節點(Slave),在實際生產環境中的關鍵業務佇列都會設定映象佇列。

提醒:所謂的映象佇列只是進行資料的副本而已,在所謂的RabbitMQ叢集裡面並不支援HA機制以及所謂的負載均衡,如果說現在一臺主機掛掉了,那麼其他主機肯定無法進行合理讀取的。

如果想要安全的使用RabbitMQ就要繼續追加負載均衡元件,列如HAProxy LVS等等,如果要保證負載均衡元件的高可用,還應該繼續追加KeepAlive元件(就像tomcat實現負載均衡 需要nginx一樣)。

3)生產者確認

除上面兩個問題外 我們還遇到一個新問題:當訊息的生產者將訊息傳送出去之後,訊息到底有沒有正確的到達伺服器呢?

如果訊息到達伺服器之前就丟失,那麼持久化也解決不了問題,因為訊息就沒有到達伺服器,何談持久化呢。

通常會有兩種方法解決此問題一時事物機制,只有訊息被成功接收,事物才能提交成功,否則便可在捕獲異常之後進行事物回滾,於此同時可以進行訊息重發。

但使用事物機制會大大降低RabbitMQ的效能,我們一般採取傳送方確認機制。

傳送方確認機制:生產者將通道設定成confirm模式,一旦通道進入confirm模式,所有在該通道上面釋出的訊息都會被指派一個唯一的ID(從1開始),

一旦訊息被投遞到所有的匹配佇列之後,RabbitMQ就會發送一個確認給生產者(包含訊息的唯一ID),這就使得生產者知曉訊息已經到達目的地了。

如果訊息和佇列是持久化的,那麼訊息確認會在訊息寫入磁碟後發出。