RabbitMq六種使用模式(2)_多個消費者
上一篇文章中,一個佇列只有一個消費者,其實可以同時有多個消費者從同一佇列裡面取訊息,如何分配有rabbitmq伺服器決定;
程式碼基本上如上文一致,只是有多個consumer在監控著佇列,每個consumer獨立處理獲取的訊息;
1:訊息的確認機制
目前的程式碼,一旦consumer獲取到message,那麼這個message就立刻從queue裡面移除(自動的訊息接收確認);但是如果還沒有處理該message,worker被kill,那麼這個訊息就沒有被成功處理;此外,一個consumer可能同時收到了多個訊息,這些訊息也相當於丟失; 此時,需要使用訊息的手動確認機制,處理成功之後,通知rabbitmq伺服器將訊息刪除;如果沒有收到確認訊息,改訊息狀態變成unacked,不會刪除;如果rabbitmq重啟或者當前client連結失效或者當前worker失效,unacked的訊息會參與重新分配,有consumer重新處理;QueueingConsumer consumer = new QueueingConsumer(channel); boolean autoAck = false;//預設的是true,自動確認 channel.basicConsume("hello", autoAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //此時,consumer可能已經從rabbitmq獲得和多個訊息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //delivery.getEnvelope().getDeliveryTag()訊息的標識,false只確認當前一個訊息收到,true確認所有consumer獲得的訊息 }
2:訊息的持久化 目前程式碼,訊息仍然有可能丟失;如果rabbitmq伺服器掛掉,佇列和訊息都沒有被持久化;
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
//宣告該佇列需要持久化
此外訊息也需要被持久化,有可能rabbitmq收到了訊息但是還沒有放入佇列,伺服器掛了,此時訊息仍有可能丟失;
channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
3:公平的訊息分發 目前的訊息佇列,分發訊息時沒有考慮consumer的具體情況,有可能造成有的consumer負載過重,有的consumer負載太輕; 應該考慮consumer沒有確認訊息的數量,如果unacked的訊息過多,則應該少往此consumer傳送訊息;
int prefetchCount = ;//maximum number of messages that the server will deliver, 0 if unlimited
channel.basicQos(prefetchCount);//