RabbitMQ知識點整理11-消費訊息
Rabb itMQ 的消費模式分兩種: 推( Push )模式和拉( Pull )模式。推模式採用Basic.Consume進行消費,而拉模式則是呼叫Basic.Get 進行消費。
推模式
在推模式中,可以通過持續訂閱的方式來消費訊息,使用到的相關類有:
import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;
接收訊息一般通過實現Consumer介面或者繼承DefaultConsumer 類來實現。當呼叫與Consumer相關API的時候, 不同的訂閱採用不同的消費標籤(consumerTag)來區分彼此, 在同一個channel中的消費者也需要通過唯一的消費者標籤以作區分, 關鍵消費程式碼如下:
public class ConsumerTest { final private static String QUEUE_NAME = "queue_demo"; final private static String IP_ADDRESS = "172.16.176.40"; final private static int PORT = 15672; /** * */ @Test public void consumerTest() throws IOException, TimeoutException {View Code// final ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(IP_ADDRESS); connectionFactory.setPort(PORT); connectionFactory.setUsername("root"); connectionFactory.setPassword("root123"); // final Connection connection = connectionFactory.newConnection();final Channel channel = connection.createChannel(); channel.basicQos(64); boolean autoAck = false; Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope , AMQP.BasicProperties properties, byte[] body) throws IOException { // 得到路由鍵 final String routingKey = envelope.getRoutingKey(); final String contentType = properties.getContentType(); final long deliveryTag = envelope.getDeliveryTag(); // 接受的內容 String content = new String(body); // 這裡處理訊息 channel.basicAck(deliveryTag, false); } }; channel.basicConsume(QUEUE_NAME, autoAck, "myConsumerTag", consumer); } }
注意, 上面的程式碼中顯示的設定channel.basicConsume的引數autoAck為false, 然後在接收到訊息處理之後, 在進行顯式的ack操作(channel.basicAck), 對於消費者來說這是非常有必要的, 可以防止訊息不必要的丟失.
channel類中的basicConsume方法有如下幾種形式:
1.String basicConsume(String queue, Consumer callback) throws IOException;
2.String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException; 3.String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
4.String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException; 5.String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
其對應的引數說明如下所述:
queue: 佇列的名稱
autoAck: 設定是否自動確認, 建議設定成false, 即不自動確認
consumerTag: 消費者標籤, 用來區分多個消費者
noLocal: 設定為true, 則表示不能將同一個Connection中生產者傳送的訊息傳遞給這個Connection中的消費者
exclusive: 是否排他
arguments: 設定消費者的其他引數
callback: 設定消費者的回撥函式, 用來處理rabbitmq推送過來的訊息, 比如DefaultConsumer, 使用時需要客戶端重寫其中的方法
對於消費者來說, 重寫handleDelivery方法是十分方便的, 更復雜的消費者客戶端會重寫更多的方法, 如下:
void handleConsumeOk(String consumerTag) ; void handleCancelOk(String consumerTag); void handleCancel(String consumerTag) throws IOException; void handleShutdownSignal(String consumerTag , ShutdownSignalException sig) ; void handleRecoverOk(String consumerTag);
比如handleShutdownSignal方法, 在channel或connection關閉的時候會呼叫, 再者, handleConsumeOk方法會在其他方法之前呼叫, 返回消費者標籤.
重寫handleCancelOk和handleCancel方法, 這樣消費端可以顯式的或隱式的取消訂閱的時候呼叫, 也可以通過channel.basicCancel方法來顯式的取消一個消費者的訊息訂閱:
channel.basicCancel(consumerTag);
注意上面這行程式碼會先觸發handleConsumerOk方法, 之後觸發handleDelivery方法, 最後才觸發handleCancelOk方法
和生產者一樣,消費者客戶端同樣需要考慮執行緒安全的問題, 消費者客戶端的這些callback會被分配到與Channel不同的執行緒池上, 這意味著消費者客戶端可以安全地呼叫這些阻塞方
法,比如channel.queueDeclare 、channel.basicCancel 等
每個Channel 都擁有自己獨立的執行緒, 最常用的做法是一個Channel 對應一個消費者,也就是意味著消費者彼此之間沒有任何關聯, 當然也可以在一個Channel 中維持多個消費者,但是要注意一個問題,如果Channel 中的一個消費者一直在執行,那麼其他消費者的callback會被"耽擱"。
拉模式
通過channel.basicGet 方法可以單條地獲取訊息,其返回值是GetRespone, Channel 類的basicGet 方法沒有其他過載方法, 只有:
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
其中queue 代表隊列的名稱,如果設定autoAck 為false , 那麼像推模式那樣需要呼叫channel.basicAck 來確認訊息己被成功接收。
拉模式的關鍵程式碼如下:
** * 拉模式 */ @Test public void consumerTest2() throws IOException { // final ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(IP_ADDRESS); connectionFactory.setPort(PORT); connectionFactory.setUsername("root"); connectionFactory.setPassword("root123"); // final Connection connection = connectionFactory.newConnection(); final Channel channel = connection.createChannel(); final GetResponse response = channel.basicGet(QUEUE_NAME, false); // 獲取訊息 String content = new String(response.getBody()); // 處理訊息確認 channel.basicAck(response.getEnvelope().getDeliveryTag(), false); }View Code
注意要點:
Basic.Consume將通道(Channel) 置為接收模式,直到取消佇列的訂閱為止。在接收模式期間, RabbitMQ 會不斷地推送訊息給消費者,當然推送訊息的個數還是會受到Basic.Qos的限制.如果只想從佇列獲得單條訊息而不是持續訂閱,建議還是使用Basic.Get 進行消費.但是不能將Basic.Get 放在一個迴圈裡來代替Basic.Consume,這樣做會嚴重影RabbitMQ的效能.如果要實現高吞吐量,消費者理應使用Basic.Consume 方法。