1. 程式人生 > >RabbitMQ入門學習——Topics(主題)

RabbitMQ入門學習——Topics(主題)

在前面的章節中,我們改善了我們的日誌程式碼。我們使用direct直播代替fanout廣播,並且可以選擇性惡接受日誌。儘管使用direct直播改善了我們的日誌程式碼,但是它還有許多侷限性。比如:不能給予多種標準來路由。

在我們的日誌系統中,我們也許即希望按照日誌的嚴重程度來訂閱,也希望按照日誌的來源定於。你也許知道unix syslog工具的概念,它是給予嚴重程度和裝置來路由日誌的。這將給我們許多的靈活性——我們也許既希望監聽嚴重的錯誤,同時也不希望錯過來自kern的日誌。為了實現這種目的,我們需要學習一種更復雜的交換區——topic

1Topic exchange(主題交換區)

傳送到topic

交換區的訊息不能使用隨意的routing_key,它必須一串通過點分隔的單詞。可以使任意的單詞,但是它們通常都與訊息有一定的關聯。比如一些有效的routing key"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。在routing key中可以包含任意多個單詞,只要它們不超過255個位元組。

   Binding key必須要有相同的形式。Topic交換區和direct有些類似——它們都通過一個特定的routing key傳遞給多個binding key相匹配的佇列。對於binding key有兩點非常重要:

*可以代替一個單詞

#可以代替

0個或多個單詞

下圖是一個最簡單的例子

在這個例子中,我們要傳送的訊息全部是用來描述動物的。該訊息將會被路由到一個包含三個單詞的routing key(兩個點分隔)Routing key中的第一個單詞表示速度,第二個表示顏色,第三個種類:“<speed>.<color>.<species>”。

我們建立了三個bindingQ1通過“*.orange.*”繫結,Q2通過“*.*.rabbit”和“lazy.#”繫結。

這三種繫結可以描述為:

Q1對所有黃顏色的動物感興趣

Q2對所有的兔子和比較懶的動物感興趣

通過使用routing keyquick.orange.rabbit

”的訊息,將會被髮送到Q1Q2兩個佇列上,使用“lazy.orange.elephant”的訊息也將被髮送到兩個佇列上。但是,“quick.orange.fox”只會被髮送到Q1佇列上,“lazy.brown.fox”只會傳送到Q2佇列上。雖然“lazy.pink.rabbit”匹配了兩個binding,但是它只會被髮送到Q2上一次。"quick.brown.fox"沒有匹配任何binding,它將被丟棄。

如果我們沒有按照約定來發送訊息(比如“orange”,“quick.orange.male.rabbit),那將會怎麼樣呢?這些訊息由於沒有匹配到任何的binding,所以他們將會被丟棄。雖然“lazy.orange.male.rabbit”有四個單詞,單是匹配到了最後一個binding,所以會發送給Q2

Topic交換區功能非常強大,他可以實現其他交換區的功能。

當一個佇列通過binding key #”進行繫結,它會接受所有的訊息,忽略routing key。類似fanout交換區

當特殊字元“*”和“#”沒有在binding中使用,topic交換區就和direct交換區類似。

2Putting it all together

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)

                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

        connection.close();

    }

    //...

}

public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)

                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){

            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");

            System.exit(1);

        }

        for(String bindingKey : argv){

            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);

        channel.basicConsume(queueName, true, consumer);

        while (true) {

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();

            String message = new String(delivery.getBody());

            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");

        }

    }

}