RabbitMQ(3) Java客戶端使用
RabbitMQ針對不同的開發語言(java,python,c/++,Go等等),提供了豐富對客戶端,方便使用。就Java而言,可供使用的客戶端有RabbitMQ Java client、
RabbitMQ JMS client、apache的camel-rabbitmq、以及Banyan等。在Spring中,也可以使用Spring AMQP、Spring Cloud Data Flow方便對集成RabbitMQ。
實際開發使用中,RabbitMQ Java client和Spring AMQP比較常用。RabbitMQ Java client在使用上更加接近AMQP協議,Spring AMQP則更加方便Spring項目中的集成。本為總結RabbitMQ Java client的使用。
Java客戶端概覽
RabbitMQ的Java客戶端包為amqp-client-{version}.jar,可以從RabbitMQ官網下載後引入項目中。
對於Maven工程,pom.xml中加入以下依賴即可引入RabbitMQ的Java客戶端:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.2.0</version>
</dependency>
amqp-client客戶端主要包含以下包:
包 | 說明 |
---|---|
com.rabbitmq.client | 客戶端api定義,提供了接口和類定義AMQP中connection,channel,queue,exchange等核心對象 |
com.rabbitmq.client.impl | 客戶端具體實現 |
com.rabbitmq.client.impl.nio | 客戶端nio實現 |
com.rabbitmq.client.impl.recovery | |
com.rabbitmq.tool.json | 對象json工具類 |
com.rabbitmq.tool.jsonrpc | 基於AMQP等json-rpc支持 |
com.rabbitmq.util | 客戶端中使用等工具累 |
對普通用戶而言,一般只需關註com.rabbitmq.client包,其中定了AMQP協議中對基礎對象,包含以下主要接口和類:
- Channel: AMQP 0-9-1 Channel對象,表示一個連接通道,提供了大多數AMQP操作,如創建隊列、交換器、綁定等
- Connection: AMQP 0-9-1 Connection,表示一個客戶端連接
- ConnectionFactory: Connectiong工廠
- Consumer: 消息消費者接口
- DefaultConsumer: 消費者接口默認實現
- BasicProperties: 消息屬性對象,用於發送消息時設置消息屬性
- BasicProperties.Builder: BasicProperties構建器
使用ConnectionFactory創建出Connection對象,在使用Connection對象創建一個Channel,在Channel上即可完成基本的發送消息,消費消息等AMQP操作;
發送消息時,可通過BasicProperties設置消息屬性;可以通過實現Consumer接口或繼承DefaultConsumer類實現一個消費者來消費消息。總之,通過以上對象,
即可完成基本的消息從生產到消費的全流程。
連接到RabbitMQ
通過ConnectionFactory工廠方法,設置連接屬性,生成Connection對象,建立客戶端到RabbitMQ的連接。在Connection上創建Channel,建立一個連接通道。
Channel不是線程安全的,在實際使用中,應該通過Connection為每個線程創建獨立的Channel。
#設置連接屬性。未設置時使用默認值:使用默認賬號guest連接到localhost到默認vhost "/"
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
#生成Connection & Channel
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
也可以通過設置URI的方式來建立連接:
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri("amqp://username:password@hostname:port/vhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
Channel接口上定義AMQP協議幾乎所有的操作。建立好到RabbitMQ到連接後,就可以在Channel對象上執行AMQP的操作,如聲明隊列、交換器、綁定等。
操作exchange、queue和binding
queue操作
聲明隊列
Channel定義來以下三組方法來聲明隊列:
普通的queueDeclare方,有兩個重載版本
Queue.DeclareOk queueDeclare() Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
第一個不帶參數的queueDeclare()方法聲明一個隊列,隊列名稱由rabbitMQ自動生成,該隊列事非持久的、排他的、自動刪除的;
第二個方法聲明隊列,可以指定用戶設定的隊列屬性和參數,是最常用的方法。其中各個參數含義如下:- queue: 隊列名稱
- durable: 隊列是否持久話。持久化以為著隊列可以從RabbitMQ重啟中恢復。
- exclusive: 排他性。排他性的隊列只對首次聲明它的連接(Connection而不是Channel)可見,並將在連接斷開是自動刪除隊列。排他性的隊列被首次聲明後,
其他連接是不允許創建同名隊列的,這種類型的隊列使用場景很有限。 - autoDelete: 隊列是否自動刪除。自動刪除的前提是,至少有一個消費者連接到該隊列,而後由斷開來連接,隊列沒有任何消費者時,隊列被自動刪除。
- queueDeclareNoWait方法
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
使用queueDeclareNoWait方法聲明隊列時,不等待服務端到響應,直接返回。這種情況下,聲明完隊列後立即使用可能引發異常。 queueDeclarePassive方法
Queue.DeclareOk queueDeclarePassive(String queue)
最後一個queueDeclarePassive方法不是真正對聲明隊列,而只是檢查隊列是否存在,如果隊列存在則正常返回;否則會拋出一個異常,並且執行該操作對Channel不再
可用,後續應該創建新的Channel對象使用。
刪除隊列
刪除隊列有三個方法:
- 直接刪除
Queue.DeleteOk queueDelete(String queue) throws IOException;
該方法會直接刪除掉指定的隊列,而不隊列對狀態,如對是否正在使用、隊列中是否還有數據等。 - 按需刪除
Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException; void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
指定ifUnused為true,則只有當隊列未使用是才會被刪除;指定ifEmpty,則只有當隊列為空,裏面沒數據是才會被刪除。 - 清空隊列
queuePurge不刪除隊列,而是清空隊列中數據。
Queue.PurgeOk queuePurge(String queue) throws IOException;
exchange操作
聲明exchange
聲明exchange的方法也分為三組:
普通的exchangeDeclare方法
各個參數含義如下:Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
- exchange: 交換器的名稱
- type/BuiltinExchangeType: 交換器的類型
- durable: 是否持久化。持久化的交換器會從RabbitMQ服務重啟中恢復,而不用重新創建。
- autoDelete: 是否自動刪除。自動刪除的前提是,只是有一隊列或交換器與該交換器綁定,之後所有與該交換器綁定的隊列/交換器都進行來解綁。
- internal: 是否為內置交換器。內置交換器是不允許客戶端發送消息的。內置交換使用的場景是與其他交換器綁定(RabbitMQ擴展,非AMQP原生功能)
- arguments: 其他的結構化參數
exchangeDeclareNoWait方法
使用exchangeDeclareNoWait方法聲明exchange,方法調用不等待服務端的響應,直接返回,各個參數含義與上面相同。所以聲明exchange後立即使用,很可能引發異常。void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; void exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
exchangeDeclarePassive方法
與queueDeclarePassive方法類似,exchangeDeclarePassive用來檢查exchange是否存在,而不會創建exchange。Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
刪除exchange
與刪除隊列類似,可以直接刪除交換器或是按需刪除。- 直接刪除
Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
按需刪除
Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException; void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
ifUnused設置為true時,只有當交換器未被使用時,才會被刪除。
binding操作
隊列與交換器綁定/解綁
把隊列和交換器綁定起來,使用queueBind方法:
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
被綁定的隊列和交換器的名稱分別由參數queue和exchange指定,綁定的key通過參數routingKey指定。arguments指定了其他的結構化參數。
對應於綁定方法,有如下兩個解綁方法:
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
交換器與交換器綁定/解綁
作為RabbitMQ對AMQP的擴展功能,交換器允許和別的交換器綁定起來。這和隊列與交換器對綁定在使用上沒有太大對不同。可以將交換器E1和E2綁定起來,在將E2和隊列
Q綁定起來,之後生產者向E1發送消息,消息在E1上被路由到E2,再由E2路由到Q。
綁定交換器和其他交換器使用如下方法:
Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
destination和source參數分別指定目的交換器和源交換器。
同樣,通過以下方法解綁:
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
void exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
何時需要創建隊列、交換器以及它們之間的綁定關系呢?這取決與應用程序。在一些復雜應用中,可能需要管理員事先規劃好交換器、隊列以及綁定關系,並在服務隊創建好
這些資源,客戶端直接去使用即可,這樣便於統一規劃,防止客戶端隨意創建資源造成錯誤或是資源浪費;在其他一些應用中,可能由客戶端自己創建這些資源並使用。總之,
這取決於應用本身的規劃。
發送消息
在channel對象上,使用basicPublish方法,將消息發送到指定的交換器。basicPublish有三個重載方法:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
發送消息是,指定的各個參數含義如下:
- exchange: exchange名稱,指定消息發送到哪個交換器
- routingKey: 消息到路由鍵
- mandatory: 該參數決定消息到達exchange後無法路由時到處理方式。指定為true,當消息到達exchange時,如果根據路由鍵找不到匹配到隊列,該消息將被返回給
客戶端;否則,無法路由到消息被直接丟棄。 - immediate: 該參數設置為true,消息在從exchange路由到隊列時,如果隊列上消費者,消息將被投遞到隊列,否則將消息返回給生產者。不過,該參數從RabbitMQ 3.0版本開始已經廢棄掉。
- props: 指定消息到其他屬性,如header、deliveryMode、contentType等。
- body: 消息體
消費消息
在RabbitMQ中,消費者可以以推模式和拉模式兩種方式消費消息,分別對象basicConsume方法和basicGet方法。推模式中,由服務端主動推送消息到消費這,消費者在
會調方法中處理消息,然後響應服務端;拉模式中,消費者每次主動從隊列中拉起單條消息消費,然後響應服務端。
推模式
推模式使用basicConsume方法來消費消息,可以看到,RabbitMQ定義了basicConsume方法許多重載版本:
String basicConsume(String queue, Consumer callback) throws IOException;
String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
各個參數含義如下:
- queue: 指定隊列名稱,即消費哪個隊列裏的數據。
- autoAck: 是否自動進行消息確認。設置為true表示自動消息確認,消費者收到消息時,自動向服務端發送ack信號,服務端收到該信號,則認為消息已經被消費,將其從隊列中刪除。
設置為false值,需要消費者手動進行ack。 - consumerTag: 消費者標簽,用了區分多個消費者。
- noLocal: 設置為true,表示不能將同一個Connection中生成者發送的消息傳遞給這個Connection中的消費者。
- exclusive: 是否排他。
- arguments: 其他結構化參數。
- callback: 消費者會調函數,處理發送過來的消息。
- cancelCallback: 消費者取消訂閱時的回調方法。
- shutdownSignalCallback: channel或者connection關閉時的回調方法。
- 其他回調方法...
可見,消費者主要還是定義各個回調方法,然後調用basicConsume方法消費消息。最主要的回調參數還是callback,定了了消費者消費消息的主要流程,可以同實現
Consumer接口或繼承DefaultConsumer類來實現一個Consumer,如:
String QUEUE_NAME ="";
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,"myTag",
new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException{
long deliveryTag = envelope.getDeliveryTag();
process(body,consumerTag,envelope,properties) //消費消息
channel.basicAck(deliveryTag,false);
}
});
拉模式
拉模式只有basicGet一個方法:
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
參數queue指定從哪個隊列中讀取消息;參數autoAck指定是否自動進行消息確認,設定為fasle時,需要消費者對消息消費後,主動向服務端發送ack消息。
GetResponse response = channel.basicGet(QUEUE_NAME, false);
handle(response) //消費消息
channel.basicAck(response.getEnvelope.getDeliveryTag,false);
拒絕消息
消費者可以拒絕消費投遞過來的消息,使用basicReject拒絕單條消息,使用basicNack可以具體多條消息。basicNack不是AMQP的標準操作,而是RabbitMQ的擴展。
void basicReject(long deliveryTag, boolean requeue) throws IOException;
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
各個參數含義:
- deliveryTag: 消息的編號
- requeue: 消費者拒絕後,服務端是否將消息重新入隊
- multiple: basicNack,multiple參數設置為true時,表明一次性拒絕多條消息:deliveryTag編號之前的所有未被消費的消息都被拒絕。
一個完整的示例
我們將實現一個簡單的告警處理系統。告警分為io告警和file告警兩種類型,告警級別有error和warning兩種。
整體結構如下圖所示
生產者將告警發送到topic類型的交換器中,交換器更具告警的級別將告警路由分別路由到ErrorQueue和WarningQueue中。消費者ErrorConsumer訂閱ErrorQueue
消費error級別的告警消息;IOWarningConsumer訂閱WarningQueue,處理warning級別到告警,正如其名稱,IOWarningConsumer只對io類型到告警消費,其他類型
的告警reject掉。
實現如下:
Producer: 隨機生產各自類型、級別的告警,並發送到交換器
public class Producer { public static String EXCHANEG = "alert_exchange"; public static String WARNING_QUEUE = "waring_queue"; public static String ERROR_QUEUE = "error_queue"; public static void main(String[] args) throws IOException, TimeoutException { //get a default connection and create a channel ConnectionFactory cf = new ConnectionFactory(); Connection connection = cf.newConnection(); final Channel channel = connection.createChannel(); //declare exchange,queues and bindings channel.exchangeDeclare(EXCHANEG, BuiltinExchangeType.TOPIC); channel.queueDeclare(WARNING_QUEUE,true,false,false,null); channel.queueDeclare(ERROR_QUEUE,true,false,false,null); channel.queueBind(WARNING_QUEUE, EXCHANEG,"*.warning"); channel.queueBind(ERROR_QUEUE, EXCHANEG,"*.error"); while (true){ String key = randomRoutingKey(); String message = message(key); channel.basicPublish(EXCHANEG,key,null,message.getBytes()); System.out.println("Sent > " + message); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } public static String[] keys = new String[]{"io.error","io.warning","file.error","file.warning"}; public static String randomRoutingKey(){ Random random = new Random(); int rand = Math.abs(random.nextInt())%(keys.length); return keys[rand]; } public static String message(String key){ String message = "Alert ..." + " level:" + key; return message; } }
ErrorConsumer: 消費error級別到告警
public class ErrorConsumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory cf = new ConnectionFactory(); Connection connection = cf.newConnection(); final Channel channel = connection.createChannel(); boolean autoAck = false; channel.basicConsume(Producer.ERROR_QUEUE,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ String message = new String(body); //consume message System.out.println("consumed. " + message); channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
IOWarningConsumer: 消費io類型、warning級別的告警。
public class IOWarningConsumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory cf = new ConnectionFactory(); Connection connection = cf.newConnection(); final Channel channel = connection.createChannel(); boolean autoAck = false; channel.basicConsume(Producer.WARNING_QUEUE,autoAck,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ String key = envelope.getRoutingKey(); String message = new String(body); long deliveryTag = envelope.getDeliveryTag(); if (key.startsWith("io")) { // only consumer io warning messages //consume message System.out.println("consumed: " + message); channel.basicAck(deliveryTag,false); } else { //reject other messages and requeue them System.out.println("rejected: " + message); channel.basicReject(deliveryTag,true); } } }); } }
分別啟動Producer、ErrorConsumer和IOWarningConsumer。 從輸出結果中可以看到:
Producer每隔一秒鐘隨機生產一條消息:
Sent > Alert ... level:io.error Sent > Alert ... level:file.warning Sent > Alert ... level:file.error Sent > Alert ... level:io.warning Sent > Alert ... level:file.warning Sent > Alert ... level:io.error Sent > Alert ... level:file.error Sent > Alert ... level:file.error Sent > Alert ... level:io.warning Sent > Alert ... level:file.warning Sent > Alert ... level:file.error
ErrorConsumer處理了所有error級別的消息:
consumed. Alert ... level:io.error consumed. Alert ... level:file.error consumed. Alert ... level:io.error consumed. Alert ... level:file.error consumed. Alert ... level:file.error consumed. Alert ... level:file.error
IOWarningConsumer消費了io類型的warning消息,拒絕了其他消息。
rejected: Alert ... level:file.warning consumed: Alert ... level:io.warning rejected: Alert ... level:file.warning consumed: Alert ... level:io.warning rejected: Alert ... level:file.warning rejected: Alert... level:file.warning ...
在IOWrningConsumer,對於reject的消息,其requeue參數設置為true,這樣消息被拒絕後,在服務端將重新入隊,然後又推送給該消費者,一直循環。實際
應用中,應該避免這種情況到發生,此處只是為了演示reject的用法。
查看隊列狀態,可以發現warning_queue中始終存在未被確認的消息:$ rabbitmqctl list_queues name,durable,messages_ready,messages_unacknowledged,messages Timeout: 60.0 seconds ... Listing queues for vhost / ... error_queue true 0 0 0 task_queue true 0 0 0 waring_queue true 19 0 19
總結
使用java客戶端,創建ConnectionFactory對象,設置連接屬性後,創建已經到服務到Connection,Connection代表一個Tcp連接,可以在客戶端復用,復用方式
為使用Connection創建Channel。可以根據規劃事先在服務端創建好交換器、隊列等資源,也可以在Channel上聲明這些資源,聲明資源時,如果資源不存在則會創建。
一切資源就緒,生產者在Channel使用basicPublish發送消息到指定到交換器,消費這訂閱指定的隊列,定義消費消息的回調函數,在Channel上使用basicConsume消費消息。
RabbitMQ(3) Java客戶端使用