1. 程式人生 > 實用技巧 >RabbitMQ筆記-basicConsume、basicCancel、basicQos、basicPublish等方法詳解

RabbitMQ筆記-basicConsume、basicCancel、basicQos、basicPublish等方法詳解

basicConsume:由服務端主動PUSH訊息過來,方法接收到訊息後進行處理;而每個方法處理接收到的訊息相差不大,下面詳細介紹每個方法的引數詳情;

1.String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
   /**
         * 啟動一個消費者,並返回服務端生成的消費者標識
         * queue:佇列名
         * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器
         * deliverCallback: 當一個訊息傳送過來後的回撥介面
         * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法
         * @return 服務端生成的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
            System.out.println("呼叫"+consumerTag);
        });

2.String basicConsume(String queue, boolean autoAck, Consumer callback)
  /**
         * 啟動一個消費者,並返回服務端生成的消費者標識
         * queue:佇列名
         * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器
         * callback: 消費者物件的回撥介面
         * @return 服務端生成的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){});

使用介面com.rabbitmq.client.Consumer的實現類com.rabbitmq.client.DefaultConsumer實現自定義訊息監聽器,介面中有多個不同的方法可以根據自己系統的需要實現;

3.String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
/**
         * queue:佇列名
         * deliverCallback: 當一個訊息傳送過來後的回撥介面
         * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法
         * shutdownSignalCallback: 當channel/connection 關閉後回撥
         * @return 服務端生成的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});

4.String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
/**
         * queue:佇列名
         * deliverCallback: 當一個訊息傳送過來後的回撥介面
         * shutdownSignalCallback: 當channel/connection 關閉後回撥
         * @return 服務端生成的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, deliverCallback, (consumerTag, sig) -> {});

5.String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)
 /**
         * queue:佇列名
         * deliverCallback: 當一個訊息傳送過來後的回撥介面
         * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法
         * @return 服務端生成的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, deliverCallback, consumerTag -> {});

6.String basicConsume(String queue, Consumer callback)
  /**
         * queue:佇列名
         * callback: 消費者物件的回撥介面
         * @return 服務端生成的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, new DefaultConsumer(channel){});

7.String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
 /**
         * 啟動一個消費者,並返回服務端生成的消費者標識
         * queue:佇列名
         * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器
         * deliverCallback: 當一個訊息傳送過來後的回撥介面
         * shutdownSignalCallback: 當channel/connection 關閉後回撥
         * @return 服務端生成的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag, sig)->{
            //consumerTag服務端生成的消費者標識
            //sig(ShutdownSignalException):說明關閉的原因
        });

8.String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
 /**
         * 啟動一個消費者,並返回服務端生成的消費者標識
         * queue:佇列名
         * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器
         * deliverCallback: 當一個訊息傳送過來後的回撥介面
         * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法
         * shutdownSignalCallback: 當channel/connection 關閉後回撥
         * @return 服務端生成的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag->{
            //consumerTag:服務端小費制標識
        }, (consumerTag, sig) -> {
            //consumerTag服務端生成的消費者標識
            //sig(ShutdownSignalException):說明關閉的原因
        });

9.String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback)
  /**
         * 啟動一個消費者,並返回服務端生成的消費者標識
         * queue:佇列名
         * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器
         * arguments: 消費的一組引數
         * callback:消費者物件介面
         * @return 服務端生成的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, true, arguments, new DefaultConsumer(channel){
          //根據需要實現對應的方法
        });

10.String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback)
 /**
         * 啟動一個消費者,並返回服務端生成的消費者標識
         * queue:佇列名
         * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器
         * arguments: 消費的一組引數
         * deliverCallback: 當一個訊息傳送過來後的回撥介面
         * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法
         * @return 服務端生成的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, true, arguments, deliverCallback, consumerTag -> {});

11.String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
/**
         * 啟動一個消費者,並返回服務端生成的消費者標識
         * queue:佇列名
         * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器
         * arguments: 消費的一組引數
         * deliverCallback: 當一個訊息傳送過來後的回撥介面
         * shutdownSignalCallback: 當channel/connection 關閉後回撥
         * @return 服務端生成的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, true, arguments, deliverCallback, (consumerTag, sig) -> {});

12.String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
 /**
         * 啟動一個消費者,並返回服務端生成的消費者標識
         * queue:佇列名
         * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器
         * arguments: 消費的一組引數
         * deliverCallback: 當一個訊息傳送過來後的回撥介面
         * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法
         * shutdownSignalCallback: 當channel/connection 關閉後回撥
         * @return 服務端生成的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, true, arguments, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});

13.String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback)
 /**
         * 啟動一個消費者,並返回服務端生成的消費者標識
         * queue:佇列名
         * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器
         * consumerTag: 客戶端生成的用於建立上線文的使用者標識
         * callback:消費者物件介面
         * @return 與消費者關聯的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, true, ctag, new DefaultConsumer(channel){
            //根據需要實現具體的方法
        });

14.String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback)
  /**
         * 啟動一個消費者,並返回服務端生成的消費者標識
         * queue:佇列名
         * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器
         * consumerTag:客戶端生成的一個消費者標識
         * deliverCallback: 當一個訊息傳送過來後的回撥介面
         * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法
         * @return 與消費者關聯的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, true, ctag, deliverCallback, consumerTag -> {});

15.String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
/**
         * 啟動一個消費者,並返回服務端生成的消費者標識
         * queue:佇列名
         * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器
         * consumerTag:客戶端生成的一個消費者標識
         * deliverCallback: 當一個訊息傳送過來後的回撥介面
         * shutdownSignalCallback: 當channel/connection 關閉後回撥
         * @return 與消費者關聯的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, true, ctag, deliverCallback, (consumerTag, sig) -> {});

16.String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
  /**
         * 啟動一個消費者,並返回服務端生成的消費者標識
         * queue:佇列名
         * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器
         * consumerTag:客戶端生成的一個消費者標識
         * deliverCallback: 當一個訊息傳送過來後的回撥介面
         * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法
         * shutdownSignalCallback: 當channel/connection 關閉後回撥
         * @return 與消費者關聯的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, true, ctag, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});

17.String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback)
 /**
         * queue:佇列名
         * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器
         * consumerTag:客戶端生成的一個消費者標識
         * nolocal:如果伺服器不應將在此通道連線上釋出的訊息傳遞給此使用者,則為true;請注意RabbitMQ伺服器上不支援此標記
         * exclusive: 如果是單個消費者,則為true
         * callback:消費者物件介面
         * @return 與消費者關聯的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, new DefaultConsumer(channel){
            //根據需求實現方法
        });

18.String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback)
 /**
         * queue:佇列名
         * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器
         * consumerTag:客戶端生成的一個消費者標識
         * nolocal:如果伺服器不應將在此通道連線上釋出的訊息傳遞給此使用者,則為true;請注意RabbitMQ伺服器上不支援此標記
         * exclusive: 如果是單個消費者,則為true
         * arguments:消費的一組引數
         * deliverCallback: 當一個訊息傳送過來後的回撥介面
         * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法
         * @return 與消費者關聯的消費者標識
         */
        channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, deliverCallback, consumerTag -> {});

19.String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
 /**
         * queue:佇列名
         * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器
         * consumerTag:客戶端生成的一個消費者標識
         * nolocal:如果伺服器不應將在此通道連線上釋出的訊息傳遞給此使用者,則為true;請注意RabbitMQ伺服器上不支援此標記
         * exclusive: 如果是單個消費者,則為true
         * arguments:消費的一組引數
         * deliverCallback: 當一個訊息傳送過來後的回撥介面
         * shutdownSignalCallback: 當channel/connection 關閉後回撥
         */
        channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, deliverCallback, (consumerTag, sig) -> {});

20.String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
	 /**
         * queue:佇列名
         * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器
         * consumerTag:客戶端生成的一個消費者標識
         * nolocal:如果伺服器不應將在此通道連線上釋出的訊息傳遞給此使用者,則為true;請注意RabbitMQ伺服器上不支援此標記
         * exclusive: 如果是單個消費者,則為true
         * arguments:消費的一組引數
         * deliverCallback: 當一個訊息傳送過來後的回撥介面
         * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法
         * shutdownSignalCallback: 當channel/connection 關閉後回撥
         */
        channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});

basicGet:主動拉取佇列中的一條訊息

GetResponse basicGet(String queue, boolean autoAck)

        /**
         * 從訊息佇列中取出第一條訊息;整個方法的執行過程是首先消費佇列,然後檢索第一條訊息,然後再取消訂閱
         */
        GetResponse response = channel.basicGet(QUEUE_NAME, true);
        System.out.println("消費者接收到的訊息是:"+new String(response.getBody(), "UTF-8"));

取消消費者訂閱

/**
* 取消消費者對佇列的訂閱關係
* consumerTag:伺服器端生成的消費者標識
**/
void basicCancel(String consumerTag)

basicQoc設定服務端每次傳送給消費者的訊息數量

/**
* prefetchSize:伺服器傳送最大內容量(以八位位元組計算),如果沒有限制,則為0
* prefetchCount:伺服器每次傳遞的最大訊息數,如果沒有限制,則為0;
* global:如果為true,則當前設定將會應用於整個Channel(頻道)
**/
void basicQos(int prefetchSize, int prefetchCount, boolean global)

/**
* prefetchCount:伺服器每次傳遞的最大訊息數,如果沒有限制,則為0;
* global:如果為true,則當前設定將會應用於整個Channel(頻道)
**/
void basicQos(int prefetchCount, boolean global)

/**
* prefetchCount:伺服器每次傳遞的最大訊息數,如果沒有限制,則為0;
**/
void basicQos(int prefetchCount)

Ack(確認)收到一個或者多個訊息

/**
* 消費者確認收到一個或者多個訊息
* deliveryTag:伺服器端向消費者推送訊息,訊息會攜帶一個deliveryTag引數,也可以成此引數為訊息 * 的唯一標識,是一個遞增的正整數
* multiple:true表示確認所有訊息,包括訊息唯一標識小於等於deliveryTag的訊息,false只確認 * * deliveryTag指定的訊息
**/
void basicAck(long deliveryTag, boolean multiple)

/**
* 要求代理重新發送未確認的訊息
* requeue:如果為true,訊息將會重新入隊,可能會被髮送給其它的消費者;如果為false,訊息將會發送給* 相同的消費者
**/
Basic.RecoverOk basicRecover(boolean requeue)

/**
* 要求代理重新發送未確認的訊息;訊息將會重新排隊,並且可能會發送給其它的消費者
**/
Basic.RecoverOk basicRecover()

拒絕訊息

/**
* 拒絕接收到的一個或者多個訊息
* deliveryTag:接收到訊息的唯一標識
* multiple: true表示拒絕所有的訊息,包括提供的deliveryTag;false表示僅拒絕提供的deliveryTag
* requeue:true 表示拒絕的訊息應重新入隊,而不是否丟棄
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue)

/**
* 拒絕接收到的一個或者多個訊息
* deliveryTag:接收到訊息的唯一標識
* requeue:true 表示拒絕的訊息應重新入隊,而不是否丟棄
*/
void basicReject(long deliveryTag, boolean requeue) 

傳送訊息

/**
* exchange:要將訊息傳送到的Exchange(交換器)
* routingKey:路由Key
* mandatory:true 如果mandatory標記被設定
* immediate: true 如果immediate標記被設定,注意:RabbitMQ服務端不支援此標記
* props:其它的一些屬性,如:{@link MessageProperties.PERSISTENT_TEXT_PLAIN}
* body:訊息內容
**/
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)

/**
* 釋出訊息
* 釋出到不存在的交換器將導致通道級協議異常,該協議關閉通道,
* exchange: 要將訊息傳送到的交換器
* routingKey: 路由KEY
* mandatory:true 如果mandatory標記被設定
* props: 訊息的其它屬性,如:路由頭等
* body: 訊息體
*/
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)

/**
* 釋出訊息
* 釋出到不存在的交換器將導致通道級協議異常,該協議關閉通道,
* exchange: 要將訊息傳送到的交換器
* routingKey: 路由KEY
* props: 訊息的其它屬性,如:路由頭等
* body: 訊息體
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)