1. 程式人生 > 實用技巧 >RabbitMQ知識點整理11-消費訊息

RabbitMQ知識點整理11-消費訊息

Rabb itMQ 的消費模式分兩種: 推( Push )模式和拉( Pull )模式。推模式採用Basic.Consume進行消費,而拉模式則是呼叫Basic.Get 進行消費。

推模式

在推模式中,可以通過持續訂閱的方式來消費訊息,使用到的相關類有:

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

接收訊息一般通過實現Consumer介面或者繼承DefaultConsumer 類來實現。當呼叫與Consumer相關API的時候, 不同的訂閱採用不同的消費標籤(consumerTag)來區分彼此, 在同一個channel中的消費者也需要通過唯一的消費者標籤以作區分, 關鍵消費程式碼如下:

public class ConsumerTest {

    final private static String QUEUE_NAME = "queue_demo";
    final private static String IP_ADDRESS = "172.16.176.40";
    final private static int PORT = 15672;

    /**
     *
     */
    @Test
    public void consumerTest() throws IOException, TimeoutException {
        
// final ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(IP_ADDRESS); connectionFactory.setPort(PORT); connectionFactory.setUsername("root"); connectionFactory.setPassword("root123"); // final Connection connection = connectionFactory.newConnection();
final Channel channel = connection.createChannel(); channel.basicQos(64); boolean autoAck = false; Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope , AMQP.BasicProperties properties, byte[] body) throws IOException { // 得到路由鍵 final String routingKey = envelope.getRoutingKey(); final String contentType = properties.getContentType(); final long deliveryTag = envelope.getDeliveryTag(); // 接受的內容 String content = new String(body); // 這裡處理訊息 channel.basicAck(deliveryTag, false); } }; channel.basicConsume(QUEUE_NAME, autoAck, "myConsumerTag", consumer); } }
View Code

注意, 上面的程式碼中顯示的設定channel.basicConsume的引數autoAck為false, 然後在接收到訊息處理之後, 在進行顯式的ack操作(channel.basicAck), 對於消費者來說這是非常有必要的, 可以防止訊息不必要的丟失.

channel類中的basicConsume方法有如下幾種形式:

1.String basicConsume(String queue, Consumer callback) throws IOException;
2.String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException; 3.String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
4.String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException; 5.
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;

其對應的引數說明如下所述:

queue: 佇列的名稱

autoAck: 設定是否自動確認, 建議設定成false, 即不自動確認

consumerTag: 消費者標籤, 用來區分多個消費者

noLocal: 設定為true, 則表示不能將同一個Connection中生產者傳送的訊息傳遞給這個Connection中的消費者

exclusive: 是否排他

arguments: 設定消費者的其他引數

callback: 設定消費者的回撥函式, 用來處理rabbitmq推送過來的訊息, 比如DefaultConsumer, 使用時需要客戶端重寫其中的方法

對於消費者來說, 重寫handleDelivery方法是十分方便的, 更復雜的消費者客戶端會重寫更多的方法, 如下:

void handleConsumeOk(String consumerTag) ;
void handleCancelOk(String consumerTag);
void handleCancel(String consumerTag) throws IOException;
void handleShutdownSignal(String consumerTag , ShutdownSignalException sig) ;
void handleRecoverOk(String consumerTag);

比如handleShutdownSignal方法, 在channel或connection關閉的時候會呼叫, 再者, handleConsumeOk方法會在其他方法之前呼叫, 返回消費者標籤.

重寫handleCancelOk和handleCancel方法, 這樣消費端可以顯式的或隱式的取消訂閱的時候呼叫, 也可以通過channel.basicCancel方法來顯式的取消一個消費者的訊息訂閱:

channel.basicCancel(consumerTag);

注意上面這行程式碼會先觸發handleConsumerOk方法, 之後觸發handleDelivery方法, 最後才觸發handleCancelOk方法

和生產者一樣,消費者客戶端同樣需要考慮執行緒安全的問題, 消費者客戶端的這些callback會被分配到與Channel不同的執行緒池上, 這意味著消費者客戶端可以安全地呼叫這些阻塞方
法,比如channel.queueDeclare 、channel.basicCancel 等

每個Channel 都擁有自己獨立的執行緒, 最常用的做法是一個Channel 對應一個消費者,也就是意味著消費者彼此之間沒有任何關聯, 當然也可以在一個Channel 中維持多個消費者,但是要注意一個問題,如果Channel 中的一個消費者一直在執行,那麼其他消費者的callback會被"耽擱"。

拉模式

通過channel.basicGet 方法可以單條地獲取訊息,其返回值是GetRespone, Channel 類的basicGet 方法沒有其他過載方法, 只有:

GetResponse basicGet(String queue, boolean autoAck) throws IOException;

其中queue 代表隊列的名稱,如果設定autoAck 為false , 那麼像推模式那樣需要呼叫channel.basicAck 來確認訊息己被成功接收。

拉模式的關鍵程式碼如下:

**
     * 拉模式
     */
    @Test
    public void consumerTest2() throws IOException {
        //
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(IP_ADDRESS);
        connectionFactory.setPort(PORT);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root123");

        //
        final Connection connection = connectionFactory.newConnection();
        final Channel channel = connection.createChannel();

        final GetResponse response = channel.basicGet(QUEUE_NAME, false);

        // 獲取訊息
        String content = new String(response.getBody());

        // 處理訊息確認
        channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
    }
View Code

注意要點:

Basic.Consume將通道(Channel) 置為接收模式,直到取消佇列的訂閱為止。在接收模式期間, RabbitMQ 會不斷地推送訊息給消費者,當然推送訊息的個數還是會受到Basic.Qos的限制.如果只想從佇列獲得單條訊息而不是持續訂閱,建議還是使用Basic.Get 進行消費.但是不能將Basic.Get 放在一個迴圈裡來代替Basic.Consume,這樣做會嚴重影RabbitMQ的效能.如果要實現高吞吐量,消費者理應使用Basic.Consume 方法。