1. 程式人生 > >4.RabbitMQ Topic/Publish

4.RabbitMQ Topic/Publish

注意 消費者需要先啟動,不然會有訊息黑洞,即消失丟失現象。

package com.study.soufang.rabbit.a001.topicAndPublish;

public class ConstantOfTopic {

    public static final String QUEUE_NAME_A = "log_queue_a";          public static final String QUEUE_NAME_B = "log_queue_b";          public static final String EXCHANGE_NAME="logs";          public static final String EXCHANGE_TYPE_FANOUT="fanout"; }

 

package com.study.soufang.rabbit.a001.topicAndPublish;

import java.io.IOException; import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import com.study.soufang.rabbit.a001.RabbitChannelUtil;

import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(chain = true) public class MyConsumer extends DefaultConsumer {

         public MyConsumer(Channel channel) {         super(channel);     }

    @Override     public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)             throws IOException {         try {             String message = new String(body, "UTF-8");             System.out.println(consumerTag+"--"+Thread.currentThread().getName()+" [x] Received '" + message + "'");         } finally {             /**              * deliveryTag:該訊息的index                 multiple:是否批量.true:將一次性ack所有小於deliveryTag的訊息。              */             getChannel().basicAck(envelope.getDeliveryTag(), false);             /*try {                 RabbitChannelUtil.closeChannel(getChannel());             } catch (TimeoutException e) {                 // TODO Auto-generated catch block                 e.printStackTrace();             }*/         }     }

}

package com.study.soufang.rabbit.a001.topicAndPublish;

import java.io.IOException; import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.study.soufang.rabbit.a001.RabbitChannelUtil;

public class Recv {

    public static void doRecv(String queue) throws IOException, TimeoutException{         Channel channel = null;         try {             channel = RabbitChannelUtil.createChannel();             //宣告交換機             channel.exchangeDeclare(ConstantOfTopic.EXCHANGE_NAME, ConstantOfTopic.EXCHANGE_TYPE_FANOUT);             //宣告佇列             channel.queueDeclare(queue, false, false, false, null);             //繫結佇列到交換機             channel.queueBind(queue, ConstantOfTopic.EXCHANGE_NAME, "");             Consumer consumer = new MyConsumer(channel);             boolean autoAck = false;             channel.basicConsume(queue, autoAck, consumer);             channel.basicQos(1);         } catch (IOException | TimeoutException e) {             e.printStackTrace();         }     }          public static void main(String[] args) throws IOException, TimeoutException {         new Thread(new Runnable() {             @Override             public void run() {                 try {                     doRecv(ConstantOfTopic.QUEUE_NAME_A);                 } catch (IOException e) {                     // TODO Auto-generated catch block                     e.printStackTrace();                 } catch (TimeoutException e) {                     // TODO Auto-generated catch block                     e.printStackTrace();                 }             }         }).start();         new Thread(new Runnable() {             @Override             public void run() {                 try {                     doRecv(ConstantOfTopic.QUEUE_NAME_B);                 } catch (IOException e) {                     // TODO Auto-generated catch block                     e.printStackTrace();                 } catch (TimeoutException e) {                     // TODO Auto-generated catch block                     e.printStackTrace();                 }             }         }).start();     }      }  

package com.study.soufang.rabbit.a001.topicAndPublish;

import java.io.IOException; import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.study.soufang.rabbit.a001.RabbitChannelUtil;

public class Send {

    public static void main(String[] args) throws IOException, TimeoutException {         Channel channel = null;         try {             channel = RabbitChannelUtil.createChannel();             //宣告交換機             channel.exchangeDeclare(ConstantOfTopic.EXCHANGE_NAME, ConstantOfTopic.EXCHANGE_TYPE_FANOUT);             //傳送訊息,注意:需要消費者先啟動並宣告佇列及繫結到此交換機,不然訊息會丟失             for(int i=0;i<100;i++){                 String message = "log---"+i;                 channel.basicPublish(ConstantOfTopic.EXCHANGE_NAME, "", null, message.getBytes());;                 Thread.sleep(100);             }         } catch (IOException | TimeoutException | InterruptedException e) {             e.printStackTrace();         }finally{             RabbitChannelUtil.closeChannel(channel);         }              } }