1. 程式人生 > 其它 >rabbitmq工作模式---路由模式

rabbitmq工作模式---路由模式

模式說明:

  • 佇列與交換機的繫結,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
  • 訊息的傳送方在向Exchange傳送訊息時,也必須指定訊息的RoutingKey
  • Exchange不再把訊息交給每一個繫結的佇列,而是根據訊息的RoutingKey進行判斷,只有佇列的Routingkey與訊息的Routingkey完全一致,才會接收到訊息

生產者示例程式碼

package com.rabbitmq.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 傳送訊息
 */
public class Producer_Routing {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立連線工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設定引數
        factory.setHost("172.16.98.133");//ip  預設值 localhost
        factory.setPort(5672); //埠  預設值 5672
        factory.setVirtualHost("/bgt");//虛擬機器 預設值/
        factory.setUsername("testuser");//使用者名稱 預設 guest
        factory.setPassword("111111");//密碼 預設值 guest
        //3. 建立連線 Connection
        Connection connection = factory.newConnection();
        //4. 建立Channel
        Channel channel = connection.createChannel();
       /*
       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       引數:
        1. exchange:交換機名稱
        2. type:交換機型別
            DIRECT("direct"),:定向
            FANOUT("fanout"),:扇形(廣播),傳送訊息到每一個與之繫結佇列。
            TOPIC("topic"),萬用字元的方式
            HEADERS("headers");引數匹配
        3. durable:是否持久化
        4. autoDelete:自動刪除
        5. internal:內部使用。 一般false
        6. arguments:引數
        */
       String exchangeName = "test_direct";
        //5. 建立交換機
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
        //6. 建立佇列
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. 繫結佇列和交換機
        /*
        queueBind(String queue, String exchange, String routingKey)
        引數:
            1. queue:佇列名稱
            2. exchange:交換機名稱
            3. routingKey:路由鍵,繫結規則
                如果交換機的型別為fanout ,routingKey設定為""
         */
        //佇列1繫結 error
        channel.queueBind(queue1Name,exchangeName,"error");
        //佇列2繫結 info  error  warning
        channel.queueBind(queue2Name,exchangeName,"info");
        channel.queueBind(queue2Name,exchangeName,"error");
        channel.queueBind(queue2Name,exchangeName,"warning");
        String body = "日誌資訊:張三呼叫了delete方法...出錯誤了。。。日誌級別:error...";
        //8. 傳送訊息
        channel.basicPublish(exchangeName,"error",null,body.getBytes());
        //9. 釋放資源
        channel.close();
        connection.close();
    }
}

消費者示例程式碼

消費者監聽佇列queue2Name,因此能接收到訊息.

package com.rabbitmq.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_Routing1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立連線工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設定引數
        factory.setHost("172.16.98.133");//ip  預設值 localhost
        factory.setPort(5672); //埠  預設值 5672
        factory.setVirtualHost("/bgt");//虛擬機器 預設值/
        factory.setUsername("testuser");//使用者名稱 預設 guest
        factory.setPassword("111111");//密碼 預設值 guest
        //3. 建立連線 Connection
        Connection connection = factory.newConnection();
        //4. 建立Channel
        Channel channel = connection.createChannel();
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        引數:
            1. queue:佇列名稱
            2. autoAck:是否自動確認
            3. callback:回撥物件

         */
        // 接收訊息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回撥方法,當收到訊息後,會自動執行該方法

                1. consumerTag:標識
                2. envelope:獲取一些資訊,交換機,路由key...
                3. properties:配置資訊
                4. body:資料

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
                System.out.println("將日誌資訊列印到控制檯.....");
            }
        };
        //監聽佇列2
        channel.basicConsume(queue2Name,true,consumer);
    }
}

本文來自部落格園,作者:bgtong,轉載請註明原文連結:https://www.cnblogs.com/bgtong/p/15957467.html