RabbitMQ簡單佇列例項及原理解析
這篇文章主要介紹了RabbitMQ簡單佇列例項及原理解析,文中通過示例程式碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
RabbitMQ 簡述
RabbitMQ是一個訊息代理:它接受並轉發訊息。 您可以將其視為郵局:當您將要把寄發的郵件投遞到郵箱中時,您可以確信Postman 先生最終會將郵件傳送給收件人。 在這個比喻中,RabbitMQ是一個郵箱,郵局和郵遞員,用來接受,儲存和轉發二進位制資料塊的訊息。
佇列就像是在RabbitMQ中扮演郵箱的角色。 雖然訊息經過RabbitMQ和應用程式,但它們只能儲存在佇列中。 佇列只受主機的記憶體和磁碟限制的限制,它本質上是一個大的訊息緩衝區。 許多生產者可以傳送到一個佇列的訊息,許多消費者可以嘗試從一個佇列接收資料。
producer即為生產者,用來產生訊息傳送給佇列。consumer是消費者,需要去讀佇列內的訊息。producer,consumer和broker(rabbitMQ server)不必駐留在同一個主機上;確實在大多數應用程式中它們是這樣分佈的。
簡單佇列
簡單佇列是最簡單的一種模式,由生產者、佇列、消費者組成。生產者將訊息傳送給佇列,消費者從佇列中讀取訊息完成消費。
在下圖中,“P”是我們的生產者,“C”是我們的消費者。 中間的框是佇列 - RabbitMQ代表消費者的訊息緩衝區。
java 方式
生產者
package com.anqi.mq.nat; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class MyProducer { private static final String QUEUE_NAME = "ITEM_QUEUE"; public static void main(String[] args) throws Exception { //1. 建立一個 ConnectionFactory 並進行設定 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); //2. 通過連線工廠來建立連線 Connection connection = factory.newConnection(); //3. 通過 Connection 來建立 Channel Channel channel = connection.createChannel(); //實際場景中,訊息多為json格式的物件 String msg = "hello"; //4. 傳送三條資料 for (int i = 1; i <= 3 ; i++) { channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println("Send message" + i +" : " + msg); } //5. 關閉連線 channel.close(); connection.close(); } }
/** * Declare a queue * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments) throws IOException; /** * Publish a message * @see com.rabbitmq.client.AMQP.Basic.Publish * @param exchange the exchange to publish the message to * @param routingKey the routing key * @param props other properties for the message - routing headers etc * @param body the message body * @throws java.io.IOException if an error is encountered */ void basicPublish(String exchange,String routingKey,BasicProperties props,byte[] body) throws IOException; /** * Start a non-nolocal,non-exclusive consumer,with * a server-generated consumerTag. * @param queue the name of the queue * @param autoAck true if the server should consider messages * acknowledged once delivered; false if the server should expect * explicit acknowledgements * @param callback an interface to the consumer object * @return the consumerTag generated by the server * @throws java.io.IOException if an error is encountered * @see com.rabbitmq.client.AMQP.Basic.Consume * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk * @see #basicConsume(String,boolean,String,Map,Consumer) */ String basicConsume(String queue,boolean autoAck,Consumer callback) throws IOException;
消費者
package com.anqi.mq.nat; import com.rabbitmq.client.*; import java.io.IOException; public class MyConsumer { private static final String QUEUE_NAME = "ITEM_QUEUE"; public static void main(String[] args) throws Exception { //1. 建立一個 ConnectionFactory 並進行設定 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); //2. 通過連線工廠來建立連線 Connection connection = factory.newConnection(); //3. 通過 Connection 來建立 Channel Channel channel = connection.createChannel(); //4. 宣告一個佇列 channel.queueDeclare(QUEUE_NAME,true,false,null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); /* true:表示自動確認,只要訊息從佇列中獲取,無論消費者獲取到訊息後是否成功消費,都會認為訊息已經成功消費 false:表示手動確認,消費者獲取訊息後,伺服器會將該訊息標記為不可用狀態,等待消費者的反饋,如果消費者一 直沒有反饋,那麼該訊息將一直處於不可用狀態,並且伺服器會認為該消費者已經掛掉,不會再給其傳送訊息, 直到該消費者反饋。 */ //5. 建立消費者並接收訊息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; //6. 設定 Channel 消費者繫結佇列 channel.basicConsume(QUEUE_NAME,consumer); } }
Send message1 : hello Send message2 : hello Send message3 : hello [*] Waiting for messages. To exit press CTRL+C [x] Received 'hello' [x] Received 'hello' [x] Received 'hello'
當我們啟動生產者之後檢視RabbitMQ管理後臺可以看到有一條訊息正在等待被消費。
當我們啟動消費者之後再次檢視,可以看到積壓的一條訊息已經被消費。
總結
- 佇列宣告queueDeclare的引數:第一個引數表示佇列名稱、第二個引數為是否持久化(true表示是,佇列將在伺服器重啟時生存)、第三個引數為是否是獨佔佇列(建立者可以使用的私有佇列,斷開後自動刪除)、第四個引數為當所有消費者客戶端連線斷開時是否自動刪除佇列、第五個引數為佇列的其他引數。
- basicConsume的第二個引數autoAck: 應答模式,true:自動應答,即消費者獲取到訊息,該訊息就會從佇列中刪除掉,false:手動應答,當從佇列中取出訊息後,需要程式設計師手動呼叫方法應答,如果沒有應答,該訊息還會再放進佇列中,就會出現該訊息一直沒有被消費掉的現象。
- 這種簡單佇列的模式,系統會為每個佇列隱式地繫結一個預設交換機,交換機名稱為" (AMQP default)",型別為直連 direct,當你手動建立一個佇列時,系統會自動將這個佇列繫結到一個名稱為空的 Direct 型別的交換機上,繫結的路由鍵 routing key 與佇列名稱相同,相當於channel.queueBind(queue:"QUEUE_NAME",exchange:"(AMQP default)“,routingKey:"QUEUE_NAME");雖然例項沒有顯式宣告交換機,但是當路由鍵和佇列名稱一樣時,就會將訊息傳送到這個預設的交換機中。這種方式比較簡單,但是無法滿足複雜的業務需求,所以通常在生產環境中很少使用這種方式。
- The default exchange is implicitly bound to every queue,with a routing key equal to the queue name. It is not possible to explicitly bind to,or unbind from the default exchange. It also cannot be deleted.預設交換機隱式繫結到每個佇列,其中路由鍵等於佇列名稱。不可能顯式繫結到,或從預設交換中解除繫結。它也不能被刪除。——引自 RabbitMQ 官方文件
spring-amqp方式
引入 Maven 依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.5.RELEASE</version> </dependency>
spring 配置檔案
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit https://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd"> <rabbit:connection-factory id="connectionFactory" host="localhost" virtual-host="/" username="guest" password="guest"/> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/> <rabbit:admin connection-factory="connectionFactory"/> <rabbit:queue name="MY-QUEUE"/> </beans>
使用測試
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Main { public static void main(String[] args) { ApplicationContext app = new ClassPathXmlApplicationContext("spring/rabbit-context.xml"); AmqpTemplate amqpTemplate = app.getBean(AmqpTemplate.class); amqpTemplate.convertAndSend("MY-QUEUE","Item"); String msg = (String) amqpTemplate.receiveAndConvert("MY-QUEUE"); System.out.println(msg); } }
參考方法
/** * Convert a Java object to an Amqp {@link Message} and send it to a specific exchange * with a specific routing key. * * @param exchange the name of the exchange * @param routingKey the routing key * @param message a message to send * @throws AmqpException if there is a problem */ void convertAndSend(String exchange,Object message) throws AmqpException; /** * Receive a message if there is one from a specific queue and convert it to a Java * object. Returns immediately,possibly with a null value. * * @param queueName the name of the queue to poll * @return a message or null if there is none waiting * @throws AmqpException if there is a problem */ @Nullable Object receiveAndConvert(String queueName) throws AmqpException;
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。