瞭解一下RabbitMQ
RabbitMQ概述
RabbitMQ是遵從AMQP協議的 通訊協議都設計到報文互動,換句話說RabbitMQ就是AMQP協議的Erlang的實現。
AMQP說到底還是一個通訊協議從low-level層面舉例來說,AMQP本身是應用層的協議,其填充於TCP協議的資料部分。
從high-level層面來說,AMQP是通過協議命令進行互動的。命令類似HTTP中的方法(GET PUT POST DELETE等)。
通道(Channel)在AMQP是一個很重要的概念,大多數操作都是在通道這個層面展開的
我們完全可以用Connection就能完成通道的工作,為什麼還要引入通道?
試想:一個程式中有很多個執行緒需要從RabbitMQ中消費訊息,或者生產訊息,那麼必然需要建立很多個Connection,也就是多個TCP連線。
釋出訂閱模式
廣播模式 topic
所謂廣播指的是一條訊息將被所有的消費者進行處理。
直連模式 director
直連模式的特點主要就是routingkey的使用,如果現在該訊息就要求指定一個具備有指定Routingkey的操作者進行處理,那麼只需要兩個的Routingkey匹配即可。
可以將Routingkey比喻一個唯一標記,這樣就可以將訊息準確的推送到消費者手中了。
主題模式 fanout
主題模式類似於廣播模式與直連模式的整合操作,所有的消費者都可以接收到主題資訊,但是如果要想進行正確的處理,則一定需要有一個合適的Routingkey完成操作。
交換器相當於投遞包裹的郵箱,Routingkey相當於包裹的地址,BindingKey相當於包裹的目的地。
當填寫在包裹上的地址和要投遞的地址相匹配時,那麼這個包裹就會正確投遞到目的地,最後這個目的地的主人(佇列)可以保留這個包裹。
如果填寫地址出錯,郵遞員不能正確的投遞到目的地,包裹可能被退回給寄件人,也有可能被丟棄。
RabbitMQ官方文件和API都把Routingkey和BingdingKey都看做Routingkey下面程式碼中紅色部分 就都當Routingkey使用
訊息生產者
public class MessageProducer { privatestatic final String EXCHANGE_NAME ="com.sunkun.topic";//訊息佇列名稱 private static final String HOST="192.168.1.105"; private static final int PORT=5672; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory();//建立一個連線工廠 factory.setHost(HOST); factory.setPort(PORT); factory.setUsername("sunkun"); factory.setPassword("123456"); //factory.setVirtualHost(virtualHost) 使用虛擬主機的最大好處 可以區分不同使用者的操作空間 每一個虛擬主機有一個自己的空間管理 Connection conn = factory.newConnection();//定義一個新的RabbitMQ的連線 Channel channel = conn.createChannel();//建立一個通訊的通道 //定義該通道要使用的佇列名稱 此時佇列已經建立過了 //第一個引數 佇列名稱(這個佇列可能存在也可能不存在) //第二個引數 是否持久儲存 //第三個引數 此佇列是否為專用的佇列資訊 //第四個引數 是否允許自動刪除 //channel.queueDeclare(QUENE_NAME, true, false, true,null); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); long start = System.currentTimeMillis(); System.out.println("訊息開始"+start); for(int i=0;i<1000;i++){ String message = "sk - "+i; if(i%2==0){ //MessageProperties.PERSISTENT_TEXT_PLAIN 訊息持久化 channel.basicPublish(EXCHANGE_NAME, "sk1", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//進行訊息傳送 }else{ channel.basicPublish(EXCHANGE_NAME, "sk2", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//進行訊息傳送 } } long end = System.currentTimeMillis(); System.out.println("訊息花費時間"+(end-start)); channel.close(); } }
訊息消費者
public class MessageConsumer { private static final String EXCHANGE_NAME ="com.sunkun.topic";//訊息佇列名稱 private static final String HOST="192.168.1.105"; private static final int PORT=15672; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory();//建立一個連線工廠 factory.setHost(HOST); factory.setPort(PORT); factory.setUsername("sunkun"); factory.setPassword("123456"); Connection conn = factory.newConnection();//定義一個新的RabbitMQ的連線 Channel channel = conn.createChannel();//建立一個通訊的通道 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue();//通過通道獲取一個佇列名稱 channel.queueBind(queueName, EXCHANGE_NAME, "sk2");//進行繫結處理 //在RabbitMQ裡面,所有的消費者資訊是通過一個回撥方法完成的 Consumer consumer = new DefaultConsumer(channel){//需要複寫指定的方法實現訊息處理 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消費者sk2:"+message);//可以啟動多個消費者 super.handleDelivery(consumerTag, envelope, properties, body); } }; channel.basicConsume(queueName,consumer); } }
RabbitMQ如何保證訊息的可靠性
1)持久化
持久化可以提高RabbitMQ的可靠性,防止在異常情況(重啟,關閉,宕機)下的資料丟失。
持久化可分為三個部分:交換器的持久化,佇列的持久化和訊息的持久化。
交換器的持久化:是通過宣告交換器時將druable引數設定為true來實現的。如果交換器不設定持久化,那麼在RabbitMQ重啟之後相關的交換器元資料會丟失,不過訊息不會丟失,只是不能將訊息傳送到這個交換器中了。
對於一個長期使用的交換器來說,建議其置為持久化。(訊息不直接往佇列發,往exchange傳送 可以實現廣播模式)
佇列的持久化:是通過宣告佇列時將durable引數置為true實現的。如果佇列不設定持久化,那麼在RabbitMQ服務重啟之後,相關佇列的元資料會丟失,此時資料也會丟失。
訊息的持久化:因為佇列的持久化能保證其本身的元資料不會因為異常情況而丟失,但是不能保證內部儲存的訊息不會丟失。要確保訊息不會丟失,需求將其設定為持久化。
通過將訊息的投遞模式(MessageProperties.PERSISTENT_TEXT_PLAIN)即可實現訊息的持久化
2)叢集
本文主要講映象佇列
在持久化的訊息正確存入到RabbitMQ之後 還需要一段時間(雖然時間很短,但不可忽視)才能存入磁碟中,如果這段時間發生了宕機,訊息儲存還沒來得及落盤,那麼這些訊息將會丟失。
這裡可以引入RabbitMQ的映象佇列機制,相當於配置了副本,如果主節點(master)在此特殊時間內掛掉,可以自動切換到從節點(Slave),在實際生產環境中的關鍵業務佇列都會設定映象佇列。
提醒:所謂的映象佇列只是進行資料的副本而已,在所謂的RabbitMQ叢集裡面並不支援HA機制以及所謂的負載均衡,如果說現在一臺主機掛掉了,那麼其他主機肯定無法進行合理讀取的。
如果想要安全的使用RabbitMQ就要繼續追加負載均衡元件,列如HAProxy LVS等等,如果要保證負載均衡元件的高可用,還應該繼續追加KeepAlive元件(就像tomcat實現負載均衡 需要nginx一樣)。
3)生產者確認
除上面兩個問題外 我們還遇到一個新問題:當訊息的生產者將訊息傳送出去之後,訊息到底有沒有正確的到達伺服器呢?
如果訊息到達伺服器之前就丟失,那麼持久化也解決不了問題,因為訊息就沒有到達伺服器,何談持久化呢。
通常會有兩種方法解決此問題一時事物機制,只有訊息被成功接收,事物才能提交成功,否則便可在捕獲異常之後進行事物回滾,於此同時可以進行訊息重發。
但使用事物機制會大大降低RabbitMQ的效能,我們一般採取傳送方確認機制。
傳送方確認機制:生產者將通道設定成confirm模式,一旦通道進入confirm模式,所有在該通道上面釋出的訊息都會被指派一個唯一的ID(從1開始),
一旦訊息被投遞到所有的匹配佇列之後,RabbitMQ就會發送一個確認給生產者(包含訊息的唯一ID),這就使得生產者知曉訊息已經到達目的地了。
如果訊息和佇列是持久化的,那麼訊息確認會在訊息寫入磁碟後發出。