4.RabbitMQ Topic/Publish
注意:消費者需要先啟動(宣告佇列並繫結到交換機),否則會出現訊息黑洞,即訊息丟失的現象。
package com.study.soufang.rabbit.a001.topicAndPublish;
/**
 * Shared queue and exchange names for the fanout (publish/subscribe) example.
 *
 * <p>This is a constants holder only; it cannot be instantiated or subclassed.
 */
public final class ConstantOfTopic {

    /** Name of the first log queue. */
    public static final String QUEUE_NAME_A = "log_queue_a";

    /** Name of the second log queue. */
    public static final String QUEUE_NAME_B = "log_queue_b";

    /** Name of the exchange the log messages are published to. */
    public static final String EXCHANGE_NAME = "logs";

    /** Exchange type string for a fanout exchange. */
    public static final String EXCHANGE_TYPE_FANOUT = "fanout";

    private ConstantOfTopic() {
        // Constants holder: no instances.
    }
}
package com.study.soufang.rabbit.a001.topicAndPublish;
import java.io.IOException; import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import com.study.soufang.rabbit.a001.RabbitChannelUtil;
import lombok.Data;
import lombok.experimental.Accessors;

/**
 * Consumer that prints each delivered message to standard output and then
 * acknowledges it manually on its own channel.
 */
// NOTE(review): this class declares no fields of its own, so @Data/@Accessors
// appear to contribute only generated equals/hashCode/toString here — confirm
// whether they are actually wanted on a DefaultConsumer subclass.
@Data
@Accessors(chain = true)
public class MyConsumer extends DefaultConsumer {

    /**
     * @param channel the channel this consumer is attached to; also used to send acks
     */
    public MyConsumer(Channel channel) {
        super(channel);
    }

    /**
     * Decodes the body as UTF-8, prints it together with the consumer tag and
     * the current thread name, and acknowledges the delivery.
     *
     * @throws IOException if sending the acknowledgement fails
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
            throws IOException {
        try {
            // Charset constant instead of a name lookup: no UnsupportedEncodingException path.
            String message = new String(body, java.nio.charset.StandardCharsets.UTF_8);
            System.out.println(consumerTag+"--"+Thread.currentThread().getName()+" [x] Received '" + message + "'");
        } finally {
            // deliveryTag: index of this message on the channel.
            // multiple: whether to ack in batch; true would acknowledge, in one go,
            // every message with a smaller delivery tag as well. false acks only this one.
            // The ack sits in finally so the delivery is acknowledged even if printing fails.
            getChannel().basicAck(envelope.getDeliveryTag(), false);
        }
    }
}
package com.study.soufang.rabbit.a001.topicAndPublish;
import java.io.IOException; import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.study.soufang.rabbit.a001.RabbitChannelUtil;
public class Recv {
public static void doRecv(String queue) throws IOException, TimeoutException{ Channel channel = null; try { channel = RabbitChannelUtil.createChannel(); //宣告交換機 channel.exchangeDeclare(ConstantOfTopic.EXCHANGE_NAME, ConstantOfTopic.EXCHANGE_TYPE_FANOUT); //宣告佇列 channel.queueDeclare(queue, false, false, false, null); //繫結佇列到交換機 channel.queueBind(queue, ConstantOfTopic.EXCHANGE_NAME, ""); Consumer consumer = new MyConsumer(channel); boolean autoAck = false; channel.basicConsume(queue, autoAck, consumer); channel.basicQos(1); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException, TimeoutException { new Thread(new Runnable() { @Override public void run() { try { doRecv(ConstantOfTopic.QUEUE_NAME_A); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (TimeoutException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { doRecv(ConstantOfTopic.QUEUE_NAME_B); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (TimeoutException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); } }
package com.study.soufang.rabbit.a001.topicAndPublish;
import java.io.IOException; import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.study.soufang.rabbit.a001.RabbitChannelUtil;
public class Send {
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = null; try { channel = RabbitChannelUtil.createChannel(); //宣告交換機 channel.exchangeDeclare(ConstantOfTopic.EXCHANGE_NAME, ConstantOfTopic.EXCHANGE_TYPE_FANOUT); //傳送訊息,注意:需要消費者先啟動並宣告佇列及繫結到此交換機,不然訊息會丟失 for(int i=0;i<100;i++){ String message = "log---"+i; channel.basicPublish(ConstantOfTopic.EXCHANGE_NAME, "", null, message.getBytes());; Thread.sleep(100); } } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); }finally{ RabbitChannelUtil.closeChannel(channel); } } }