1. 程式人生 > >《RabbitMQ官方指南》主題

《RabbitMQ官方指南》主題

Topics

上一個教程中,我們改進了我們的日誌系統而不是使用只能進行廣播的fanout交換型別,我們使用direct型別,能夠選擇性地接收日誌。 雖然使用direct交換型別改進了我們的系統,但它仍然有限制 – 它不能基於多條件進行路由選擇。 在我們的日誌記錄系統中,我們可能不僅要根據日誌級別訂閱日誌,還可以基於日誌源進行訂閱。您可能會從syslog unix工具中瞭解過這個概念,該工具可以根據日誌級別(info/warn/crit..)和裝置(auth / cron / kern …)來路由日誌。

這將給我們很大的靈活性 – 我們可能只想要監聽來自“cron”的重要錯誤,但是想監聽來自”kern“的所有日誌。 要在我們的日誌系統中實現這一點,我們需要了解一個更復雜的交換型別-Topic(主題)交換。

主題交換

傳送到主題交換區的訊息不能是任意的routing_key – 它必須是由點分隔的單詞列表。這些單詞可以是任何東西,但通常它們指定與訊息相關聯的一些功能。幾個有效的路由金鑰示例:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由金鑰中可以有任意多的單詞,最多可達255個位元組。

繫結鍵也必須是相同的形式。主題交換背後的邏輯類似於直接交換 – 使用特定路由金鑰傳送的訊息將被傳遞到與繫結金鑰匹配的所有繫結佇列。但是,繫結金鑰有兩個重要的特殊情況:

  • *(star)可以替代一個字。
  • #(hash)可以替換零個或多個單詞。
用下面的例子可以很容易解釋:

在這個例子中,我們將傳送所有描述動物的訊息。訊息將使用由三個單詞(兩個點)組成的路由金鑰傳送。路由金鑰中的第一個單詞將描述速度,第二個描述顏色和第三個代表物種: “<speed>.<colour>.<species>“。 我們建立了三個繫結:Q1繫結鍵“* .orange.*”, Q2繫結“*.*.rabbit”和Q3繫結 “lazy.#“。

這些繫結可以總結為:

  • Q1對所有的橙色動物感興趣。
  • Q2想聽聽有關兔子的一切,以及有關懶惰動物的一切。

將路由金鑰設定為“quick.orange.rabbit”的訊息將傳遞給兩個佇列。訊息“lazy.orange.elephant”也會去他們兩個。另一方面,“quick.orange.fox”只會轉到第一個佇列,而“lazy.brown.fox”只能到第二個。 “lazy.pink.rabbit”只會傳遞到第二個佇列,即使它匹配兩個繫結。 “quick.brown.fox”不匹配任何繫結,所以它將被丟棄。

如果我們違反約定併發送一個或四個單詞的訊息,如 “orange” 或 “quick.orange.male.rabbit“,會發生什麼?那麼這些訊息將不會匹配任何繫結,並將丟棄。 另一方面,”lazy.orange.male.rabbit“即使它有四個單詞,但他將匹配最後的繫結,並將被傳遞到第二個佇列。

主題交換(Topic exchange)

主題交換是強大的,可以像其他交換型別一樣行事。

當佇列與“#”(hash)繫結鍵繫結時,

它將接收所有訊息,而不管路由金鑰是什麼,叫如fanout交換一樣。

當特殊字元“*”(start)和“#”(hash)在繫結中不被使用時,主題交換將表現得像一個direct交換。

把它們組合在一起

我們將在我們的日誌系統中使用主題交換。我們將從一個假設開始,日誌的路由金鑰將有兩個單片語成: “<facility>.<severity>“。

該程式碼與上一個教程幾乎相同。

EmitLogTopic.java的程式碼:

import com.rabbitmq.client.*;

import java.io.IOException;

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

        connection.close();
    }
    //...
}

ReceiveLogsTopic.java:

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsTopic {
  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
      System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
      System.exit(1);
    }

    for (String bindingKey : argv) {
      channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
    }

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

編譯執行,像在Tutorial 1中一樣,把classpath加進來 – 在Windows系統, 使用%CP%.

編譯:

javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java

接收所有日誌:

java -cp $CP ReceiveLogsTopic "#"

從裝置 “kern“接收日誌:

java -cp $CP ReceiveLogsTopic "kern.*"

或者只想接收級別為的”critical” 日誌:

java -cp $CP ReceiveLogsTopic "*.critical"

可以建立多個繫結:

java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

生產路由金鑰為”kern.critical” 的日誌:

java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

接下來,在教程6( tutorial 6)中找出如何做一個包含往返資訊的遠端過程呼叫。