三、RabbitMq學習筆記
阿新 • • 發佈:2018-12-15
RabbitMQ原生API三種交換模式
1. Hello World
在這裡沒有宣告交換機(exchange),也沒有宣告繫結(bind),RabbitMQ會使用預設的交換機(AMQP default)路由鍵就是佇列名稱
【生產者】
/**
* 消費者
*
* @author ITCloud
*/
public class Consumer {
public static void main(String[] args) throws Exception {
// 1.建立連線
ConnectionFactory connectionFactory = new ConnectionFactory ();
connectionFactory.setHost("192.168.186.130");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
// 2.獲取連線,建立channel
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 3. 佇列宣告
String queueName = "hello.world";
/**
* (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* 引數說明:
* queue:佇列名稱
* duable:佇列是否是持久化的,rabbitmq重啟之後,佇列依然存在
* exclusive:獨佔佇列,只對當前連線有效,一般都會設定成非獨佔佇列false
* autoDelete:佇列是否自動刪除
* arguments:一些引數,後續介紹
*/
channel.queueDeclare(queueName, true, false, false, null);
// 4.建立一個簡單的消費者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
//處理接收的訊息
/**
* body:接收訊息體
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
System.out.println("消費端:" + new String(body));
}
};
//設定非同步接收訊息,當消費端啟動後,將一直會監聽消費端
channel.basicConsume(queueName, true, consumer);
}
}
【消費者】
/**
* 生產者
* @author ITCloud
*/
public class Producer {
public static void main(String[] args) throws Exception {
//1.建立連線工廠類
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.186.130");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//2.獲取連線,建立channel
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//3. 傳送訊息
String msg = "Hello world RabbitMQ!!";
/**
* 引數詳解
*String exchange, String routingKey, BasicProperties props, byte[] body
* 交換機名稱:exchange 必須,如果不指定,則使用rabbitmq提供預設的exchange:AMQP default
* 路由鍵:routingKey 當交換機是Fanout時候routingKey可以不需要,如果沒有明確指定,則路由到佇列
* 傳送訊息帶一些引數:props 非必需
* 要傳送的訊息:body
*/
channel.basicPublish("", "hello.world", null, msg.getBytes());
//4.關閉相關連線
channel.close();
connection.close();
}
}
2. direct交換模式
direct交換模式的特點:生產者和消費者通過routingKey來連線,消費端只有擁有相應的routingKey才能進行消費
【消費者】
/**
* direct型別的交換機,特點:
* 生產者和消費者通過routingKey來連線,消費端只有擁有相應的routingKey才能進行消費
* @author ITCloud
*
*/
public class DirectConsumer {
public static void main(String[] args) throws Exception{
// 1.建立連線
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.186.130");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
// 2.獲取連線,建立channel
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//3.交換機和佇列宣告
/**
* 引數說明:(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments)
* exchange: 交換機名稱
* type:交換機型別:direct topic fanout hearders(幾乎不用)
* durable: 是否是持久化的佇列,
* autoDelete:是否自動刪除
* arguments:一些引數,幾乎不用
*/
channel.exchangeDeclare("direct.exchange", "direct", true, false, null);
channel.queueDeclare("direct.queue", true, false, false, null);
//4.佇列繫結
channel.queueBind("direct.queue", "direct.exchange", "direct.queue");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
System.out.println("消費端:" + new String(body));
}
};
//true表示自動接收
channel.basicConsume("direct.queue", true, consumer);
}
}
【生產者】
public class DirectProducer {
public static void main(String[] args) throws Exception{
// 1.建立連線
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.186.130");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
// 2.獲取連線,建立channel
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//3. 傳送訊息
String msg = "Send msg by RabbitMQ!! direct";
channel.basicPublish("direct.exchange", "direct.queue", null, msg.getBytes());
channel.close();
connection.close();
}
}
3. fanout交換模式
fanout交換模式的特點:不需要routingKey,只要綁定了交換機即可
這種模式可以用於死信佇列中
【消費者】
/**
* fanout型別的交換機,特點:
* 不需要routingKey,只要綁定了交換機即可
* @author ITCloud
*
*/
public class FanoutConsumer {
public static void main(String[] args) throws Exception{
// 1.建立連線
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.186.130");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
// 2.獲取連線,建立channel
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//3.交換機和佇列宣告
channel.exchangeDeclare("fanout.exchange", "fanout", true, false, null);
channel.queueDeclare("fanout.queue", true, false, false, null);
//4.佇列繫結,這裡rountingKey=""; 但是不可以設定為null
channel.queueBind("fanout.queue", "fanout.exchange", "");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
System.out.println("消費端:" + new String(body));
}
};
//true表示自動接收
channel.basicConsume("fanout.queue", true, consumer);
}
}
【生產者】
public class FanoutProducer {
public static void main(String[] args) throws Exception{
// 1.建立連線
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.186.130");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
// 2.獲取連線,建立channel
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//3. 傳送訊息
String msg = "Send msg by RabbitMQ!! fanout";
channel.basicPublish("fanout.exchange", "", null, msg.getBytes());
channel.close();
connection.close();
}
}
4. topic交換模式
【消費者】
/**
* topic型別的交換機,特點:
* 通過rountingKey進行模糊匹配:
* 1. * 匹配一個單詞
* 2. # 匹配多個單詞
* 例如:A.* 只可以匹配A.aab;但是不可以匹配A.aa.bb
* @author ITCloud
*
*/
public class TopicConsumer {
public static void main(String[] args) throws Exception{
// 1.建立連線
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.186.130");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
// 2.獲取連線,建立channel
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//3.交換機和佇列宣告
channel.exchangeDeclare("topic.exchange", "topic", true, false, null);
channel.queueDeclare("topic.queue", true, false, false, null);
//4.佇列繫結,這裡可以設定rountingKey=""; 但是不可以設定為null
channel.queueBind("topic.queue", "topic.exchange", "topic.#");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
System.out.println("消費端:" + new String(body));
}
};
//true表示自動接收
channel.basicConsume("topic.queue", true, consumer);
}
}
【生產者】
public class TopicProducer {
public static void main(String[] args) throws Exception{
// 1.建立連線
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.186.130");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
// 2.獲取連線,建立channel
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//3. 傳送訊息
String msg = "Send msg by RabbitMQ!! topic";
//宣告rountingKey = "topic.hello.world" 匹配topic.#
channel.basicPublish("topic.exchange", "topic.hello.world", null, msg.getBytes());
channel.close();
connection.close();
}
}
5. Ack之重回佇列
【消費者】
/**
*重回佇列:就是將訊息進行重新扔到佇列中,給消費者重新消費
* 簡單的說:就是把沒有消費成功的佇列,重新返回給Broker
* @author ITCloud
*/
public class AckConsumer {
public static void main(String[] args) throws Exception {
// 1.建立連線
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.186.130");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
// 2.獲取連線,建立channel
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 3.交換機和 佇列宣告
String queueName = "ack.queue";
String exchanegName = "ack.exchange";
channel.queueDeclare(queueName, true, false, false, null);
channel.exchangeDeclare(exchanegName, "direct", true, false, null);
channel.queueBind(queueName, exchanegName, "ack.queue");
// 4.建立一個簡單的消費者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
System.out.println("消費端:" + new String(body));
Integer num = (Integer)properties.getHeaders().get("num");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (num == 3) {
//表示將該訊息重新扔到訊息的尾端,進行重新消費
//此時會阻塞在這個地方 TODO
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
//訊息手動接收
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicQos(0, 1, false);
//這裡設定成非自動確認接收訊息
channel.basicConsume(queueName, false, consumer);
}
}
【生產者】
/**
*
* @author ITCloud
*/
public class AckProducer {
public static void main(String[] args) throws Exception {
//1.建立連線工廠類
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.186.130");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//2.獲取連線,建立channel
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//3. 傳送訊息
for (int x = 0; x<5; x++) {
Map<String, Object> headers = new HashMap<>();
headers.put("num", x);
BasicProperties properties = new BasicProperties.Builder()
.deliveryMode(2) //持久化投遞模式
.contentEncoding("utf-8")
.headers(headers)
.build();
String msg = "Hello world RabbitMQ!!" + x;
channel.basicPublish("ack.exchange", "ack.queue", true, properties, msg.getBytes());
}
//4.關閉相關連線
channel.close();
connection.close();
}
}
6. 訊息確認機制
6.1 生產者訊息確認
這裡主要研究非同步comfirm,因為非同步comfirm效能比較高
【消費者】
/**
* 消費者
* @author ITCloud
*/
public