springboot-rabbitmq-docker
阿新 • • 發佈:2018-12-08
1.安裝rabbitmq
//下載映象,我下載的這個lastest映象沒有web功能,也就是15672埠沒有反應
docker pull rabbitmq
//啟動映象,這裡15672埠沒辦法用,使用的話下載其他映象
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq
2.配置springboot
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
spring.rabbitmq.host=192.168.134.132
spring.rabbitmq.username=user
spring.rabbitmq.password=password
spring.rabbitmq.port=5672
3.進行簡單的佇列傳送,路由傳送,topic模式傳送
(1)對指定佇列進行傳送和接收:
package com.example.demo.web; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Arrays; import java.util.List; /** * @Auther: gaoyang * @Date: 2018/12/5 14:13 * @Description: */ @RestController @Configuration public class MyWeb { @Autowired RabbitTemplate rabbitTemplate; //註冊一個佇列 @Bean public Queue queue1() { return new Queue("myQueue"); } @GetMapping("/b") public Object b(){ rabbitTemplate.convertAndSend("myQueue","測試"); return "ok"; } }
(2)對指定路由傳送資料
//第二個佇列 @Bean public Queue queue2() { return new Queue("test222"); } //註冊一個交換機 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fan"); } //將兩個佇列繫結到該路由器 @Bean public Binding binding3(){ return BindingBuilder.bind(queue2()).to(fanoutExchange()); } @Bean public Binding binding4(){ return BindingBuilder.bind(queue1()).to(fanoutExchange()); }
//第二個佇列名稱忽略,這樣就會向兩個佇列同時傳送資訊
@GetMapping("/c")
public Object c(){
rabbitTemplate.convertAndSend("fan",null,"測試");
return "ok";
}
(3)topic動態交換機名稱傳送
//註冊一個動態交換機
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("jiaohuan");
}
//這裡繫結佇列後路由名稱為topic.msg
@Bean
public Binding binding(@Qualifier("queue1") Queue queue1, TopicExchange topicExchange) {
return BindingBuilder.bind(queue1).to(topicExchange).with("topic.msg");
}
//這裡的名字使用.#,#代表0到多個字元,*代表一個字元.所以這裡會正則的匹配
@Bean
public Binding binding2(@Qualifier("queue2") Queue queue2, TopicExchange topicExchange) {
return BindingBuilder.bind(queue2).to(topicExchange).with("topic.#");
}
//所以這裡傳送的話,路由名稱為topic.msg會像兩個佇列同時傳送,如果為topic.aaa其他等則只會正則的匹配一個佇列;
4.消費類
@Component
public class Consumer {
@RabbitListener(queues = "test111")
public void listen(List list){
System.out.println(list.get(0));
}
@RabbitListener(queues = "test222")
public void listen2(String list){
System.out.println(list);
}
}
4.ack模式
spring.rabbitmq.host=192.168.134.132
spring.rabbitmq.username=user
spring.rabbitmq.password=password
spring.rabbitmq.port=5672
# 開啟發送確認
spring.rabbitmq.publisher-confirms=true
# 開啟發送失敗退回
spring.rabbitmq.publisher-returns=true
# 開啟ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
配置傳送訊息的非同步結果:(該處可以做業務處理,重新發送等),此處是傳送方到mq的結果
@Component
public class MyConfirm implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
System.out.println("Confirm 訊息傳送失敗" + cause + correlationData.toString());
} else {
System.out.println("Confirm 訊息傳送成功 ");
}
}
}
mq到接收方的結果:如果失敗可以使用重新發送
@Component
public class MyCallBack implements RabbitTemplate.ReturnCallback {
@Autowired
RabbitTemplate rabbitTemplate;
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("sender return " + message.toString()+"==="+i+"==="+s1+"==="+s2);
rabbitTemplate.send(message);
}
}
傳送資訊:
@GetMapping("/d")
public Object d() {
rabbitTemplate.setReturnCallback(myCallBack);
rabbitTemplate.setConfirmCallback(myConfirm);
rabbitTemplate.convertAndSend("test333", "測試");
return "ok";
}
訊息接收者:
@RabbitListener(queues = "test333")
public void listen3(String list, Channel channel, Message message) throws IOException {
System.out.println(list);
try {
int a= 100/0;
//告訴伺服器收到這條訊息 已經被我消費了 可以在佇列刪掉 這樣以後就不會再發了 否則訊息伺服器以為這條訊息沒處理掉 後續還會在發
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("receiver success");
} catch (Exception e) {
e.printStackTrace();
//丟棄這條訊息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
System.out.println("receiver fail");
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
以上第一個引數是該條資料的index標識,後面引數是是否全部處理小於當前index值得資訊為接收成功;
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
以上第一個和第二個引數與上面同理,這裡是是否全部置於接收失敗,第三個引數為是否重新接收資料;
具體的方法含義可以引數這個---連結;