《RabbitMQ官方指南》RPC
遠端過程呼叫
(RPC)
(Java 客戶端版)
在第二章(工作佇列 Work Queues)中,我們學習瞭如何使用工作佇列在多使用者之間分配耗時的任務。
但是如果我們需要在遠端電腦上執行一個函式方法,並且還要等待一個返回結果該怎麼辦?好吧,這將會是一個不一樣的故事。這種模式通常被我們稱為遠端過程呼叫或者RPC.
在本章教程中,我們將會學習使用RabbitMQ去搭建一個RPC系統:一個客戶端和一個可以升級(擴充套件)的RPC伺服器。由於我們沒有任何的值得分配的耗時的任務,所以我們將建立一個返回斐波那契數列的虛擬的RPC服務。
客戶端介面(Client interface)
為了說明如何使用去使用一個PRC服務,我們將建立一個簡單的客戶端類。它將暴露一個命名為call的方法,這個方法傳送一個阻塞的PRC請求,直到收到一個響應的回覆:
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);
關於RPC的註釋:
儘管RPC是一個十分常見的計算機設計模型,但是它經常被批評。當一個開發者不能清晰的認識到請求的函式方法是本地的方法還是一個緩慢的PRC時就會出現問題。這樣混亂的結果導致了一個不可預知的系統,並且增加了一些不必要的複雜的除錯工作。所以,誤用PRC可能導致一些不可維護錯綜複雜的程式碼,化簡為繁。
銘記這一點,請遵循以下的建議:
- 確認一個函式方法,究竟是本地的還是遠端的。
- 記錄你的系統,確保元件之間的依賴關係是清晰的。
- 處理錯誤問題:當RPC伺服器長時間宕機時,客戶點應該如何做出迴應?
為了避免RPC中出現的疑問,如果可以,你應該使用非同步管道的方式來代替RPC的阻塞,請求的結果會被非同步推送到下一個計算階段。
回撥佇列(Callback queue)
通常情況下,使用RabbitMQ去實現RPC很容易。一個客戶端傳送請求資訊,對應得到一個伺服器端回覆的響應資訊。為了得到響應資訊,我們需要在請求的時候傳送一個“回撥”佇列地址。我們可以使用預設的(在JAVA客戶端中獨有)。讓我們試試吧:
callbackQueueName = channel.queueDeclare().getQueue();BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...
訊息屬性:
AMQP 0-9-1協議預先定義了傳送一個訊息需要包含的14個屬性。但是其中大多數的屬性很少被使用,以下的除外:
1.deliveryMode:將訊息標記為持久訊息(值為2)或短暫訊息(任何其他值)。 從第二個教程可以記住這個屬性。
2.contentType:用於描述mime型別的編碼。 例如:對於經常使用的JSON編碼,將此屬性設定為:application / json。
3.replyTo:通常用來命名一個回撥佇列。
4.correlationId:用於將RPC響應與請求相關聯。
我們需要這個新的匯入:
import com.rabbitmq.client.AMQP.BasicProperties;
關聯id (correlationId):
在上面提出的方法中,我們建議為每個RPC請求建立一個回撥佇列。 這是非常低效的,但幸運的是有一個更好的方法:讓我們為每個客戶端建立一個回撥佇列。
這就提出了一個新的問題,在佇列中得到一個響應時,我們不清楚這個響應所對應的是哪一條請求。這時候就是使用關聯id(correlationId)的時候了。我們將為每一條請求設定唯一的的值。稍後,當我們在回撥佇列裡收到一條訊息的時候,我們將檢視它的屬性,我們可以匹配出對應的請求和響應。如果我們發現了一個未知的id值,我們可以安全的丟棄這條訊息,因為它不屬於我們的請求。
你可以會問,為什麼我們在回撥佇列中忽略未知訊息,而不是判斷失敗呢?這是由於一種可能:在伺服器端發生資源競爭(導致產生未知id)。儘管不太現實,很有可能RPC在剛剛傳送給我們答案後,但是在傳送確認請求訊息前掛掉了。如果出現這種情況,重啟RPC伺服器後,我們將重新處理該條請求。這就是為什麼我們要在客戶端優雅的處理重複的響應,並且RPC理應上是冪等的。
概要:
我們的RPC將像這樣工作:
1.當客戶端啟動時,它建立一個獨有的匿名回撥佇列。
對於一條RPC請求,客戶端傳送一個具有兩個屬性的訊息:replyTo:包含回撥佇列和correlationId:包含每個請求的唯一值。
2.請求傳送到rpc_queue佇列。
3.RPC worker(aka:server)正在等待佇列上的請求。 當出現請求時,它將執行該作業,並使用佇列中replyTo欄位,將結果傳送回客戶端。
4.客戶端等待回撥佇列中的資料。當一條訊息出現時,它會檢查correlationId屬性,如果和請求中correlationId的值匹配,就返回響應結果給應用程式。
把他們整合到一起:
斐波納契任務:
private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); }
我們宣告斐波納契方法。它只假定輸入有效的正整數。(不要期望它執行大數字,並且它可能是用最慢的遞迴方法實現的。)
我們的RPC伺服器RPCServer.java的程式碼如下所示:
</pre> import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] argv) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; try { connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(properties.getCorrelationId()) .build(); String response = ""; try { String message = new String(body,"UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e){ System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(envelope.getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread synchronized(this) { this.notify(); } } } }; channel.basicConsume(RPC_QUEUE_NAME, false, consumer); // Wait and be prepared to consume the message from RPC client. while (true) { synchronized(consumer) { try { consumer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { if (connection != null) try { connection.close(); } catch (IOException _ignore) {} } } } <pre>
伺服器程式碼比較簡單:
1.像往常一樣,我們開始建立連線,通道並宣告佇列。
2.我們可能想執行更多的伺服器程序。為了在多個伺服器上做到負載均衡,我們需要在channel.basicQos中設定prefetchCount設定。
3.我們使用basicConsume訪問佇列,我們以物件(DefaultConsumer)的形式提供一個回撥,該物件將執行該操作並將響應傳送回來。
我們的RPC客戶端的程式碼RPCClient.java:
</pre> import com.rabbitmq.client.*; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); } public String call(String message) throws IOException, InterruptedException { String corrId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(corrId)) { response.offer(new String(body, "UTF-8")); } } }); return response.take(); } public void close() throws IOException { connection.close(); } //... <pre>
客戶端程式碼稍微複雜點:
- 我們建立一個連線和通道,並宣告一個獨有的’回撥’佇列作為回覆。
- 我們訂閱’回撥’佇列,這樣我們可以收到RPC響應資訊。
- 我們請求方法建立一個實際的RPC請求。
- 在這裡,我們首先生成一個唯一的correlationId數字並儲存它,我們在DefaultConsumer中實現handleDelivery的時候將使用此值來捕獲相應的響應。
- 接下來,我們釋出請求訊息,包含兩個屬性:replyTo和correlationId。
- 現在,我們可以坐下來等待適當的響應到達。
- 由於我們消費者交付處理髮生在分開的執行緒中,因此有時候我們需要在響應到達前掛起主執行緒。使用BlockingQueue是其中的一種解決方案。 這裡我們建立ArrayBlockingQueue,容量設定為1,因為我們只需要等待一個響應。
- handleDelivery方法正在做一個非常簡單的工作,它檢查每個的響應訊息的correlationId是否是我們正在尋找的。 如果是,它會把響應訊息放入BlockingQueue中。
- 同時主執行緒正在等待從BlockingQueue取出響應的訊息。
最後,我們將響應返回給使用者。
建立客戶請求:
RPCClient fibonacciRpc = new RPCClient();&amp;nbsp; System.out.println(" [x] Requesting fib(30)"); String response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); fibonacciRpc.close();
現在是時候去演示我們完整的例項程式碼了(RPCClient.java和RPCServer.java),包括基本異常處理。
像往常一樣編譯和設定類路徑(參見教程一):
javac -cp $CP RPCClient.java RPCServer.java
我們的RPC服務現在已經準備好了。 我們可以啟動伺服器:
</pre> java -cp $CP RPCServer<em># => [x] Awaiting RPC requests</em>
請求執行客戶端的fibonacci數列:
</pre> java -cp $CP RPCClient<em># => [x] Requesting fib(30)</em>
這裡提出的設計不是RPC服務的唯一可能的實現方法,但是其具有一些重要的優點:
- 如果RPC伺服器太慢,可以通過執行另一個RPC伺服器進行擴充套件。 嘗試在新的控制檯中執行第二個RPCServer。
- 在客戶端,RPC只需要傳送和接收一條訊息。 不需要像queueDeclare這樣的同步呼叫。 因此,對於單個RPC請求,RPC客戶端只需要一次網路往返。
我們的程式碼仍然非常簡單,並且不會嘗試解決更復雜(但是重要的)問題,例如:
- 如果沒有伺服器執行,客戶端應該如何反應?
- 客戶端是否應該擁有PRC的一些超時請求的處理?
- 如果伺服器發生故障並引發異常,是否應該將其轉發給客戶端?
- 在執行之前防止無效的傳入訊息(例如檢查邊界,型別)。
如果要進行實驗,您可能會發現management UI對檢視佇列很有用。