RabbitMQ筆記-basicConsume、basicCancel、basicQos、basicPublish等方法詳解
阿新 • • 發佈:2020-09-09
basicConsume:由服務端主動PUSH訊息過來,方法接收到訊息後進行處理;而每個方法處理接收到的訊息相差不大,下面詳細介紹每個方法的引數詳情;
1.String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) /** * 啟動一個消費者,並返回服務端生成的消費者標識 * queue:佇列名 * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器 * deliverCallback: 當一個訊息傳送過來後的回撥介面 * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法 * @return 服務端生成的消費者標識 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { System.out.println("呼叫"+consumerTag); }); 2.String basicConsume(String queue, boolean autoAck, Consumer callback) /** * 啟動一個消費者,並返回服務端生成的消費者標識 * queue:佇列名 * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器 * callback: 消費者物件的回撥介面 * @return 服務端生成的消費者標識 */ channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){}); 使用介面com.rabbitmq.client.Consumer的實現類com.rabbitmq.client.DefaultConsumer實現自定義訊息監聽器,介面中有多個不同的方法可以根據自己系統的需要實現; 3.String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) /** * queue:佇列名 * deliverCallback: 當一個訊息傳送過來後的回撥介面 * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法 * shutdownSignalCallback: 當channel/connection 關閉後回撥 * @return 服務端生成的消費者標識 */ channel.basicConsume(QUEUE_NAME, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {}); 4.String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) /** * queue:佇列名 * deliverCallback: 當一個訊息傳送過來後的回撥介面 * shutdownSignalCallback: 當channel/connection 關閉後回撥 * @return 服務端生成的消費者標識 */ channel.basicConsume(QUEUE_NAME, deliverCallback, (consumerTag, sig) -> {}); 5.String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) /** * queue:佇列名 * deliverCallback: 當一個訊息傳送過來後的回撥介面 * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法 * @return 服務端生成的消費者標識 */ channel.basicConsume(QUEUE_NAME, deliverCallback, consumerTag -> {}); 6.String basicConsume(String queue, Consumer callback) /** * queue:佇列名 * callback: 消費者物件的回撥介面 * @return 服務端生成的消費者標識 */ channel.basicConsume(QUEUE_NAME, new DefaultConsumer(channel){}); 7.String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) /** * 啟動一個消費者,並返回服務端生成的消費者標識 * queue:佇列名 * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器 * deliverCallback: 當一個訊息傳送過來後的回撥介面 * shutdownSignalCallback: 當channel/connection 關閉後回撥 * @return 服務端生成的消費者標識 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag, sig)->{ //consumerTag服務端生成的消費者標識 //sig(ShutdownSignalException):說明關閉的原因 }); 8.String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) /** * 啟動一個消費者,並返回服務端生成的消費者標識 * queue:佇列名 * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器 * deliverCallback: 當一個訊息傳送過來後的回撥介面 * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法 * shutdownSignalCallback: 當channel/connection 關閉後回撥 * @return 服務端生成的消費者標識 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag->{ //consumerTag:服務端小費制標識 }, (consumerTag, sig) -> { //consumerTag服務端生成的消費者標識 //sig(ShutdownSignalException):說明關閉的原因 }); 9.String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) /** * 啟動一個消費者,並返回服務端生成的消費者標識 * queue:佇列名 * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器 * arguments: 消費的一組引數 * callback:消費者物件介面 * @return 服務端生成的消費者標識 */ channel.basicConsume(QUEUE_NAME, true, arguments, new DefaultConsumer(channel){ //根據需要實現對應的方法 }); 10.String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) /** * 啟動一個消費者,並返回服務端生成的消費者標識 * queue:佇列名 * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器 * arguments: 消費的一組引數 * deliverCallback: 當一個訊息傳送過來後的回撥介面 * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法 * @return 服務端生成的消費者標識 */ channel.basicConsume(QUEUE_NAME, true, arguments, deliverCallback, consumerTag -> {}); 11.String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) /** * 啟動一個消費者,並返回服務端生成的消費者標識 * queue:佇列名 * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器 * arguments: 消費的一組引數 * deliverCallback: 當一個訊息傳送過來後的回撥介面 * shutdownSignalCallback: 當channel/connection 關閉後回撥 * @return 服務端生成的消費者標識 */ channel.basicConsume(QUEUE_NAME, true, arguments, deliverCallback, (consumerTag, sig) -> {}); 12.String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) /** * 啟動一個消費者,並返回服務端生成的消費者標識 * queue:佇列名 * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器 * arguments: 消費的一組引數 * deliverCallback: 當一個訊息傳送過來後的回撥介面 * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法 * shutdownSignalCallback: 當channel/connection 關閉後回撥 * @return 服務端生成的消費者標識 */ channel.basicConsume(QUEUE_NAME, true, arguments, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {}); 13.String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) /** * 啟動一個消費者,並返回服務端生成的消費者標識 * queue:佇列名 * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器 * consumerTag: 客戶端生成的用於建立上線文的使用者標識 * callback:消費者物件介面 * @return 與消費者關聯的消費者標識 */ channel.basicConsume(QUEUE_NAME, true, ctag, new DefaultConsumer(channel){ //根據需要實現具體的方法 }); 14.String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) /** * 啟動一個消費者,並返回服務端生成的消費者標識 * queue:佇列名 * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器 * consumerTag:客戶端生成的一個消費者標識 * deliverCallback: 當一個訊息傳送過來後的回撥介面 * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法 * @return 與消費者關聯的消費者標識 */ channel.basicConsume(QUEUE_NAME, true, ctag, deliverCallback, consumerTag -> {}); 15.String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) /** * 啟動一個消費者,並返回服務端生成的消費者標識 * queue:佇列名 * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器 * consumerTag:客戶端生成的一個消費者標識 * deliverCallback: 當一個訊息傳送過來後的回撥介面 * shutdownSignalCallback: 當channel/connection 關閉後回撥 * @return 與消費者關聯的消費者標識 */ channel.basicConsume(QUEUE_NAME, true, ctag, deliverCallback, (consumerTag, sig) -> {}); 16.String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) /** * 啟動一個消費者,並返回服務端生成的消費者標識 * queue:佇列名 * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器 * consumerTag:客戶端生成的一個消費者標識 * deliverCallback: 當一個訊息傳送過來後的回撥介面 * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法 * shutdownSignalCallback: 當channel/connection 關閉後回撥 * @return 與消費者關聯的消費者標識 */ channel.basicConsume(QUEUE_NAME, true, ctag, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {}); 17.String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) /** * queue:佇列名 * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器 * consumerTag:客戶端生成的一個消費者標識 * nolocal:如果伺服器不應將在此通道連線上釋出的訊息傳遞給此使用者,則為true;請注意RabbitMQ伺服器上不支援此標記 * exclusive: 如果是單個消費者,則為true * callback:消費者物件介面 * @return 與消費者關聯的消費者標識 */ channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, new DefaultConsumer(channel){ //根據需求實現方法 }); 18.String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) /** * queue:佇列名 * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器 * consumerTag:客戶端生成的一個消費者標識 * nolocal:如果伺服器不應將在此通道連線上釋出的訊息傳遞給此使用者,則為true;請注意RabbitMQ伺服器上不支援此標記 * exclusive: 如果是單個消費者,則為true * arguments:消費的一組引數 * deliverCallback: 當一個訊息傳送過來後的回撥介面 * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法 * @return 與消費者關聯的消費者標識 */ channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, deliverCallback, consumerTag -> {}); 19.String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) /** * queue:佇列名 * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器 * consumerTag:客戶端生成的一個消費者標識 * nolocal:如果伺服器不應將在此通道連線上釋出的訊息傳遞給此使用者,則為true;請注意RabbitMQ伺服器上不支援此標記 * exclusive: 如果是單個消費者,則為true * arguments:消費的一組引數 * deliverCallback: 當一個訊息傳送過來後的回撥介面 * shutdownSignalCallback: 當channel/connection 關閉後回撥 */ channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, deliverCallback, (consumerTag, sig) -> {}); 20.String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) /** * queue:佇列名 * autoAck:true 接收到傳遞過來的訊息後acknowledged(應答伺服器),false 接收到訊息後不應答伺服器 * consumerTag:客戶端生成的一個消費者標識 * nolocal:如果伺服器不應將在此通道連線上釋出的訊息傳遞給此使用者,則為true;請注意RabbitMQ伺服器上不支援此標記 * exclusive: 如果是單個消費者,則為true * arguments:消費的一組引數 * deliverCallback: 當一個訊息傳送過來後的回撥介面 * cancelCallback:當一個消費者取消訂閱時的回撥介面;取消消費者訂閱佇列時除了使用{@link Channel#basicCancel}之外的所有方式都會呼叫該回調方法 * shutdownSignalCallback: 當channel/connection 關閉後回撥 */ channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});
basicGet:主動拉取佇列中的一條訊息
GetResponse basicGet(String queue, boolean autoAck) /** * 從訊息佇列中取出第一條訊息;整個方法的執行過程是首先消費佇列,然後檢索第一條訊息,然後再取消訂閱 */ GetResponse response = channel.basicGet(QUEUE_NAME, true); System.out.println("消費者接收到的訊息是:"+new String(response.getBody(), "UTF-8"));
取消消費者訂閱
/**
* 取消消費者對佇列的訂閱關係
* consumerTag:伺服器端生成的消費者標識
**/
void basicCancel(String consumerTag)
basicQoc設定服務端每次傳送給消費者的訊息數量
/** * prefetchSize:伺服器傳送最大內容量(以八位位元組計算),如果沒有限制,則為0 * prefetchCount:伺服器每次傳遞的最大訊息數,如果沒有限制,則為0; * global:如果為true,則當前設定將會應用於整個Channel(頻道) **/ void basicQos(int prefetchSize, int prefetchCount, boolean global) /** * prefetchCount:伺服器每次傳遞的最大訊息數,如果沒有限制,則為0; * global:如果為true,則當前設定將會應用於整個Channel(頻道) **/ void basicQos(int prefetchCount, boolean global) /** * prefetchCount:伺服器每次傳遞的最大訊息數,如果沒有限制,則為0; **/ void basicQos(int prefetchCount)
Ack(確認)收到一個或者多個訊息
/**
* 消費者確認收到一個或者多個訊息
* deliveryTag:伺服器端向消費者推送訊息,訊息會攜帶一個deliveryTag引數,也可以成此引數為訊息 * 的唯一標識,是一個遞增的正整數
* multiple:true表示確認所有訊息,包括訊息唯一標識小於等於deliveryTag的訊息,false只確認 * * deliveryTag指定的訊息
**/
void basicAck(long deliveryTag, boolean multiple)
/**
* 要求代理重新發送未確認的訊息
* requeue:如果為true,訊息將會重新入隊,可能會被髮送給其它的消費者;如果為false,訊息將會發送給* 相同的消費者
**/
Basic.RecoverOk basicRecover(boolean requeue)
/**
* 要求代理重新發送未確認的訊息;訊息將會重新排隊,並且可能會發送給其它的消費者
**/
Basic.RecoverOk basicRecover()
拒絕訊息
/**
* 拒絕接收到的一個或者多個訊息
* deliveryTag:接收到訊息的唯一標識
* multiple: true表示拒絕所有的訊息,包括提供的deliveryTag;false表示僅拒絕提供的deliveryTag
* requeue:true 表示拒絕的訊息應重新入隊,而不是否丟棄
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
/**
* 拒絕接收到的一個或者多個訊息
* deliveryTag:接收到訊息的唯一標識
* requeue:true 表示拒絕的訊息應重新入隊,而不是否丟棄
*/
void basicReject(long deliveryTag, boolean requeue)
傳送訊息
/**
* exchange:要將訊息傳送到的Exchange(交換器)
* routingKey:路由Key
* mandatory:true 如果mandatory標記被設定
* immediate: true 如果immediate標記被設定,注意:RabbitMQ服務端不支援此標記
* props:其它的一些屬性,如:{@link MessageProperties.PERSISTENT_TEXT_PLAIN}
* body:訊息內容
**/
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
/**
* 釋出訊息
* 釋出到不存在的交換器將導致通道級協議異常,該協議關閉通道,
* exchange: 要將訊息傳送到的交換器
* routingKey: 路由KEY
* mandatory:true 如果mandatory標記被設定
* props: 訊息的其它屬性,如:路由頭等
* body: 訊息體
*/
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
/**
* 釋出訊息
* 釋出到不存在的交換器將導致通道級協議異常,該協議關閉通道,
* exchange: 要將訊息傳送到的交換器
* routingKey: 路由KEY
* props: 訊息的其它屬性,如:路由頭等
* body: 訊息體
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)