RabbitMQ 使用之 四種交換機模式
阿新 • • 發佈:2018-12-30
一、整合RabbitMQ
- . 新增依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- . 新增配置
#RabbitMQ
spring.rabbitmq.host=39.106.128.50
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq .password=guest
spring.rabbitmq.virtual-host=/
#消費者數量
spring.rabbitmq.listener.simple.concurrency=10
spring.rabbitmq.listener.simple.max-concurrency=10
#消費者每次從佇列中獲取的訊息數量
spring.rabbitmq.listener.simple.prefetch=1
#消費者自動啟動
spring.rabbitmq.listener.simple.auto-startup=true
#消費失敗,自動重新入隊
spring.rabbitmq.listener .simple.default-requeue-rejected=true
#啟用傳送重試
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000
spring.rabbitmq.template.retry.multiplier=1.0
注意使用guest使用者遠端訪問RabbitMQ,如果訪問失敗,需要在遠端的RabbitMQ中進行如下配置:
在 /usr/local/rabbitmq/rabbitmq_server-3.7.4/etc/rabbitmq/ 目錄下,新建rabbitmq.config檔案,裡面寫入:“[{rabbit, [{loopback_users, []}]}]. ”即可,不用加雙引號。
二、RabbitMQ的使用
RabbitMQ 有四種交換機模式:
- Direct Pattern (此模式不需要配置交換機)
- Fanout Pattern ( 類似於廣播一樣,將訊息傳送給和他繫結的佇列 )
- Topic Pattern ( 繫結交換機時可以做匹配。 #:表示零個或多個單詞。*:表示一個單詞 )
- Header Pattern ( 帶有引數的匹配規則 )
詳細舉例如下:
/**
* 傳送方
*/
@Service
public class MQSender {
private static final Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired
AmqpTemplate amqpTemplate;
/**
* use Direct Pattern. RabbitMQ default, no need exchange.
* @param message
*/
public void sendDirect(Object message){
String msg = RedisService.BeanToString(message);
log.info("send direct message:" + msg );
amqpTemplate.convertAndSend(MQConfig.DIRECT_QUEUE,msg);
}
/**
* use Direct Pattern
* @param message
*/
public void sendFanout(Object message){
String msg = RedisService.BeanToString(message);
log.info("send fanout message:" + msg );
amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE,"",msg);
}
/**
* use Topic Pattern
* @param message
*/
public void sendTopic(Object message){
String msg = RedisService.BeanToString(message);
log.info("send Topic message:" + msg );
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,"topic.key1",msg+"1"); // 可以匹配到 topic.# and topic.key1
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,"topic.key2",msg+"2"); // 可以匹配到 topic.#
}
/**
* use Header Pattern
* @param message
*/
public void sendHeader(Object message){
String msg = RedisService.BeanToString(message);
log.info("send Header message:" + msg );
MessageProperties mp = new MessageProperties();
mp.setHeader("header1","value1");
mp.setHeader("header2","value2"); // 與 配置中的map完全匹配,因為那邊是 whereAll()方法
Message message1 = new Message(msg.getBytes(),mp);
amqpTemplate.convertAndSend(MQConfig.HEADER_EXCHANGE,"",message1); // 可以匹配到 topic.# and topic.key1
}
/**
* 配置中心
*/
@Configuration
public class MQConfig {
public static final String DIRECT_QUEUE = "direct.queue";
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_KEY1 = "topic.key1";
public static final String TOPIC_KEY2 = "topic.#";
public static final String HEADER_QUEUE = "header.queue";
public static final String HEADER_EXCHANGE = "header.exchange";
/**
* Direct 模式 交換機Exchange
*/
@Bean
public Queue directQueue(){
// 一個引數是名稱,,另一個表示是否持久化
return new Queue(DIRECT_QUEUE,true);
}
/**---------------------------------------------*/
/**
* Fanout Pattern. 類似於廣播一樣,將訊息傳送給和他繫結的佇列
**/
@Bean
public Queue fanoutQueue1(){
return new Queue(FANOUT_QUEUE1,true);
}
@Bean
public Queue fanoutQueue2(){
return new Queue(FANOUT_QUEUE2,true);
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(FANOUT_EXCHANGE);
}
/**
* 繫結 exchange and queue
*/
@Bean
public Binding fanoutBinding1(){
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2(){
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
/**---------------------------------------------*/
/**
* Topic Pattern. 繫結交換機時可以做匹配。 #:表示零個或多個單詞。*:表示一個單詞
**/
@Bean
public Queue topicQueue1(){
return new Queue(TOPIC_QUEUE1,true);
}
@Bean
public Queue topicQueue2(){
return new Queue(TOPIC_QUEUE2,true);
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE);
}
/**
* 繫結 exchange and queue
*/
@Bean
public Binding topicBinding1(){
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_KEY1); // 精確匹配, 匹配成功則傳送到 TOPIC_QUEUE1佇列
}
@Bean
public Binding topicBinding2(){
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_KEY2); // 模糊匹配,匹配成功則傳送到 TOPIC_QUEUE2佇列
}
/**---------------------------------------------*/
/**
* Header Pattern. 交換機 Exchange
**/
@Bean
public Queue headerQueue(){
return new Queue(HEADER_QUEUE,true);
}
@Bean
public HeadersExchange headersExchange(){
return new HeadersExchange(HEADER_EXCHANGE);
}
@Bean
public Binding headerBinding(){
Map<String,Object> map = new HashMap<>();
map.put("header1","value1");
map.put("header2","value2");
return BindingBuilder.bind(headerQueue()).to(headersExchange()).whereAll(map).match(); // whereXxx() 方法代表了匹配規則
}
}
/**
* 消費者
*/
@Service
public class MQReceiver {
private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);
/**
* Direct Pattern.
* @param message
*/
@RabbitListener(queues = MQConfig.DIRECT_QUEUE)
public void directReceive(String message){
log.info("receive direct message:" + message);
}
/** ----------------------------- */
/**
* fanout Pattern.
* @param message
*/
@RabbitListener(queues = MQConfig.FANOUT_QUEUE1)
public void fanoutReceive1(String message){
log.info("receive fanout1 message:" + message);
}
/**
* fanout Pattern.
* @param message
*/
@RabbitListener(queues = MQConfig.FANOUT_QUEUE2)
public void fanoutReceive2(String message){
log.info("receive fanout2 message:" + message);
}
/** ----------------------------- */
/**
* topic Pattern.
* @param message
*/
@RabbitListener(queues = MQConfig.TOPIC_QUEUE1)
public void topicReceive1(String message){
log.info("receive topic1 message:" + message);
}
/**
* topic Pattern.
* @param message
*/
@RabbitListener(queues = MQConfig.TOPIC_QUEUE2)
public void topicReceive2(String message){
log.info("receive topic2 message:" + message);
}
/** ----------------------------- */
/**
* Header Pattern.
* @param message
*/
@RabbitListener(queues = MQConfig.HEADER_QUEUE)
public void topicHeader(byte[] message){
log.info("receive topic1 message:" + new String(message));
}
}