1. 程式人生 > >RabbitMQ-RPC

RabbitMQ-RPC

前篇文章,講了如何使用工作佇列在多個消費者之間分配耗時的任務.如果我們需要在遠端計算機上執行一個方法並等待結果,這個模式通常叫rpc

Client interface(客戶端介面)

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

Callback queue(回撥佇列)

通常在RabbitMQ上做RPC很容易。 客戶端傳送請求訊息,伺服器回覆響應訊息。 為了接收響應,我們需要在請求訊息中傳送一個回撥佇列。 我們可以使用預設佇列(僅限Java客戶端)。

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

訊息屬性

AMQP協議預定義了一組帶有訊息的14個屬性。 大多數屬性很少使用,但以下情況除外:
* deliveryMode:訊息傳遞模式,2.訊息持久化,其他值-瞬態.
* contentType:用於描述編碼的mime型別。 例如,對於經常使用的JSON編碼,將此屬性設定為:application / json是一種很好的做法。
* replyTo:回撥佇列名字
* correlationId:用於將RPC響應與請求相關聯。

Correlation Id

如果為每個rpc請求,都建立一個回撥佇列,會很低效,所以可以採用: 為每個客戶端建立一個單一的回撥佇列.
我們要為每個請求設定唯一的值,在回撥佇列中獲取訊息,correlation id, 關聯response和request就是基於這個屬性值的.如果看到一個未知的correlationid屬性值的訊息,拋棄它

您可能會問,為什麼我們應該忽略回撥佇列中的未知訊息,而不是失敗並出現錯誤?這是由於伺服器端可能存在競爭條件。雖然不太可能,但RPC伺服器可能會在向我們傳送結果後,傳送請求的確認訊息之前宕調了,這就有可能傳送未知的correlationid屬性值了。如果發生這種情況,重新啟動的RPC伺服器將再次處理請求。這就是為什麼在客戶端上我們必須優雅地處理重複的響應,理想情況下RPC應該是冪等的。

Summary

image.png | left | 671x189

流程
* 客戶端啟動時,會建立一個匿名的獨佔回撥佇列。
* 對於RPC請求,客戶端傳送帶有兩個屬性的訊息:replyTo(設定為回撥佇列)和correlationId(設定為每個請求的唯一值)。
* 請求被髮送到rpc_queue佇列。
* RPC 伺服器監聽rpc_queue佇列中請求. 當出現請求時,它會執行該作業,接收佇列就是replyTo設定的回撥佇列。
* 客戶端監聽回撥佇列。 出現訊息時,它會檢查correlationId屬性。 如果它與請求中的值匹配,則返回對應用程式的響應。
(客戶端,我有一堆東西,你處理一下,correlationid,我的請求標識,replyto.處理完結果返回這個佇列中
服務端:處理完了,返回,correlationid,客戶端拿它的id和服務端返回的id對比,然後進行接收)

程式碼

package com.tgb.kwy.rpc;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

/**
 * Description
 *
 * @author kongwy [email protected]
 * @version 1.0
 * @date 2018-07-11-09 -27
 */
public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.159.132");/*設定rabbitmq所在主機ip或主機名*/
        /*指定使用者名稱和密碼*/
        factory.setUsername("admin");
        factory.setPassword("admin");
        connection = factory.newConnection();
        channel = connection.createChannel();
        replyQueueName = channel.queueDeclare().getQueue();
    }

    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        channel.basicPublish("", replyQueueName, props, message.getBytes("UTF-8"));
        final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                if (properties.getCorrelationId().equals(corrId)) {
                    response.offer(new String(body, "UTF-8"));
                }
            }
        });
        return response.take();
    }
    public void close() throws IOException{
        connection.close();
    }
    public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
            fibonacciRpc = new RPCClient();

            System.out.println(" [x] Requesting fib(30)");
            response = fibonacciRpc.call("30");
            System.out.println(" [.] Got '" + response + "'");
        }
        catch  (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            if (fibonacciRpc!= null) {
                try {
                    fibonacciRpc.close();
                }
                catch (IOException _ignore) {}
            }
        }
    }
}
package com.tgb.kwy.rpc;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Description
 *
 * @author kongwy [email protected]
 * @version 1.0
 * @date 2018-07-11-09 -10
 */
public class RPCServer {
    private static final String RPC_QUEUE_NAME="rpc_queue";
    private static int fib(int n) {
        if (n ==0) return 0;
        if (n == 1) return 1;
        return fib(n-1) + fib(n-2);
    }
    public static void main(String[] args) {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.159.132");/*設定rabbitmq所在主機ip或主機名*/
        /*指定使用者名稱和密碼*/
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection=null;
        try {
            connection      = factory.newConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                            .Builder()
                            .correlationId(properties.getCorrelationId())
                            .build();

                    String response = "";

                    try {
                        String message = new String(body,"UTF-8");
                        int n = Integer.parseInt(message);

                        System.out.println(" [.] fib(" + message + ")");
                        response += fib(n);
                    }
                    catch (RuntimeException e){
                        System.out.println(" [.] " + e.toString());
                    }
                    finally {
                        channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        // RabbitMq consumer worker thread notifies the RPC server owner thread
                        synchronized(this) {
                            this.notify();
                        }
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized(consumer) {
                    try {
                        consumer.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
        finally {
            if (connection != null)
                try {
                    connection.close();
                } catch (IOException _ignore) {}
        }
    }
}

執行結果

rpc.png | center | 747x218

注:程式碼來自官網.官網對於概念性的內容,講解的還是很清楚的