1. 程式人生 > >7、RabbitMQ-主題模式

7、RabbitMQ-主題模式

消費者 fin lee wid icc pic fault bool 分享

1、模式圖

技術分享圖片

發送到主題交換的消息不能具有任意的 routing_key - 它必須是由點分隔的單詞列表。 單詞可以是任何內容,但通常它們指定與消息相關的一些功能。一些有效的路由鍵示例:“ stock.usd.nyse ”,“ nyse.vmw”,“ quick.orange.rabbit ”。路由密鑰中可以包 含任意數量的單詞,最多可達255個字節。 綁定密鑰也必須采用相同的形式。主題交換背後的邏輯 類似於直接交換- 使用特定路 由密鑰發送的消息將被傳遞到與匹配綁定密鑰綁定的所有隊列。但是綁定鍵有兩個重
要的特殊情況: *(星號)可以替代一個單詞。 #(hash)可以替換零個或多個單詞。

類型是topic

2、實踐

生產者

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.util.ConnectionUtils;

public class Send {
    
    
private static final String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); //exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String msg = "商品....."; //綁定路由 String routingKey = "goods.add"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); channel.close(); conn.close(); } }

消費者1:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.util.ConnectionUtils;

public class Receive {
    
    private static final String QUEUE_NAME="test_topic1";
    private static final String EXCHANGE_NAME = "exchange_topic";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        
        Connection conn = ConnectionUtils.getConnection();
        
        Channel channel = conn.createChannel();
        
        //隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        channel.basicQos(1);
        
        //綁定隊列到交換機轉發器
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");

                //定義一個消費者
                Consumer consumer = new DefaultConsumer(channel){
                    //收到消息就會觸發這個方法
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                            throws IOException {
                        String msg = new String(body,"utf-8");
                        System.out.println("消費者1接收到的消息" + msg);
                        
                        try {
                            Thread.sleep(1500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally{
                            System.out.println("消費者1處理完成!");
                            //手動回執
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
                    }
                };
                //監聽隊列
                //自動應答false
                boolean autoAck = false;
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

消費者2:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.util.ConnectionUtils;

public class Receive2 {
    
    private static final String QUEUE_NAME="test_topic2";
    private static final String EXCHANGE_NAME = "exchange_topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnectionUtils.getConnection();
        Channel channel = conn.createChannel();
        
        //隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        
        //綁定隊列到交換機轉發器
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
        
                //定義一個消費者
                Consumer consumer = new DefaultConsumer(channel){
                    //收到消息就會觸發這個方法
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                            throws IOException {
                        String msg = new String(body,"utf-8");
                        System.out.println("消費者2接收到的消息" + msg);
                        
                        try {
                            Thread.sleep(1500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally{
                            System.out.println("消費者2處理完成!");
                            //手動回執
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
                    }
                };
                //監聽隊列
                //自動應答false
                boolean autoAck = false;
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

此時如果生產者的String routingKey = "goods.add"; 此時2個消費者都可以收到消息 若:String routingKey = "goods.del"; 此時只有消費者2收到消息

技術分享圖片

7、RabbitMQ-主題模式