2.RabbitMQ HelloWorld
在RabbitMQ的安裝中,新建的使用者admin是沒有許可權使用預設的虛擬主機“/”的,需要新建一個虛擬主機,如圖:
建立一個helloworld的虛擬主機,如圖:
程式碼部分:
1.通道的通用工具類
package com.study.soufang.rabbit.a001;
import java.io.IOException; import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
//獲取通道的通用工具類
public class RabbitChannelUtil {
private final static String RABBIT_HOST= "192.168.10.22"; private final static int RABBIT_PORT=5672; public static Channel createChannel() throws IOException, TimeoutException{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost(RABBIT_HOST); factory.setPort(RABBIT_PORT); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("helloworld");//剛剛建立的虛擬主機 Connection connection = null; Channel channel = null; connection = factory.newConnection();//連線 channel = connection.createChannel();//通道 return channel; } /** * 關閉連線後不會再監聽 * @param channel * @throws IOException * @throws TimeoutException */ public static void closeChannel(Channel channel) throws IOException, TimeoutException{ Connection connection = channel.getConnection(); if(null != channel){ channel.close(); } if(null != connection){ connection.close(); } } }
2.常量類
package com.study.soufang.rabbit.a001.helloworld;
/**
 * Constants shared by the hello-world sender and receiver.
 */
public class ConstantOfHelloWorld {

    /** Name of the queue used by the hello-world example. */
    public static final String QUEUE_NAME = "hello";
}
3.訊息傳送
package com.study.soufang.rabbit.a001.helloworld;
import java.io.IOException; import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.study.soufang.rabbit.a001.RabbitChannelUtil;
public class Send {
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = null; try { //取得通道 channel = RabbitChannelUtil.createChannel(); /** * 宣告訊息要傳送到的佇列 */ channel.queueDeclare(ConstantOfHelloWorld.QUEUE_NAME, false, false, false, null); String message = "helloworld"; /** * 傳送訊息 */ channel.basicPublish("", ConstantOfHelloWorld.QUEUE_NAME, null, message.getBytes()); } catch (IOException | TimeoutException e) { e.printStackTrace(); }finally{ RabbitChannelUtil.closeChannel(channel); } } }
訊息成功傳送到佇列hello
4.消費者
package com.study.soufang.rabbit.a001.helloworld;
import java.io.IOException;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties;
/**
 * Consumer that prints the body of every delivered message to standard output.
 */
public class MyConsumer extends DefaultConsumer {

    /**
     * Creates a consumer for the given channel.
     *
     * @param channel the channel handed to the {@code DefaultConsumer} constructor
     */
    public MyConsumer(Channel channel) {
        super(channel);
    }

    /**
     * Decodes the message body as UTF-8 and prints it.
     *
     * @param consumerTag not used by this implementation
     * @param envelope not used by this implementation
     * @param properties not used by this implementation
     * @param body the raw message bytes
     * @throws IOException if the UTF-8 charset is not supported
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
            BasicProperties properties, byte[] body) throws IOException {
        final String text = new String(body, "UTF-8");
        StringBuilder line = new StringBuilder(" [x] Received '");
        line.append(text).append("'");
        System.out.println(line.toString());
    }
}
5.訊息消費
package com.study.soufang.rabbit.a001.helloworld;
import java.io.IOException; import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.study.soufang.rabbit.a001.RabbitChannelUtil;
/**
 * Registers a {@link MyConsumer} on the hello-world queue and prints the
 * consumer tag returned by the broker.
 */
public class Recv {

    /**
     * Declares the queue and starts consuming from it with automatic
     * acknowledgement.
     *
     * <p>The channel is intentionally left open after a successful
     * registration: closing the connection stops delivery to the consumer, so
     * it is only closed here when the setup fails.
     *
     * @param args unused
     * @throws IOException if closing the channel or connection fails after a setup error
     * @throws TimeoutException if closing the channel times out after a setup error
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = null;
        try {
            channel = RabbitChannelUtil.createChannel();
            channel.queueDeclare(ConstantOfHelloWorld.QUEUE_NAME, false, false, false, null);
            Consumer consumer = new MyConsumer(channel);

            // autoAck: whether deliveries are acknowledged automatically. When
            // false, the consumer has to answer each delivery itself with
            // channel.basicAck, channel.basicNack or channel.basicReject.
            String result = channel.basicConsume(ConstantOfHelloWorld.QUEUE_NAME, true, consumer);
            System.out.println(result);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            // Setup failed: release whatever was opened. channel is still null
            // when createChannel itself failed.
            if (channel != null) {
                RabbitChannelUtil.closeChannel(channel);
            }
        }
    }
}