rabbitmq消費訊息的兩種方式
rabbitMQ中consumer通過建立到queue的連線,建立channel物件,通過channel通道獲取message,
Consumer可以宣告式的以API輪詢poll的方式主動從queue的獲取訊息,也可以通過訂閱的方式被動的從Queue中消費訊息,
最近翻閱了基於java的客戶端的相關原始碼,簡單做個分析。
程式設計模型虛擬碼如下:
ConnectionFactory factory = new ConnectionFactory();
Connection conn = factory.newConnection();
Channel channel=conn.createChannel();
建立Connection需要指定MQ的實體地址和埠,是socket tcp物理連線,而channel是一個邏輯的概念,支援在tcp連線上建立多個MQ channel
以下是基於channel上的兩種消費方式。
1、Subscribe訂閱方式
boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.contentType; long deliveryTag = envelope.getDeliveryTag(); // (process the message components here ...) channel.basicAck(deliveryTag, false); } });
訂閱方式其實是向queue註冊consumer,通過rpc向queue server傳送註冊consumer的訊息,rabbitMQ Server在收到訊息後,根據訊息的內容型別判斷這是一個訂閱訊息,
這樣當MQ 中queue有訊息時,會自動把訊息通過該socket(長連線)通道傳送出去。
參見ChannelN中的方法
public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, final Consumer callback) throws IOException { ...... rpc((Method) new Basic.Consume.Builder() .queue(queue) .consumerTag(consumerTag) .noLocal(noLocal) .noAck(autoAck) .exclusive(exclusive) .arguments(arguments) .build(), k); try { return k.getReply(); } catch(ShutdownSignalException ex) { throw wrap(ex); } }
Consumer接收訊息的過程:
建立Connection後,會啟動MainLoop後臺執行緒,迴圈從socket(FrameHandler)中獲取資料包(Frame),呼叫channel.handleFrame(Frame frame)處理訊息,
public void handleFrame(Frame frame) throws IOException {
AMQCommand command = _command;
if (command.handleFrame(frame)) { // 對訊息進行協議assemble
_command = new AMQCommand(); // prepare for the next one
handleCompleteInboundCommand(command);//對訊息消費處理
}
}
ChannelN.handleCompleteInboundCommand
---ChannelN.processAsync
----dispatcher.handleDelivery
---QueueingConsumer.handleDelivery
---this._queue.add(new Delivery(envelope, properties, body));//訊息最終放到佇列中
每個Consumer都有一個BlockQueue,用於快取從socket中獲取的訊息。
接下來,Consumer物件就可以呼叫api來從客戶端快取的_queue中依次獲取訊息,進行消費,參見QueueingConsumer.nextDelivery()
對於這種長連線的方式,沒看到心跳功能,以防止長連線的因網路等原因連線失效
2、poll API方式
ChannelN:
GetResponse basicGet(String queue, boolean autoAck)
這種方式比較簡單,直接通過RPC從MQ Server端獲取佇列中的訊息
關注公眾號: