1. 程式人生 > >rabbitmq+java入門(三)exchange的使用

rabbitmq+java入門(三)exchange的使用

int none throw receive spa 必須 消息 tostring message

參考:http://www.rabbitmq.com/tutorials/tutorial-three-java.html

先決條件

本教程假定RabbitMQ 在標準端口(5672上的本地主機安裝並運行如果您使用不同的主機,端口或證書,則連接設置需要進行調整。

在之前的教程中,我們創建了一個工作隊列。工作隊列背後的假設是,每個任務只被傳遞給一個消費者。在這一部分,我們將做一些完全不同的事情 - 我們會向多個消費者傳遞信息。這種模式被稱為“發布/訂閱”。

為了說明這種模式,我們將建立一個簡單的日誌系統。它將包含兩個程序 - 第一個將發射日誌消息,第二個將接收並打印它們。

在我們的日誌系統中,每個接收程序的運行副本都會收到消息。

這樣我們就可以運行一個接收器並將日誌指向硬盤; 同時我們將能夠運行另一個接收器並在屏幕上查看日誌。

基本上,發布的日誌消息將被廣播給所有的接收者。

exchanges

在本教程的上一個部分中,我們發送消息並從隊列中接收消息。現在是時候在rabbit中引入完整的消息傳遞模型了。

讓我們快速回顧一下前面教程中的內容:

  • 一個生產者是發送消息的用戶的應用程序。
  • 一個隊列是存儲消息的緩沖器。
  • 一個消費者是接收消息的用戶的應用程序。

RabbitMQ中的消息傳遞模型的核心思想是生產者永遠不會將任何消息直接發送到隊列中。實際上,生產者通常甚至不知道郵件是否會被傳送到任何隊列中。

相反,生產者只能發送消息給

交易所(exchange)交換是一件非常簡單的事情。一方面它接收來自生產者的消息,另一方面它推動他們排隊。exchange必須知道如何處理收到的消息。是否應該附加到特定隊列?它應該附加到許多隊列中嗎?或者它應該被丟棄嗎。這些規則都由交換類型定義

技術分享圖片

有幾種可用的交換類型:direct, topic, headers 和 fanout我們將關註最後一個fanout讓我們創建一個這種類型的exchage,命名為logs

channel.exchangeDeclare(“logs”“fanout”);

fanout非常簡單。詞如其名,它只是將收到的所有消息廣播到它所知道的所有隊列中。

這也正是我們的日誌系統所需要的。

綁定

技術分享圖片

exchange和隊列之間的關系稱為綁定。代碼如下:

channel.queueBind(queueName,“logs”“”);

列出綁定

您可以用以下的命令列出所有的綁定:

rabbitmqctl list_bindings

把以上這些放在一起

技術分享圖片

發出日誌消息的生產者程序與之前的教程沒有多大區別。最重要的變化是我們現在想發布消息到我們的自己定義的名稱為logs的exchange而不是一個沒有名稱的exchange發送時我們需要提供一個routingKey,但是對於fanout交換而言這個值被忽略了。以下EmitLog.java程序的代碼

package rmq.publishSubscribe;

/**
 * Created by zuzhaoyue on 18/5/16.
 */
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //exchange的類型包括:direct, topic, headers and fanout,我們本例子主要關註的是fanout
        //fanout類型是指向所有的隊列發送消息
        //以下是創建一個fanout類型的exchange,取名logs
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        String message = getMessage(argv);

        //1.在上個"hello world"例子中,我們用的是channel.basicPublish("", "hello", null, message.getBytes());
        //這裏用了默認的exchanges,一個空字符串 "",在basicPublish這個方法中,第一個參數即是exchange的名稱
        //2.準備向我們命名的exchange發送消息啦
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent ‘" + message + "‘");

        channel.close();
        connection.close();
    }

    private static String getMessage(String[] strings){
        if (strings.length < 1)
            return "info: Hello World!";
        return joinStrings(strings, " ");
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

 

如你所見,建立連接後我們聲明了exchange。這一步是必要的,因為不可以發布到不存在的exchange。

如果沒有隊列綁定到交換機上,這些消息將會丟失,但這對當前的例子來說沒問題; 如果沒有消費者正在收聽,我們可以放心地丟棄消息。

以下是接收方的代碼:

//package rmq.workqueues;

/**
 * Created by zuzhaoyue on 18/5/16.
 */
import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //上個例子中我們是向一個隊列中發送消息,接收方也是從一個隊列中獲取,那種情況下給隊列命名是很重要的,因為你需要生產者和消費者共享這個隊列
        //但是這個例子裏,則不需要給隊列命名,首先看下需求:即時讀取日誌,可以看出日誌系統需要的是即時性,那些舊的日誌我們不需要看,所以我們必須滿足以下兩點
        //1.每次連接rmq時我們都需要一個新的空的隊列,這個可以用隨機給隊列命名並創建來實現,或者更棒的方式是,讓rmq服務器自己隨機選擇一個名字給我們
        //2.當我們關閉與rmq的連接時,這個隊列得自動刪除
        //當然,這個已經有封裝好的方法了哈哈:channel.queueDeclare().getQueue()方法,可以創建一個暫時的,獨立的,可自動刪除並隨機命名的隊列
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        String queueName = channel.queueDeclare().getQueue();
        System.out.println("隊列名稱:" + queueName);
        //現在我們已經有了一個exchange,下一步就是讓exchange向隊列發送消息,exchange與隊列之間的關系也叫做binding(綁定)
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received ‘" + message + "‘");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

為了演示不同隊列都接收到了消息,我們把隊列的名稱打印出來,並且一個顯示在屏幕上,一個重定向到文件中:

第一個消費者啟動:啟動ReceiveLog.java的main()方法

第二個消費者啟動(重定向到文件/data/rmqlogs.log中):

javac -cp /data/amqp-client-4.2.0.jar ReceiveLogs.java
java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. ReceiveLogs>/data/rmqlogs.log

註意:

1.用javac和java命令啟動時需要將package那行代碼註釋掉,不然會報找不到或無法加載主類的錯誤。

2.兩個jar包必須加上,不然會報找不到jar的異常

啟動完成後,我們啟動第一個EmotLog.java的main()方法,可以觀察到屏幕上打印內容:

技術分享圖片

與此同時,tail -f /data/rmqlogs.log顯示如下:

技術分享圖片

可以發現兩個隊列名稱不同,但接收到了相同的消息,調試成功。

rabbitmq+java入門(三)exchange的使用