1. 程式人生 > >RabbitMQ 學習開發筆記

RabbitMQ 學習開發筆記

private basic 反饋 soc ack pcs eve build 標識

ConnectionFactory、Connection、Channel

ConnectionFactory、Connection、Channel,這三個都是RabbitMQ對外提供的API中最基本的對象。不管是服務器端還是客戶端都會首先創建這三類對象。
ConnectionFactory為Connection的制造工廠。

Connection是與RabbitMQ服務器的socket鏈接,它封裝了socket協議及身份驗證相關部分邏輯。

Channel是我們與RabbitMQ打交道的最重要的一個接口,大部分的業務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發布消息等。

Queue

Queue(隊列)是RabbitMQ的內部對象,用於存儲消息

  RabbitMQ中的消息都只能存儲在Queue中,生產者生產消息並最終投遞到Queue中,消費者可以從Queue中獲取消息並消費。

  隊列是有Channel聲明的,而且這個操作是冪等的。同名的隊列多次聲明也只會創建一次。我們發送消息就是想這個聲明的隊列裏發送消息

  工作隊列的主要思想是不用等待資源密集型的任務處理完成,

為了確保消息或者任務不會丟失,rabbitmq 支持消息確信 ACK。ACK機制是消費者端從rabbitmq收到消息並處理完成後,反饋給rabbitmq,rabbitmq收到反饋信息後將消息從隊列中刪除

  如果rabbitmq向消費者改善消息時,消費者服務器掛了,消息也不會超時,即使一個消息需要非常長的時間處理,也不會導致消息超時,永遠不會從rabbitmq中刪除,

忘記通過basicAck返回確認信息是個嚴重的錯誤

rabbitmq不允許重新定義一個已有的隊列信息

QueueingConsumer

隊列消費者,用於監聽隊列中的消息。調用nextDelivery方法時,內部實現就是調用隊列的take方法。該方法的作用:獲取並移除此隊列的頭部,在元素變得可用之前一直等待(如果有必要)。說白了就是如果沒有消息,就處於阻塞狀態。

RabbitMQ 筆記-Exchanges

  Procuder Publish的Message進入了Exchange。接著通過“routing keys”, RabbitMQ會找到應該把這個Message放到哪個queue裏。queue也是通過這個routing keys來做的綁定。

有三種類型的Exchanges:direct, fanout,topic。 每個實現了不同的路由算法(routing algorithm)。

  Direct exchange: 如果 routing key 匹配, 那麽Message就會被傳遞到相應的queue中。其實在queue創建時,它會自動的以queue的名字作為routing key來綁定那個exchange。

技術分享圖片
        
生產者:
     // 聲明交換器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 發送消息 for (String severity : routingKeys) { String message = "Send the message level: " + severity; channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent ‘" + severity + "‘:‘" + message + "‘"); } 消費者 // 聲明交換器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 獲取匿名隊列名稱 String queueName = channel.queueDeclare().getQueue(); // 根據路由關鍵字進行多重綁定 for (String severity : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME, severity); System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + severity); }
技術分享圖片

  Fanout exchange: 會向響應的queue廣播。

技術分享圖片
生產者
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 分發消息
        for (int i=0; i<5; i++) {
            String message = "Hello World!" + i;
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent ‘" + message + "‘");
        }

消費者
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
    
技術分享圖片

  Topic exchange: 對key進行模式匹配,比如ab*可以傳遞到所有ab*的queue。

技術分享圖片
生產者
            // 聲明一個匹配模式的交換器
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // 待發送的消息
            String routingKeys[] = new String[]{
                "quick.orange.rabbit",
                "lazy.orange.elephant",
                "quick.orange.fox",
                "lazy.brown.fox",
                "quick.brown.fox",
                "quick.orange.male.rabbit",
                "lazy.orange.male.rabbit"
            };

            // 發送消息
            for (String severity : routingKeys) {
                String message = "From "+severity+" routingKey‘ s message!";
                channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
                System.out.println("TopicSend [x] Sent ‘" + severity + "‘:‘" + message + "‘");
            }

消費者
        // 聲明一個匹配模式的交換器
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        // 路由關鍵字
        String routingKeys[] = new String[] {
            "*.orange.*"
        };

        // 綁定路由關鍵字

        for (String bindingKey : routingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
            System.out.println("ReceiveLogsTopic1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);
        }
技術分享圖片

  匿名: 直接發送到queue。

技術分享圖片
生產者
        for (int i=0; i<5; i++) {
            String message = "hello world! " + i;
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println(" [x] Sent ‘" + message + "‘");
        }

消費者
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
技術分享圖片

RabbitMQ 筆記-RPC

RabbitMQ中實現RPC的機制是:

  • 客戶端發送請求(消息)時,在消息的屬性(MessageProperties,在AMQP協議中定義了14中properties,這些屬性會隨著消息一起發送)中設置兩個值replyTo(一個Queue名稱,用於告訴服務器處理完成後將通知我的消息發送到這個Queue中)和correlationId(此次請求的標識號,服務器處理完成後需要將此屬性返還,客戶端將根據這個id了解哪條請求被成功執行了或執行失敗)
  • 服務器端收到消息並處理
  • 服務器端處理完消息後,將生成一條應答消息到replyTo指定的Queue,同時帶上correlationId屬性
  • 客戶端之前已訂閱replyTo指定的Queue,從中收到服務器的應答消息後,根據其中的correlationId屬性分析哪條請求被執行了,根據執行結果進行後續業務處理
技術分享圖片
public class RPCClient {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private Connection connection;
    private Channel channel;
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 設置MabbitMQ所在主機ip或者主機名
        factory.setHost("192.168.65.136");
        factory.setUsername("rabbitmq");
        factory.setPassword("123456");
        // 創建一個連接
        connection = factory.newConnection();
        // 創建一個頻道
        channel = connection.createChannel();

        //聲明隊列
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

        //為每一個客戶端獲取一個隨機的回調隊列
        replyQueueName = channel.queueDeclare().getQueue();
        //為每一個客戶端創建一個消費者(用於監聽回調隊列,獲取結果)
        consumer = new QueueingConsumer(channel);
        //消費者與隊列關聯
        channel.basicConsume(replyQueueName, true, consumer);
    }

    /**
     * 獲取斐波列其數列的值
     *
     * @param message
     * @return
     * @throws Exception
     */
    public String call(String message) throws Exception{
        String response = null;
        String corrId = java.util.UUID.randomUUID().toString();

        //設置replyTo和correlationId屬性值
        BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();

        //發送消息到rpc_queue隊列
        channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes());

        while (true) {
            System.out.println("OK?");
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            System.out.println("OK");
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(delivery.getBody(),"UTF-8");
                break;
            }
        }

        return response;
    }

    public static void main(String[] args) throws Exception {
        RPCClient fibonacciRpc = new RPCClient();
        String result = fibonacciRpc.call("4");
        System.out.println( "fib(4) is " + result);
    }
}
技術分享圖片 技術分享圖片
public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 設置MabbitMQ所在主機ip或者主機名
        factory.setHost("192.168.65.136");
        factory.setUsername("rabbitmq");
        factory.setPassword("123456");
        // 創建一個連接
        Connection connection = factory.newConnection();
        // 創建一個頻道
        Channel channel = connection.createChannel();

        //聲明隊列
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

        //限制:每次最多給一個消費者發送1條消息
        channel.basicQos(1);

        //為rpc_queue隊列創建消費者,用於處理請求
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

        System.out.println(" [x] Awaiting RPC requests");

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();

            //獲取請求中的correlationId屬性值,並將其設置到結果消息的correlationId屬性中
            BasicProperties props = delivery.getProperties();
            BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
            //獲取回調隊列名字
            String callQueueName = props.getReplyTo();

            String message = new String(delivery.getBody(),"UTF-8");

            System.out.println(" [.] fib(" + message + ")");

            //獲取結果
            String response = "" + fib(Integer.parseInt(message));
            //先發送回調結果
            channel.basicPublish("", callQueueName, replyProps,response.getBytes());
            //後手動發送消息反饋
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

    /**
     * 計算斐波列其數列的第n項
     *
     * @param n
     * @return
     * @throws Exception
     */
    private static int fib(int n) throws Exception {
        if (n < 0)
            throw new Exception("參數錯誤,n必須大於等於0");
        if (n == 0)
            return 0;
        if (n == 1)
            return 1;
        return fib(n - 1) + fib(n - 2);
    }
}
技術分享圖片

RabbitMQ 學習開發筆記