rabbitmq-direct路由訂閱模型
阿新 • • 發佈:2021-06-10
生產者:
package com.gavin.mq.direct; import com.gavin.utils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 路由訂閱模型routing-direct模式: */ public class WorkProvider { @Test public void testSendMessage() throws IOException, TimeoutException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); //宣告交換機 引數1:交換機名稱 channel.exchangeDeclare("logs_direct","direct"); //routingKey為info channel.basicPublish("logs_direct","info", MessageProperties.PERSISTENT_BASIC,("hello direct").getBytes()); RabbitMQUtils.close(channel,connection); } }
消費者1:
package com.gavin.mq.direct; import com.gavin.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class WorkConsumer { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs_direct","direct"); String tempQueue = channel.queueDeclare().getQueue(); channel.queueBind(tempQueue,"logs_direct","error"); channel.queueBind(tempQueue,"logs_direct","warning"); channel.basicConsume(tempQueue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); } }
消費者2:
package com.gavin.mq.direct; import com.gavin.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class WorkConsumer2 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs_direct","direct"); String tempQueue = channel.queueDeclare().getQueue(); channel.queueBind(tempQueue,"logs_direct","info"); channel.basicConsume(tempQueue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumer2:"+new String(body)); } }); } }