RabbitMQ Java客戶端API指南
本指導覆蓋了Java客戶端api,它不僅僅是一個教程,在不同的部分都是可用的。
此java客戶端得到了下面三方的許可
- Apache Public License 2.0
- Mozilla Public License
- GPL 2.0
概要
RabbitMQ java客戶端使用 com.rabbitmq.client 作為最基礎的包。主要的類和介面定義如下:
Channel(通道)
Connection
ConnectionFactory
Consumer
Channel介面主要用於協議的操作。Connection用於開啟Channels、註冊connection生命週期事件的管理者以及用來關閉不再使用的connections。ConnectionFactory用於例項化Connection,你也可以用它配置連線的各種設定,比如虛擬主機和使用者名稱。
Connections and Channels
核心的api類是Connections 和Channels,分別代表一個AMQP 0-9-1協議的連線和一個通道,一般在使用他們之前你要像下面這樣引用:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
一、連線主機
下面的程式碼將使用給定的引數一個主機(主機名、埠號等):
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); Connection conn = factory.newConnection();
二、如果你在本地使用RabbitMQ伺服器,上面的這些引數都有合理的預設值。
另外,你也可以使用URIs達到同樣的功能
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:[email protected]:portNumber/virtualHost");
Connection conn = factory.newConnection();
三、如果你在本地使用一個stock的RabbitMQ伺服器(不懂stock是什麼意思),上面的這些引數都有合理的預設值。
Connection 介面在建立完成後,可以用來開啟一個通道.
Channel channel = conn.createChannel();
四、這個通道可以被用來發送和接收訊息,如下面的章節描述。
如果你要斷開連線(Connection ),只需關閉通道和連線即可。
channel.close();
conn.close();
注意: 關閉通道是一個好的習慣,但是不是必需的。當你關閉連線時,通道會自動關閉的。
五、使用交換和佇列(Using Exchanges and Queues)
客戶端應用使用交換機和佇列,這是使用AMQP協議的最高層次。他們必須在使用前宣告。 宣告任一型別的物件只是確保該物件的存在,如果有必要建立它吧。
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
這將顯式的宣告以下物件,這兩者都可以通過使用額外的引數來建立。在這裡,他們都沒有特殊的引數。
a durable, non-autodelete exchange of “direct” type
a non-durable, exclusive, autodelete queue with a generated name
上述的queueBind函式用於繫結交換機和給定的路由關鍵字(routingKey)。
六、注意:上面是一個典型的宣告一個佇列的方式,當只有一個客戶端使用它時。它不必使用一個公開的名字,沒有其它客戶可以使用它(獨佔模式),並且將被自動清理(自動刪除);如果幾個客戶端想要共享一個佇列,必須使用下面的程式碼:
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
這將會顯式的宣告:
a durable, non-autodelete exchange of “direct” type
a durable, non-exclusive, non-autodelete queue with a well-known name
注意:所有的關於通道的API方法可以被過載。為了使用方便,exchangeDeclare方法、queueDeclare方法和queueBind方法都提供預設值。如果有必要你可以使用更多的引數去過載他們,這給你提供了跟多的選擇。
這種模式貫穿整個客戶端API.
Publishing messages(釋出訊息)
七、資訊的交換,使用channel.basicpublish如下:
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
八、對於一個控制,可以使用過載的變體來指定強制標誌,或傳送具有預先設定的訊息屬性的訊息:
channel.basicPublish(exchangeName, routingKey, mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);
九、以傳遞模式2(永續性)、優先順序1和內容型別“文字/純”傳送訊息。可以使用一個生成器類來建立自己的訊息屬性物件,如您所喜歡的屬性,例如:
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("bob")
.build()),
messageBodyBytes);
十、此示例使用自定義標頭髮布訊息:
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude", 51.5252949);
headers.put("longitude", -0.0905493);
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build()),
messageBodyBytes);
十一、此示例釋出過期訊息:
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("60000")
.build()),
messageBodyBytes);
十二、通道和併發考慮(執行緒安全模式)
通道例項不能再執行緒間共享。應用應該對每一個執行緒使用一個通道,而不是線上程間共享同一通道。渠道的操作有些事是安全的併發呼叫,有些不是,將會會導致線路上的不正確的幀交錯。執行緒之間共享通道也將干擾確認釋出訊息者。
通過訂閱接收訊息
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
接收訊息最有效的方式是通過Consumer介面設定一個訂閱。那麼訊息將會自定的送達,而不是明確的被請求。