1. 程式人生 > >《RabbitMQ開發庫的完整API文件》翻譯

《RabbitMQ開發庫的完整API文件》翻譯

背景

我的應用需要實時的接收RabbitMQ推送過來的訊息,故找了對應的文件檢視用法。這裡特地翻譯出來方便自己和諸位博友日後學習分享。


譯文連結

Java Client API Guide


我的譯文


Java客戶端API指南

該指南涵蓋了RabbitMQ Java客戶端API。然而,這並不是一個教程。這些都可以在不同的章節中找到。

5.x版本系列的庫需要JDK 8,用於編譯和執行時。在Android上,這意味著只支援Android 7.0或更高的版本。4.x釋出系列支援JDK 6和Android7.0之前的版本。

該庫是開源的,並且有以下三種授權:

這意味著使用者可以考慮在上面的三種授權列表中的任何許可下使用該庫。例如,使用者可以選擇Apache Public License 2.0,並將該客戶端包含到一個商業產品中。在GPLv2下獲得許可的程式碼庫可以選擇GPLv2,等等。

API引用(JavaDoc)是單獨可用的。

還有一些命令列工具,這些工具曾經與Java客戶端一起使用。

客戶端API與AMQP 0-9-1協議規範進行了密切的建模,並提供了額外的抽象以方便使用。


概述

RabbitMQ Java客戶端使用 com.RabbitMQ.client 作為它的top-level package。關鍵類和介面是:

Channel
Connection
ConnectionFactory
Consumer

協議操作通過 Channel 介面可用。Connection 用於開啟channels,註冊連線生命週期事件處理程式,以及關閉不再需要的連線。Connections 通過ConnectionFactory例項化,這是您配置各種連線設定的方式,例如vhost或使用者名稱。


Connections and Channels

核心API類是Connection 和Channel,分別表示AMQP 0-9-1連線和通道。這些是使用前需要匯入的:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

連線到一個代理

下面的程式碼使用給定的引數(主機名、埠號等)連線到AMQP代理。

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();

對於在本地執行的RabbitMQ伺服器,所有這些引數都有合理的預設值

或者,也可以使用uri:

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:[email protected]:portNumber/virtualHost");
Connection conn = factory.newConnection();

所有這些引數對於執行在本地的RabbitMQ伺服器都有合理的預設值。

然後可以使用 Connection 介面開啟通道:

Channel channel = conn.createChannel();

該通道現在可以用於傳送和接收訊息,後續部分會有講解。

要斷開連線,只需關閉通道和連線:

channel.close();
conn.close();

請注意,關閉通道可能被認為是良好的習慣,但在這裡並不是絕對必要的,因為當底層連線關閉後,任何情況下通道都會自動的關閉


使用 Exchanges and Queues(佇列)

客戶端應用程式與交換機和佇列一起工作,即AMQP的高階構建塊。這些必須在使用之前被“宣告”。宣告任何型別的物件只會確保其中一個名稱存在,並在必要時建立它。

繼續之前的例子,下面的程式碼聲明瞭一個交換機和一個佇列,然後將它們繫結在一起。

channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

這將主動宣告以下的物件,這兩個物件都可以通過使用額外的引數進行定製。這裡,物件有一些特殊的引數體現:

  1. 一個持久的、非自動刪除的“直接”型別的exchange(交換機)

  2. 有一個已知名稱的、非持久的、專有的、自動刪除的佇列

上面的函式呼叫然後用給定的路由鍵將佇列繫結到交換機。

注意,當只有一個客戶端希望使用它時,這將是宣告佇列的一種典型方式:它不需要一個已知的名稱,其他客戶端不能獨佔佇列,並且佇列會自動清理(自動刪除)。如果有幾個客戶端想要共享一個已知名稱的佇列,那麼這段程式碼將是所需要的:

channel.exchangeDeclare(exchangeName, "direct", true);

channel.queueDeclare(queueName, true, false, false, null);

channel.queueBind(queueName, exchangeName, routingKey);

這些會主動宣告:

  1. 一個持久的、非自動刪除的“直接”型別的exchange(交換機)

  2. 有一個已知名稱的、持久的、非專有的(這裡理解為可共享的)、非自動刪除的佇列

請注意,所有這些 Channel API方法都是過載的。對於exchangeDeclare, queueDeclare 和 queueBind 這些方便的簡寫形式都是使用了合理的預設。還有更多引數的更長的形式,可以讓您根據需要覆蓋這些預設值,以便在需要的時候提供絕對控制權。

這種“簡寫形式、長形式(這裡理解為帶更多引數的一種形式)”模式在客戶端的API中使用。


釋出訊息(Publishing messages)

要將訊息釋出到交換機中,請使用 Channel.basicPublish 如下:

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

為了獲得良好的控制,您可以使用過載的變式來指定 mandatory 標誌,或者使用預先設定的訊息屬性來發送訊息:

channel.basicPublish(exchangeName, routingKey, mandatory,
                     MessageProperties.PERSISTENT_TEXT_PLAIN,
                     messageBodyBytes);

傳送一條訊息,mode是2(即持久化的),priority 為1,content-type 為 “text/plain”。您可以構建自己的訊息屬性物件,使用一個Builder類,您可以使用您喜歡的屬性,例如:

channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder().contentType("text/plain")
.deliveryMode(2)
.priority(1).userId("bob")
.build()),
messageBodyBytes);

舉個例子, 發一個帶自定義頭部的訊息:

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude",  51.5252949);
headers.put("longitude", -0.0905493);

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .headers(headers)
               .build()),
               messageBodyBytes);

這個例子釋出了一個expiration的訊息:

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .expiration("60000")
               .build()),
               messageBodyBytes);

我們還沒有說明所有的可能性。

注意,BasicProperties是自動生成的holder類AMQP的內部類。

如果資源驅動的警報生效,那麼 Channel#basicPublish 的呼叫最終會被阻塞。


通道和併發性考慮事項(執行緒安全)

根據已有經驗,線上程之間共享 Channel 例項是可以避免的。應用程式應該選擇一個 Channel 對應一個執行緒,而不是在多個執行緒之間共享同一個 Channel 。

雖然通道上的一些操作是可以同時呼叫的,但有些操作是不安全的,並且會導致不正確的幀間交錯問題,雙確認問題等等。

共享通道上的併發釋出可能會導致連線上不正確的幀,從而觸發連線級別的協議異常和連線關閉。因此,它需要在應用程式程式碼中顯式同步( Channel#basicPublish 必須在關鍵部分中呼叫)。線上程之間共享通道也會干擾釋出者的確認。我們強烈建議在共享通道上避免併發釋出。

在共享通道時,在一個執行緒中消費,在另一個執行緒中釋出可以是安全的。

Server-pushed deliveries (見下面的部分)是同時發出的,保證每個通道的順序被保留。

對每個連線,分派機制使用java.util.concurrent.ExecutorService。可以提供一個自定義執行器,它將由一個ConnectionFactory的ConnectionFactory#setSharedExecutor 呼叫所產生的所有連線共享。

當使用手動確認時,重要的是要考慮哪些執行緒做了確認。如果它與接收到的執行緒不同(例如,Consumer#handleDelivery委託給不同的執行緒)將 multiple 引數設定為 true 是不安全的,並將導致雙重確認,因此會出現通道級協議異常,從而關閉通道。因此一次只確認一條資訊可以是安全的。


通過訂閱接收訊息(”Push API”)

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

接收訊息的最有效方式是使用 Consumer 介面設定訂閱。訊息將在到達時自動傳送,而不用顯式地請求。在呼叫與Consumers有關的API方法時,個人訂閱總是與它們的消費者標籤相關聯。消費者標籤是消費者的唯一識別符號,它可以是客戶端,也可以是伺服器生成的。為了讓RabbitMQ生成一個節點範圍的惟一標記,使用 Channel#basicConsume 方法重寫,它不需要攜帶一個消費者標記引數,或者傳遞一個空字串給消費者標記,並使用 Channel#basicConsume 方法返回的值。消費者標籤可被用來取消消費者。

不同的消費者例項必須具有不同的消費者標記。對連線上的重複的消費者標籤是強烈反對的,當消費者被監控時,會導致自動連線恢復和令人困惑的監控資料的問題。

實現Consumer 的最簡單方法是例項化 DefaultConsumer 類。這個子類的一個物件可以通過basicConsume的呼叫來設定訂閱:

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.getContentType();
             long deliveryTag = envelope.getDeliveryTag();
             // (這裡進行訊息元件的處理 ...)
             channel.basicAck(deliveryTag, false);
         }
     });

在這裡,因為我們指定了autoAck=false,有必要確認訊息傳遞給消費者,最方便的是在 handleDelivery 方法中完成,如上所示。

更復雜的Consumers需要覆蓋更多的方法。特別地,當通道和連線關閉時,handleShutdownSignal被呼叫,而handleConsumeOk方法會在任何其他回撥之前傳遞消費者標籤給Consumer。

消費者還可以實現handleCancelOk 和handleCancel 方法,分別通知顯式和隱式取消。

通過消費者標籤,你可以用Channel.basicCancel明確地取消一個特定的Consumer :channel.basicCancel(consumerTag);

就像釋出者一樣,考慮消費者的併發性風險也是很重要的。

對使用者的回撥被分派到執行緒池中,該執行緒池與例項化Channel的執行緒分開。這意味著消費者可以安全地呼叫Connection 或Channel上的阻塞方法,例如 Channel#queueDeclare 或者 Channel#basicCancel。

每個Channel 都有自己的分派執行緒,對於一個Channel 中一個Consumer 最常見的使用情況,這意味著Consumers不支援其他Consumers。如果一個通道都有多個消費者,那麼一個長時間執行的消費者可能會持有該通道上的其他消費者的回撥的引用。

關於併發性和併發性危害安全的其他主題,請參閱併發性考慮(執行緒安全)部分


恢復個人訊息(“Pull API”)

要顯式地檢索訊息,請使用Channel.basicGet。返回值是GetResponse的一個例項,從這個例項中可以提取頭資訊(屬性)和訊息體:

boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
    // No message retrieved.
} else {
    AMQP.BasicProperties props = response.getProps();
    byte[] body = response.getBody();
    long deliveryTag = response.getEnvelope().getDeliveryTag();
    ...

而且由於設定了autoAck =false,您還必須呼叫Channel.basicAck 方法確認你確實已經成功地收到了這樣的資訊:

...
    channel.basicAck(method.deliveryTag, false); // 確認收到的訊息

處理 unroutable(無法傳送的) 訊息

如果訊息釋出時帶有“強制”標誌,但是不能被路由,代理將把它返回給傳送客戶端(通過AMQP.Basic.Return 命令)。

收到返回通知,客戶端可以實現ReturnListener 介面或者呼叫Channel.addReturnListener。如果客戶端沒有為特定的通道配置一個返回監聽,那麼相關的返回訊息將被預設丟棄。

channel.addReturnListener(new ReturnListener() {
    public void handleReturn(int replyCode,
                                  String replyText,
                                  String exchange,
                                  String routingKey,
                                  AMQP.BasicProperties properties,
                                  byte[] body)
    throws IOException {
        ...
    }
});

例如,如果客戶端將帶有“mandatory”標誌的訊息釋出到一個沒有繫結到佇列的“direct”型別的交換機上,則將呼叫一個返回監聽。


關閉協議

AMQP客戶端關閉的概述

AMQP 0-9-1連線和通道共享相同的通用方法來管理網路故障、內部故障和顯式的本地關閉。

AMQP 0-9-1連線和通道具有以下生命週期狀態:

 open: 物件已經準備好使用了          

 closing:該物件已被顯式地通知在本地關閉,並向任何支援底層物件的物件發出了關閉請求,並等待它們的關閉過程完成。         
  closed:該物件從任何較低級別的物件中接收了所有的關閉完成通知,結果已經關閉了它自己。     

這些物件總是處於關閉狀態,不管導致閉包的原因是什麼,比如應用程式請求、內部客戶機庫故障、遠端網路請求或網路故障。

AMQP連線和通道物件擁有以下與關閉相關的方法:

removeShutdownListener(ShutdownListener listener)
addShutdownListener(ShutdownListener listener)          

管理任何監聽器,當物件轉換到關閉狀態時將被觸發。注意,向已經關閉的物件新增一個ShutdownListener將立即觸發偵聽器

getCloseReason(),允許調查物件關閉的原因是什麼                

isOpen(),用於測試物件是否處於開放狀態              

close(int closeCode, String closeMessage), 顯式地通知物件關閉              

監聽器的簡單用法如下:

import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;

connection.addShutdownListener(new ShutdownListener() {
    public void shutdownCompleted(ShutdownSignalException cause)
    {
        ...
    }
});

關於關閉情況的一些資訊

可以檢索ShutdownSignalException,其中包含關於關閉原因的所有可用資訊,或者通過顯式呼叫getCloseReason()方法,或者使用ShutdownListener類的服務(ShutdownSignalException起因)方法中的原因引數。

ShutdownSignalException類提供了分析關閉原因的方法。通過呼叫isHardError()方法,我們可以獲得資訊,無論它是連線還是通道錯誤,而getReason()將返回關於該原因的資訊,這是一個AMQP方法,即AMQP.Channel.Close或AMQP.Connection.Close (如果原因在庫中是某個異常,將返null,比如網路通訊失敗,在這種情況下,可以用getCause()來檢索異常)。

public void shutdownCompleted(ShutdownSignalException cause)
{
  if (cause.isHardError())
  {
    Connection conn = (Connection)cause.getReference();
    if (!cause.isInitiatedByApplication())
    {
      Method reason = cause.getReason();
      ...
    }
    ...
  } else {
    Channel ch = (Channel)cause.getReference();
    ...
  }
}                      

isOpen()方法的原子性和使用

對於生產程式碼,不建議使用通道和連線物件的isOpen()方法,因為方法返回的值依賴於關閉原因。下面的程式碼說明了競態條件的可能性:

public void brokenMethod(Channel channel)
{
    if (channel.isOpen())
    {
        // The following code depends on the channel being in open state.
        // However there is a possibility of the change in the channel state
        // between isOpen() and basicQos(1) call
        ...
        channel.basicQos(1);
    }
}

相反,我們通常應該忽略這種檢查,並簡單地嘗試所需的操作。如果在程式碼執行期間,連線的通道被關閉,則會丟擲一個ShutdownSignalException,指示該物件處於無效狀態。我們還應該捕獲由SocketException引起的IOException,當代理關閉連線時,或者當代理啟動清理關閉時,或者出現ShutdownSignalException。

public void validMethod(Channel channel)
{
    try {
        ...
        channel.basicQos(1);
    } catch (ShutdownSignalException sse) {
        // possibly check if channel was closed
        // by the time we started action and reasons for
        // closing it
        ...
    } catch (IOException ioe) {
        // check why connection was closed
        ...
    }
}

高階連線選項

消費者執行緒池

在預設情況下,消費者執行緒(see Receivingbelow)將自動在一個新的ExecutorService執行緒池中分配 。如果需要在newConnection()方法中提供一個ExecutorService用以更好的控制,那麼就需要使用這個執行緒池下面是一個比通常分配的更大的執行緒池的例子:

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

Executors 和ExecutorService 類都在java.util.concurrent 包中。

當連線關閉時,預設的ExecutorService將會關閉,但是使用者提供的ExecutorService(如上面的es)不會關閉。提供定製的ExecutorService的客戶端必須確保它最終被關閉(通過呼叫它的shutdown() 方法),否則池的執行緒可能會阻止JVM終止。

相同的執行器服務可以在多個連線之間共享,也可以在重新連線上進行序列重用,但在shutdown()之後不能使用。

只有當有證據表明在處理消費者回調過程中存在嚴重的瓶頸時,才應該考慮使用該特性。如果沒有執行或很少的消費者回調,那麼預設的分配就足夠了。開銷最初是最小的,並且分配的執行緒資源是有界的,即使偶爾會出現消費活動的爆發。

使用主機列表

可以將一個Address 陣列傳遞給newConnection()。Address 只是com.rabbitmq.client 包中的一個方便類,帶有主機和埠元件。舉個列子:

Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
                                 , new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);

如果它不能連線到hostname2:portnumber2,將嘗試連線到hostname1:portnumber1。直到地址陣列中的某個地址第一次成功連線之後返回。這完全等同於在工廠中反覆地設定主機和埠,每次呼叫factory.newconnection(),直到其中一個成功。

如果還提供了一個ExecutorService(使用factory.newConnection(es, addrArr)形式)執行緒池將與(第一個)成功連線相關聯。

如果您想要對主機進行更多的控制,請檢視服務發現的支援

使用AddressResolver介面的服務發現

在3.6.6版本中,可以讓AddressResolver的實現在建立連線時選擇在哪裡進行連線:

Connection conn = factory.newConnection(addressResolver);

AddressResolver介面如下:

public interface AddressResolver {

  List<Address> getAddresses() throws IOException;

}

就像一個主機列表一樣,返回的第一個地址將首先被嘗試,然後是第二個地址,如果客戶端不能連線到第一個地址,等等。

如果還提供了一個ExecutorService(使用factory.newConnection(es, addressResolver)形式)執行緒池將與(第一個)成功連線相關聯。

AddressResolver是實現自定義服務發現邏輯的完美場所,這在動態基礎結構中尤其有用。與自動恢復相結合,客戶端可以自動連線到第一次啟動時甚至不啟動的節點。關聯和負載平衡是自定義AddressResolver可能有用的其他場景。

Java客戶端附帶了以下實現(請參閱javadoc以獲得詳細資訊):

  1. DnsRecordIpAddressResolver:給定主機名,返回其IP地址(針對平臺DNS伺服器的解析)。這對於簡單的基於dns負載平衡或故障轉移非常有用。

  2. DnsSrvRecordAddressResolver:給定服務的名稱,返回主機名/埠對。搜尋是作為一個DNS SRV請求實現的。這在使用類似於HashiCorp Consul的服務註冊時非常有用。

心跳超時

請參閱Heartbeats guide,瞭解關於heartbeats的更多資訊,以及如何在Java客戶機中配置它們。

自定義執行緒工廠

像Google App Engine(GAE)這樣的環境可以限制直接的執行緒例項化。要在這樣的環境中使用RabbitMQ Java客戶端,有必要配置一個自定義的ThreadFactory,該工廠使用適當的方法來例項化執行緒,例如GAE的ThreadManager。下面是Google App Engine的一個例子:

import com.google.appengine.api.ThreadManager;

ConnectionFactory cf = new ConnectionFactory();
cf.setThreadFactory(ThreadManager.backgroundThreadFactory());

對Java非阻塞IO的支援

4.0版本的Java客戶端的為Java非阻塞IO提供支援(a.k.a Java NIO)。NIO不應該比阻塞IO更快,它只允許更容易地控制資源(在本例中是執行緒)。

使用預設的阻塞IO模式,每個連線都使用一個執行緒從網路套接字讀取。使用NIO模式,您可以控制網路套接字讀/寫的執行緒數量。

如果您的Java程序使用許多連線(數十或數百),請使用NIO模式。您應該使用較少的執行緒,而不是預設的阻塞模式。使用適當數量的執行緒集,您不應該嘗試任何效能的下降,特別是如果連線不是很忙的話。

必須顯式地啟用NIO:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();

NIO模式可以通過NioParams類進行配置:

connectionFactory.setNioParams(new NioParams().setNbIoThreads(4));

NIO模式使用合理的預設值,但是您可能需要根據您自己的工作負載來更改它們。其中一些設定是:使用的IO執行緒總數、緩衝區的大小、用於IO迴圈的服務執行器、記憶體中的寫佇列的引數(寫請求在被髮送到網路之前入隊)。請閱讀Javadoc以獲得詳細資訊和預設值。


從網路故障中自動恢復

連線恢復

客戶端和RabbitMQ節點之間的網路連線可能會失敗。RabbitMQ Java客戶端支援連線和拓撲(佇列、交換、繫結和消費者)的自動恢復。許多應用程式的自動恢復過程遵循以下步驟:

  1. Reconnect(重新連線)
  2. Restore connection listeners(恢復連線監聽器)
  3. Re-open channels(重新開啟通道)
  4. Restore channel listeners(恢復通道偵聽器)
  5. Restore channel basic.qos setting, publisher confirms and transaction settings(恢復基本頻道。qos設定、釋出者確認和事務設定)
拓撲恢復包括以下操作,為每個通道執行       
  1. Re-declare exchanges (except for predefined ones)(重新宣告交換(除了預定義的交換))
  2. Re-declare queues(重新宣告佇列)
  3. Recover all bindings(恢復所有繫結)
  4. Recover all consumers(恢復所有消費者)

在4.0.0版本的Java客戶端中,預設情況下自動恢復是啟用的(因此也可以進行拓撲恢復)。

使用factory.setAutomaticRecoveryEnabled(boolean)方法可以禁用或啟用自動連線恢復。下面的程式碼片段展示瞭如何顯式地啟用自動恢復(例如,在4.0.0之前的Java客戶端):

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
factory.setAutomaticRecoveryEnabled(true);
// connection that will recover automatically
Connection conn = factory.newConnection();

如果由於異常(例如RabbitMQ節點仍然不能到達)而恢復失敗,那麼在固定的時間間隔之後將重新嘗試(預設為5秒)。可以配置間隔時間:

ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds(每10秒嘗試一次恢復)
factory.setNetworkRecoveryInterval(10000);

當提供一個地址列表時,列表會被打亂,所有的地址都會被嘗試,一個接著一個:

ConnectionFactory factory = new ConnectionFactory();

Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")};
factory.newConnection(addresses);

恢復監聽器

可以在可恢復的連線和通道上註冊一個或多個恢復偵聽器。當啟用連線恢復,連線將會返回通過實現com.rabbitmq.client.Recoverable包下的兩個方法ConnectionFactory#newConnection 和Connection#createChannel 。提供兩種具有相當描述性名稱的方法:

addRecoveryListener
removeRecoveryListener

注意,為了使用這些方法,您現在需要將連線和通道轉換為Recoverable

對釋出的影響

當連線關閉時,使用Channel.basicPublish釋出的訊息將丟失。客戶端不會在連線恢復後將它們儲入佇列。為了確保釋出的訊息能夠到達RabbitMQ應用程式,需要使用釋出者確認和對連線失敗做出解釋說明。

拓撲恢復

拓撲恢復涉及到交換、佇列、繫結和消費者的恢復。當啟用自動恢復功能時,預設啟用它。因此,在Java客戶端4.0.0中預設啟用拓撲恢復。

如果需要,可以顯式地禁用拓撲恢復:

ConnectionFactory factory = new ConnectionFactory();

Connection conn = factory.newConnection();
// enable automatic recovery (e.g. Java client prior 4.0.0)
// 啟用自動恢復(例如,在4.0.0之前的Java客戶端)
factory.setAutomaticRecoveryEnabled(true);
// disable topology recovery
// 禁用拓撲復蘇
factory.setTopologyRecoveryEnabled(false);

故障檢測和恢復限制

自動連線恢復有許多限制和專門的設計決策,應用程式開發人員需要知道這些決策。

當連線關閉或丟失時,它需要時間來檢測。因此,有一個時間視窗,其中庫和應用程式都不知道連線失敗的有效性。在此期間釋出的任何訊息都將被序列化,並像往常一樣被寫入TCP套接字。他們向代理的交付只能通過釋出者確認:AMQP 0-9-1的釋出完全是非同步的。

當一個連線啟用了自動恢復的連線時,當一個套接字或輸入/輸出操作錯誤被檢測到時,在預設情況下,恢復將在預設情況下啟動,預設為5秒。該設計假設,即使許多網路故障是短暫的,並且通常是短暫的,但它們不會立即消失。連線恢復嘗試將在相同的時間間隔內繼續,直到成功開啟新連線。

當連線處於恢復狀態時,任何在其通道上嘗試的釋出都將被一個異常拒絕。針對外出的訊息,客戶端當下不執行任何內部緩衝。當恢復成功時,應用程式開發人員負責跟蹤這些訊息並重新發布它們。釋出者確認是一個協議擴充套件,它應該被那些不能承受訊息損失的釋出者使用。

當通道由於通道級異常而關閉時,連線恢復將不會啟動。此類異常通常表示應用程式級別的問題。該library無法對何時發生這種情況作出明智的決定。

即使在連線恢復啟動後,關閉通道也無法恢復。這包括顯式關閉通道和上面的通道級異常情況。

手動確認和自動恢復

當使用手動確認時,在訊息傳遞和確認之間可能會出現與RabbitMQ節點的網路連線失敗情況。在連線恢復之後,RabbitMQ將在所有通道上重新設定傳輸標記。這意味著basic.ackbasic.nackbasic.reject 使用舊的交付標記將導致通道異常。為了避免這種情況,RabbitMQ Java客戶端保持跟蹤和更新發送標記,使它們在恢復之間單調地增長。basicack()、channel.basicnack()和channel.basicreject()然後將調整後的交付標籤翻譯成RabbitMQ使用的標籤。使用過期的交付標籤將不會被髮送,使用手動確認和自動恢復的應用必須能夠處理重新交付的情況。


未處理異常

與連線、通道、恢復和消費者生命週期相關的未處理異常被委託給異常處理程式。異常處理程式是實現了ExceptionHandler 介面的物件。預設情況下,使用了DefaultExceptionHandler的例項。它將異常細節輸出到標準輸出。

可以使用ConnectionFactory#setExceptionHandler來覆蓋處理程式。它將用於工廠建立的所有連線:

ConnectionFactory factory = new ConnectionFactory();
cf.setExceptionHandler(customHandler);

異常處理程式應該用於異常日誌記錄。


度量和監控

在版本4.0.0中,客戶端收集執行時指標(如已釋出的訊息的數量)。度量指標集合是可選的,並在ConnectionFactory級別上設定,使用setmetricscollsetter(metricscollsetter)方法。這個方法需要一個度量標準的例項,這個例項在客戶端程式碼的幾個地方被呼叫。

客戶端支援Micrometer(版本4.3)和下拉向導的標準。

以下是收集到的資料:

Number of open connections  (開啟的連線數量)   
Number of open channels  (開啟的通道數量)    
Number of published messages (已經發布了的訊息)     
Number of consumed messages (消費了的訊息數量)      
Number of acknowledged messages (確認了的訊息數量)     
Number of rejected messages (被拒絕的資訊)         

對於與訊息相關的度量標準,不管是測微計和Dropwizard指標都提供了計數,但也包括平均速率、最後五分鐘速率等。他們還支援用於監視和報告的常用工具(JMX, Graphite, Ganglia, Datadog, etc)。請參閱下面的專用部分,瞭解更多細節。

請注意以下有關度量標準的收集:

不要忘記在使用Micrometer 或Dropwizard 度量時,將適當的依賴項(in Maven, Gradle, 或者作為 JAR 檔案)新增到JVM的classpath中。這些是可選的依賴項,並且不會被Java客戶端自動引用。您還可能需要根據所使用的reporting backend(s)來新增其他依賴項。
指標集合是可擴充套件的。為特定的需求實現一個自定義的MetricsCollector是被鼓勵的。     
MetricsCollector是在ConnectionFactory級別設定的,但是可以在不同的例項之間共享。
度量收集不支援事務。例如,如果在事務中傳送一個確認資訊,然後事務被回滾,那麼確認就會被計入客戶端指標(但顯然不是由代理進行的)。注意,確認實際上被髮送到代理,然後被事務回滾取消,因此客戶端指標在傳送的確認中是正確的。作為總結,不要使用客戶度量來作為關鍵的業務邏輯,它們不能保證是完全準確的。它們的目的是簡化關於執行系統的推理,使操作更加高效。      

Micrometer support

您可以以下方式啟用Micrometer指標收集 :

ConnectionFactory connectionFactory = new ConnectionFactory();
MicrometerMetricsCollector metrics = new MicrometerMetricsCollector();
connectionFactory.setMetricsCollector(metrics);
...
metrics.getPublishedMessages(); // 獲取 Micrometer's Counter 物件

Micrometer 支援several reporting backends: Netflix Atlas, Prometheus, Datadog, Influx, JMX, 等待等。

你通常會把MeterRegistry 的例項傳遞給MicrometerMetricsCollector。

下面是一個使用JMX的示例:

JmxMeterRegistry registry = new JmxMeterRegistry();
MicrometerMetricsCollector metrics = new MicrometerMetricsCollector(registry);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setMetricsCollector(metrics);

Dropwizard指標支援

你可以採用以下方式使用Dropwizard的資料收集:

ConnectionFactory connectionFactory = new ConnectionFactory();
StandardMetricsCollector metrics = new StandardMetricsCollector();
connectionFactory.setMetricsCollector(metrics);
...
metrics.getPublishedMessages(); // 得到指標的計量物件

Dropwizard度量支援多個several reporting backends:console, JMX, HTTP, Graphite, Ganglia, etc。

你通常會把MetricsRegistry的例項傳遞給StandardMetricsCollector。

MetricRegistry registry = new MetricRegistry();
StandardMetricsCollector metrics = new StandardMetricsCollector(registry);

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setMetricsCollector(metrics);

JmxReporter reporter = JmxReporter
  .forRegistry(registry)
  .inDomain("com.rabbitmq.client.jmx")
  .build();
reporter.start();

谷歌應用引擎上的RabbitMQ Java客戶端

在Google App Engine(GAE)上使用RabbitMQ Java客戶端需要使用一個自定義執行緒工廠,該工廠使用GAE的ThreadManager例項化執行緒(見上文)。此外,還需要設定一個低心跳間隔(4-5秒),以避免在GAE上執行低InputStream讀取超時:

ConnectionFactory factory = new ConnectionFactory();
cf.setRequestedHeartbeat(5);

警告和限制

為了使拓撲恢復成為可能,RabbitMQ Java客戶端維護已宣告的佇列、交換和繫結的快取。快取是每個連線。某些RabbitMQ特性使得客戶端不可能觀察到某些拓撲更改,例如,由於TTL而刪除了一個佇列。RabbitMQ Java客戶機嘗試在下面的情況下使快取項無效:

When queue is deleted.(當佇列被刪除)
When exchange is deleted.(當交換被刪除)
When binding is deleted.(當繫結被刪除)
When consumer is cancelled on an auto-deleted queue.(當消費者被自動刪除的佇列取消時。)
When queue or exchange is unbound from an auto-deleted exchange.(當佇列或交換從自動刪除的交換中釋放時。)         

但是,客戶端無法跟蹤這些拓撲更改,而不僅僅是單個連線。依賴於自動刪除佇列或交換的應用程式,以及佇列TTL(注意:不是訊息TTL!),以及使用自動連線恢復,應該顯式地刪除那些不被使用或刪除的實體,以清除客戶端拓撲快取。這是由Channel#queueDelete, Channel#exchangeDelete, Channel#queueUnbind和Channel#exchangeUnbind 在RabbitMQ 3.3.x中具有冪等性的。(刪除不存在的內容並不會導致異常)。


RPC(請求/應答)模式

作為一種程式設計便利,Java客戶端API提供了一個類RpcClient,它使用臨時應答佇列,通過AMQP 0-9-1提供簡單rpc樣式的通訊設施。

該類不會在RPC引數和返回值上強加任何特定的格式。它簡單地提供了一種機制,將訊息傳送到給定的交換器,並使用特定的路由鍵,並等待應答佇列上的響應。

import com.rabbitmq.client.RpcClient;

RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);

這個類如何使用AMQP 0-9-1的實現細節如下:請求訊息以basic.correlation_id 欄位集的方式傳送到該RpcClient例項的惟一值,並將basic.reply_to 設定為應答佇列的名稱。

一旦您建立了這個類的一個例項,您就可以使用它來發送RPC請求,使用以下方法:

byte[] primitiveCall(byte[] message);
String stringCall(String message)
Map mapCall(Map message)
Map mapCall(Object[] keyValuePairs)

(primitiveCall 方法將原始位元組陣列作為請求和響應體進行傳輸。方法stringCall 是圍繞primitiveCall的一個簡單的便利包裝器,將訊息體作為預設字元編碼中的字串例項。)

mapCall 的變體稍微複雜一點:它們將一個包含普通Java值的 java.util.Map 編碼為AMQP 0-9-1二進位制表表示,並以同樣的方式解碼響應。(注意,這裡有一些限制,針對什麼型別的值可以在這裡使用——請參閱javadoc以獲得詳細資訊)

所有 marshalling/unmarshalling 便利方法使用 primitiveCall 作為傳輸機制,並提供包裝層之上。


TLS的支援

可以使用TLS加密客戶端和代理之間的通訊。還支援客戶端和伺服器身份驗證(也稱為對等驗證)。下面是使用Java客戶端的加密最簡單的方法:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5671);

factory.useSslProtocol();

注意,在上面的示例中,客戶端預設不強制執行任何伺服器身份驗證(對等證書鏈驗證),“trust all certificates”TrustManager 被使用。這對於本地開發來說很方便,但容易發生中間人攻擊,因此不建議生產。要在RabbitMQ中瞭解更多關於TLS支援的資訊,請參閱“TLS指南”。如果您只想配置Java客戶端(特別是對等驗證和信任管理器部分),請閱讀TLS指南的適當部分

================================================================
轉載宣告:
文章轉載自https://blog.csdn.net/csdnzouqi/article/details/78926603

================================================================

微信公眾號