1. 程式人生 > >RabbitMQ Java客戶端API指南

RabbitMQ Java客戶端API指南

本指導覆蓋了Java客戶端api,它不僅僅是一個教程,在不同的部分都是可用的。
此java客戶端得到了下面三方的許可

  • Apache Public License 2.0
  • Mozilla Public License
  • GPL 2.0

概要

RabbitMQ java客戶端使用 com.rabbitmq.client 作為最基礎的包。主要的類和介面定義如下:

Channel(通道)
Connection
ConnectionFactory
Consumer

Channel介面主要用於協議的操作。Connection用於開啟Channels、註冊connection生命週期事件的管理者以及用來關閉不再使用的connections。ConnectionFactory用於例項化Connection,你也可以用它配置連線的各種設定,比如虛擬主機和使用者名稱。

Connections and Channels

核心的api類是Connections 和Channels,分別代表一個AMQP 0-9-1協議的連線和一個通道,一般在使用他們之前你要像下面這樣引用:

import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel;

一、連線主機
下面的程式碼將使用給定的引數一個主機(主機名、埠號等):

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();

二、如果你在本地使用RabbitMQ伺服器,上面的這些引數都有合理的預設值。
另外,你也可以使用URIs達到同樣的功能

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:[email protected]:portNumber/virtualHost");
Connection conn = factory.newConnection();

三、如果你在本地使用一個stock的RabbitMQ伺服器(不懂stock是什麼意思),上面的這些引數都有合理的預設值。
Connection 介面在建立完成後,可以用來開啟一個通道.

Channel channel = conn.createChannel();

四、這個通道可以被用來發送和接收訊息,如下面的章節描述。
如果你要斷開連線(Connection ),只需關閉通道和連線即可。

channel.close();
conn.close();

注意: 關閉通道是一個好的習慣,但是不是必需的。當你關閉連線時,通道會自動關閉的。

五、使用交換和佇列(Using Exchanges and Queues)
客戶端應用使用交換機和佇列,這是使用AMQP協議的最高層次。他們必須在使用前宣告。 宣告任一型別的物件只是確保該物件的存在,如果有必要建立它吧。

channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey); 

這將顯式的宣告以下物件,這兩者都可以通過使用額外的引數來建立。在這裡,他們都沒有特殊的引數。

a durable, non-autodelete exchange of “direct” type 
a non-durable, exclusive, autodelete queue with a generated name

上述的queueBind函式用於繫結交換機和給定的路由關鍵字(routingKey)。

六、注意:上面是一個典型的宣告一個佇列的方式,當只有一個客戶端使用它時。它不必使用一個公開的名字,沒有其它客戶可以使用它(獨佔模式),並且將被自動清理(自動刪除);如果幾個客戶端想要共享一個佇列,必須使用下面的程式碼:

channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

這將會顯式的宣告:

a durable, non-autodelete exchange of “direct” type 
a durable, non-exclusive, non-autodelete queue with a well-known name

注意:所有的關於通道的API方法可以被過載。為了使用方便,exchangeDeclare方法、queueDeclare方法和queueBind方法都提供預設值。如果有必要你可以使用更多的引數去過載他們,這給你提供了跟多的選擇。
這種模式貫穿整個客戶端API.

Publishing messages(釋出訊息)

七、資訊的交換,使用channel.basicpublish如下:

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

八、對於一個控制,可以使用過載的變體來指定強制標誌,或傳送具有預先設定的訊息屬性的訊息:

channel.basicPublish(exchangeName, routingKey, mandatory,
                     MessageProperties.PERSISTENT_TEXT_PLAIN,
                     messageBodyBytes);

九、以傳遞模式2(永續性)、優先順序1和內容型別“文字/純”傳送訊息。可以使用一個生成器類來建立自己的訊息屬性物件,如您所喜歡的屬性,例如:

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .contentType("text/plain")
               .deliveryMode(2)
               .priority(1)
               .userId("bob")
               .build()),
               messageBodyBytes);

十、此示例使用自定義標頭髮布訊息:

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude",  51.5252949);
headers.put("longitude", -0.0905493);

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .headers(headers)
               .build()),
               messageBodyBytes);

十一、此示例釋出過期訊息:

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .expiration("60000")
               .build()),
               messageBodyBytes);

十二、通道和併發考慮(執行緒安全模式)

通道例項不能再執行緒間共享。應用應該對每一個執行緒使用一個通道,而不是線上程間共享同一通道。渠道的操作有些事是安全的併發呼叫,有些不是,將會會導致線路上的不正確的幀交錯。執行緒之間共享通道也將干擾確認釋出訊息者。

通過訂閱接收訊息

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

接收訊息最有效的方式是通過Consumer介面設定一個訂閱。那麼訊息將會自定的送達,而不是明確的被請求。