1. 程式人生 > >2.RabbitMQ HelloWorld

2.RabbitMQ HelloWorld

在RabbitMQ的安裝中,新建的使用者admin是沒有許可權使用預設的虛擬主機“/”的,需要新建一個虛擬主機,如圖:

建立一個helloworld的虛擬主機,如圖:

程式碼部分:

1.通道的通用工具類

package com.study.soufang.rabbit.a001;

import java.io.IOException; import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;

//獲取通道的通用工具類

public class RabbitChannelUtil {

    private final static String RABBIT_HOST= "192.168.10.22";     private final static int RABBIT_PORT=5672;          public static Channel createChannel() throws IOException, TimeoutException{         ConnectionFactory factory = new ConnectionFactory();         factory.setHost(RABBIT_HOST);         factory.setPort(RABBIT_PORT);         factory.setUsername("admin");         factory.setPassword("admin");         factory.setVirtualHost("helloworld");//剛剛建立的虛擬主機         Connection connection = null;         Channel channel = null;         connection = factory.newConnection();//連線         channel = connection.createChannel();//通道         return channel;     }     /**      * 關閉連線後不會再監聽      * @param channel      * @throws IOException      * @throws TimeoutException      */     public static void closeChannel(Channel channel) throws IOException, TimeoutException{         Connection connection = channel.getConnection();         if(null != channel){             channel.close();         }         if(null != connection){             connection.close();         }     }      }

2.常量類

package com.study.soufang.rabbit.a001.helloworld;

public class ConstantOfHelloWorld {

    public static final String QUEUE_NAME = "hello";      }

 3.訊息傳送

package com.study.soufang.rabbit.a001.helloworld;

import java.io.IOException; import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.study.soufang.rabbit.a001.RabbitChannelUtil;

public class Send {

    public static void main(String[] args) throws IOException, TimeoutException {         Channel channel = null;         try {             //取得通道             channel = RabbitChannelUtil.createChannel();             /**              * 宣告訊息要傳送到的佇列              */             channel.queueDeclare(ConstantOfHelloWorld.QUEUE_NAME, false, false, false, null);             String message = "helloworld";             /**              * 傳送訊息              */             channel.basicPublish("", ConstantOfHelloWorld.QUEUE_NAME, null, message.getBytes());         } catch (IOException | TimeoutException e) {             e.printStackTrace();         }finally{             RabbitChannelUtil.closeChannel(channel);         }              } }

訊息成功傳送到佇列hello

4.消費者

package com.study.soufang.rabbit.a001.helloworld;

import java.io.IOException;

import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties;

public class MyConsumer extends DefaultConsumer {

    public MyConsumer(Channel channel) {         super(channel);     }

    @Override     public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)             throws IOException {         String message = new String(body, "UTF-8");         System.out.println(" [x] Received '" + message + "'");     }      }  

5.訊息消費

package com.study.soufang.rabbit.a001.helloworld;

import java.io.IOException; import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.study.soufang.rabbit.a001.RabbitChannelUtil;

public class Recv {

    public static void main(String[] args) throws IOException, TimeoutException {         Channel channel = null;         try {             channel = RabbitChannelUtil.createChannel();             channel.queueDeclare(ConstantOfHelloWorld.QUEUE_NAME, false, false, false, null);             Consumer consumer = new MyConsumer(channel);             /**              * autoAck:是否自動ack,如果不自動ack,需要使用channel.ack、channel.nack、channel.basicReject 進行訊息應答              */             String result = channel.basicConsume(ConstantOfHelloWorld.QUEUE_NAME, true, consumer);             System.out.println(result);         } catch (IOException | TimeoutException e) {             e.printStackTrace();         }finally {             RabbitChannelUtil.closeChannel(channel);         }     }      }