1. 程式人生 > 程式設計 >RabbitMQ 最常用的三大模式例項解析

RabbitMQ 最常用的三大模式例項解析

這篇文章主要介紹了RabbitMQ 最常用的三大模式例項解析,文中通過示例程式碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下

Direct 模式

  • 所有傳送到 Direct Exchange 的訊息被轉發到 RouteKey 中指定的 Queue。
  • Direct 模式可以使用 RabbitMQ 自帶的 Exchange: default Exchange,所以不需要將 Exchange 進行任何繫結(binding)操作。
  • 訊息傳遞時,RouteKey 必須完全匹配才會被佇列接收,否則該訊息會被拋棄,

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DirectProducer {
  public static void main(String[] args) throws Exception {
    //1. 建立一個 ConnectionFactory 並進行設定
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("guest");

    //2. 通過連線工廠來建立連線
    Connection connection = factory.newConnection();

    //3. 通過 Connection 來建立 Channel
    Channel channel = connection.createChannel();

    //4. 宣告
    String exchangeName = "test_direct_exchange";
    String routingKey = "item.direct";

    //5. 傳送
    String msg = "this is direct msg";
    channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());
    System.out.println("Send message : " + msg);

    //6. 關閉連線
    channel.close();
    connection.close();
  }
}
import com.rabbitmq.client.*;
import java.io.IOException;

public class DirectConsumer {

  public static void main(String[] args) throws Exception {
    //1. 建立一個 ConnectionFactory 並進行設定
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("guest");
    factory.setAutomaticRecoveryEnabled(true);
    factory.setNetworkRecoveryInterval(3000);
   
    //2. 通過連線工廠來建立連線
    Connection connection = factory.newConnection();

    //3. 通過 Connection 來建立 Channel
    Channel channel = connection.createChannel();

    //4. 宣告
    String exchangeName = "test_direct_exchange";
    String queueName = "test_direct_queue";
    String routingKey = "item.direct";
    channel.exchangeDeclare(exchangeName,"direct",true,false,null);
    channel.queueDeclare(queueName,null);

    //一般不用程式碼繫結,在管理介面手動繫結
    channel.queueBind(queueName,exchangeName,routingKey);

    //5. 建立消費者並接收訊息
    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)
          throws IOException {
        String message = new String(body,"UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };

    //6. 設定 Channel 消費者繫結佇列
    channel.basicConsume(queueName,consumer);

  }
}
 Send message : this is direct msg
 
 [x] Received 'this is direct msg'

Topic 模式

可以使用萬用字元進行模糊匹配

  • 符號'#" 匹配一個或多個詞
  • 符號"*”匹配不多不少一個詞

例如

  • 'log.#"能夠匹配到'log.info.oa"
  • "log.*"只會匹配到"log.erro“

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class TopicProducer {

  public static void main(String[] args) throws Exception {
    //1. 建立一個 ConnectionFactory 並進行設定
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("guest");

    //2. 通過連線工廠來建立連線
    Connection connection = factory.newConnection();

    //3. 通過 Connection 來建立 Channel
    Channel channel = connection.createChannel();

    //4. 宣告
    String exchangeName = "test_topic_exchange";
    String routingKey1 = "item.update";
    String routingKey2 = "item.delete";
    String routingKey3 = "user.add";

    //5. 傳送
    String msg = "this is topic msg";
    channel.basicPublish(exchangeName,routingKey1,msg.getBytes());
    channel.basicPublish(exchangeName,routingKey2,routingKey3,msg.getBytes());
    System.out.println("Send message : " + msg);

    //6. 關閉連線
    channel.close();
    connection.close();
  }
}
import com.rabbitmq.client.*;
import java.io.IOException;

public class TopicConsumer {

  public static void main(String[] args) throws Exception {
    //1. 建立一個 ConnectionFactory 並進行設定
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("guest");
    factory.setAutomaticRecoveryEnabled(true);
    factory.setNetworkRecoveryInterval(3000);

    //2. 通過連線工廠來建立連線
    Connection connection = factory.newConnection();

    //3. 通過 Connection 來建立 Channel
    Channel channel = connection.createChannel();

    //4. 宣告
    String exchangeName = "test_topic_exchange";
    String queueName = "test_topic_queue";
    String routingKey = "item.#";
    channel.exchangeDeclare(exchangeName,"topic","UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    //6. 設定 Channel 消費者繫結佇列
    channel.basicConsume(queueName,consumer);

  }
}
Send message : this is topc msg

[x] Received 'this is topc msg'
[x] Received 'this is topc msg'

Fanout 模式

不處理路由鍵,只需要簡單的將佇列繫結到交換機上傳送到交換機的訊息都會被轉發到與該交換機繫結的所有佇列上。
Fanout交換機轉發訊息是最快的。

import com.rabbitmq.client.*;
import java.io.IOException;

public class FanoutConsumer {
  public static void main(String[] args) throws Exception {
    //1. 建立一個 ConnectionFactory 並進行設定
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("guest");
    factory.setAutomaticRecoveryEnabled(true);
    factory.setNetworkRecoveryInterval(3000);

    //2. 通過連線工廠來建立連線
    Connection connection = factory.newConnection();

    //3. 通過 Connection 來建立 Channel
    Channel channel = connection.createChannel();

    //4. 宣告
    String exchangeName = "test_fanout_exchange";
    String queueName = "test_fanout_queue";
    String routingKey = "item.#";
    channel.exchangeDeclare(exchangeName,"fanout",consumer);
  }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class FanoutProducer {

  public static void main(String[] args) throws Exception {
    //1. 建立一個 ConnectionFactory 並進行設定
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("guest");

    //2. 通過連線工廠來建立連線
    Connection connection = factory.newConnection();

    //3. 通過 Connection 來建立 Channel
    Channel channel = connection.createChannel();

    //4. 宣告
    String exchangeName = "test_fanout_exchange";
    String routingKey1 = "item.update";
    String routingKey2 = "";
    String routingKey3 = "ookjkjjkhjhk";//任意routingkey

    //5. 傳送
    String msg = "this is fanout msg";
    channel.basicPublish(exchangeName,msg.getBytes());
    System.out.println("Send message : " + msg);

    //6. 關閉連線
    channel.close();
    connection.close();
  }
}
Send message : this is fanout msg

[x] Received 'this is fanout msg'
[x] Received 'this is fanout msg'
[x] Received 'this is fanout msg'

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。