7、RabbitMQ-主題模式
阿新 • • 發佈:2019-03-14
消費者 fin lee wid icc pic fault bool 分享
要的特殊情況:
*(星號)可以替代一個單詞。
#(hash)可以替換零個或多個單詞。
1、模式圖
發送到主題交換的消息不能具有任意的 routing_key - 它必須是由點分隔的單詞列表。 單詞可以是任何內容,但通常它們指定與消息相關的一些功能。一些有效的路由鍵示例:“ stock.usd.nyse ”,“ nyse.vmw”,“ quick.orange.rabbit ”。路由密鑰中可以包 含任意數量的單詞,最多可達255個字節。 綁定密鑰也必須采用相同的形式。主題交換背後的邏輯 類似於直接交換- 使用特定路 由密鑰發送的消息將被傳遞到與匹配綁定密鑰綁定的所有隊列。但是綁定鍵有兩個重
類型是topic
2、實踐
生產者
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.util.ConnectionUtils; public class Send {private static final String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); //exchangechannel.exchangeDeclare(EXCHANGE_NAME, "topic"); String msg = "商品....."; //綁定路由 String routingKey = "goods.add"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); channel.close(); conn.close(); } }
消費者1:
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.util.ConnectionUtils; public class Receive { private static final String QUEUE_NAME="test_topic1"; private static final String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); //隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); //綁定隊列到交換機轉發器 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add"); //定義一個消費者 Consumer consumer = new DefaultConsumer(channel){ //收到消息就會觸發這個方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("消費者1接收到的消息" + msg); try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("消費者1處理完成!"); //手動回執 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //監聽隊列 //自動應答false boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
消費者2:
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.util.ConnectionUtils; public class Receive2 { private static final String QUEUE_NAME="test_topic2"; private static final String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); //隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); //綁定隊列到交換機轉發器 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#"); //定義一個消費者 Consumer consumer = new DefaultConsumer(channel){ //收到消息就會觸發這個方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("消費者2接收到的消息" + msg); try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("消費者2處理完成!"); //手動回執 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //監聽隊列 //自動應答false boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
此時如果生產者的String routingKey = "goods.add"; 此時2個消費者都可以收到消息 若:String routingKey = "goods.del"; 此時只有消費者2收到消息
7、RabbitMQ-主題模式