RabbitMQ高階特性-消費端自定義監聽
阿新 • • 發佈:2018-12-31
消費端自定義監聽
- 在之前的程式碼演示中, consumer進行消費時 ,都是使用while迴圈進行訊息消費, 然後使用consumer.nextDelivery()方法獲取下一條訊息
- 但是在實際工作中, 使用自定義的Consumer更加的方便, 解耦性也更加的強, 實現自定義的Consumer可以實現Consumer介面, 或者更常用的是繼承預設的DefaultConsumer
程式碼演示
自定義消費者(替換QueueingConsumer)
package com.qiyexue.api.consumer;
import com.rabbitmq. client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
* 自定義消費者
* @author 七夜雪
* @create 2018-12-16 8:20
*/
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("-------------自定義消費者------------");
System.out.println("consumerTag : " + consumerTag) ;
System.out.println("envelope : " + envelope);
System.out.println("properties : " + properties);
System.out.println("body : " + new String(body));
}
}
Producer
package com.qiyexue.api.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 生產者
*
* @author 七夜雪
* @create 2018-12-15 19:56
*/
public class Producer {
public static void main(String[] args) throws Exception {
// 1. 建立ConnectionFactory, 並設定屬性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 建立連線
Connection connection = factory.newConnection();
// 3. 建立channel
Channel channel = connection.createChannel();
String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.qiye";
// 傳送訊息
String msg = "自定義消費者, 訊息傳送 : Hello, 七夜雪";
channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
// 關閉連線
channel.close();
connection.close();
}
}
Consumer
package com.qiyexue.api.consumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* Return Listener模式消費者
*
* @author 七夜雪
* @create 2018-12-15 20:07
*/
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 建立連線工廠並設定屬性
ConnectionFactory factory = new ConnectionFactory();;
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 建立連線
Connection connection = factory.newConnection();
// 3. 建立channel
Channel channel = connection.createChannel();
// 4. 宣告Exchange
String exchangeName = "test_consumer_exchange";
String exchangeType = "topic";
String routingKey = "consumer.*";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
// 5. 宣告訊息佇列
String queueName = "test_consumer_queue";
channel.queueDeclare(queueName, true, false, false, null);
// 6. 繫結佇列和Exchange
channel.queueBind(queueName, exchangeName, routingKey);
// 7. 設定消費者為自定義的消費者
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}