萬用字元模式
阿新 • • 發佈:2022-03-17
目錄
1、說明
生產者P傳送訊息到交換機X,type=topic,交換機根據繫結佇列的routing key的值進行萬用字元匹配;符號#:匹配零個或者多個詞,例如lazy.# 可以匹配lazy、lazy.irs或者lazy.irs.cor
符號*:只能匹配一個詞,例如lazy.* 可以匹配lazy.irs或者lazy.cor
2、模型
3、示例程式碼
3.1、生產者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
 * Topic (wildcard) mode publisher.
 *
 * <p>Connects to the broker at 127.0.0.1:5672, publishes a single UTF-8 encoded
 * message to the topic exchange {@code EXCHANGE::TUTORIAL::TOPIC} with routing
 * key {@code b2b.stock}, and then closes the channel and the connection.
 *
 * <p>The exchange is not declared here; it is assumed to already exist on the
 * broker.
 */
public class Publisher {

    private static final String MQ_VIRTUAL_HOST = "VHOST::TUTORIAL";
    private static final String MQ_EXCHANGE_NAME = "EXCHANGE::TUTORIAL::TOPIC";

    /**
     * Publishes one message and exits.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("romens");
        factory.setVirtualHost(MQ_VIRTUAL_HOST);

        // try-with-resources closes the channel first, then the connection,
        // even when publishing or closing the channel fails.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // NOTE(review): on a topic exchange '*' matches exactly one word, so the
            // two-word key "b2b.stock" will not match a three-word binding pattern
            // such as "*.stock.*" -- confirm that some queue is bound with a pattern
            // that matches this key, otherwise the message is not routed anywhere.
            channel.basicPublish(MQ_EXCHANGE_NAME, "b2b.stock", null, "Hello Kitty".getBytes(UTF_8));
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
3.2、消費者
3.2.1、消費者A
import com.rabbitmq.client.*; import java.io.IOException; import java.util.Objects; import java.util.concurrent.TimeoutException; import static java.nio.charset.StandardCharsets.UTF_8; public class ConsumerA { private static final String MQ_VIRTUAL_HOST = "VHOST::TUTORIAL"; private static final String MQ_EXCHANGE_NAME = "EXCHANGE::TUTORIAL::TOPIC"; private static final String MQ_QUEUE_NAME = "QUEUE::TUTORIAL::A"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("romens"); factory.setVirtualHost(MQ_VIRTUAL_HOST); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueBind(MQ_QUEUE_NAME, MQ_EXCHANGE_NAME, "*.stock.*"); System.out.println("[*] Waiting for messages. To exits press CTRL+C"); final DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { System.out.println(" [X] Received '" + new String(body, UTF_8) + "'"); } }; channel.basicConsume(MQ_QUEUE_NAME, true, consumer); while (true) { synchronized (consumer) { try { consumer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { if (Objects.nonNull(channel)) { try { channel.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } if (Objects.nonNull(connection)) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
3.2.2、消費者B
import com.rabbitmq.client.*; import java.io.IOException; import java.util.Objects; import java.util.concurrent.TimeoutException; import static java.nio.charset.StandardCharsets.UTF_8; public class ConsumerB { private static final String MQ_VIRTUAL_HOST = "VHOST::TUTORIAL"; private static final String MQ_EXCHANGE_NAME = "EXCHANGE::TUTORIAL::TOPIC"; private static final String MQ_QUEUE_NAME = "QUEUE::TUTORIAL::B"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("romens"); factory.setVirtualHost(MQ_VIRTUAL_HOST); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueBind(MQ_QUEUE_NAME, MQ_EXCHANGE_NAME, "*.order.*"); System.out.println("[*] Waiting for messages. To exits press CTRL+C"); final DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { System.out.println(" [X] Received '" + new String(body, UTF_8) + "'"); } }; channel.basicConsume(MQ_QUEUE_NAME, true, consumer); while (true) { synchronized (consumer) { try { consumer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { if (Objects.nonNull(channel)) { try { channel.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } if (Objects.nonNull(connection)) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }