第九篇:Spring Boot整合RabbitMQ
阿新 • • 發佈:2018-12-12
RabbitMQ是一個開源的訊息代理和佇列伺服器,用來通過普通協議在完全不同的應用之間共享資料,或者簡單地將作業佇列以便讓分散式伺服器進行處理。訊息佇列使用訊息將應用程式連線起來,這些訊息通過像RabbitMQ這樣的訊息代理伺服器在應用程式之間路由。這篇文章將帶你瞭解怎麼整合RabbitMQ伺服器,並且通過它怎麼去傳送和接收訊息。
構建工程
構架一個SpringBoot工程,其pom檔案依賴加上spring-boot-starter-amqp的起步依賴:
<dependency>
<groupId>org.springframework.boot</ groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
訊息交換機配置
@Configuration
public class ExchangeConfig {
@Bean
public DirectExchange directExchange(){
DirectExchange directExchange = new DirectExchange(RabbitConfig.EXCHANGE,true,false);
return directExchange;
}
}
佇列配置
@Configuration
public class QueueConfig {
@Bean
public Queue Queue1() {
return new Queue("first-queue",true,false,false);
}
@Bean
public Queue Queue2() {
return new Queue("second-queue",true,false,false);
}
}
RabbitMq配置
@Configuration
public class RabbitMqConfig {
/** 訊息交換機的名字*/
public static final String EXCHANGE = "exchangeTest";
/** 佇列key1*/
public static final String ROUTINGKEY1 = "queue_one_key1";
/** 佇列key2*/
public static final String ROUTINGKEY2 = "queue_one_key2";
@Autowired
private QueueConfig queueConfig;
@Autowired
private ExchangeConfig exchangeConfig;
/**
* 連線工廠
*/
@Autowired
private ConnectionFactory connectionFactory;
/**
將訊息佇列1和交換機進行繫結
*/
@Bean
public Binding binding_one() {
return BindingBuilder.bind(queueConfig.Queue1()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY1);
}
/**
* 將訊息佇列2和交換機進行繫結
*/
@Bean
public Binding binding_two() {
return BindingBuilder.bind(queueConfig.Queue2()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY2);
}
/**
* queue listener 觀察 監聽模式
* 當有訊息到達時會通知監聽在對應的佇列上的監聽物件
* @return
*/
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer_one(){
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
simpleMessageListenerContainer.addQueues(queueConfig.Queue1());
simpleMessageListenerContainer.setExposeListenerChannel(true);
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
simpleMessageListenerContainer.setConcurrentConsumers(1);
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認
return simpleMessageListenerContainer;
}
/**
* 定義rabbit template用於資料的接收和傳送
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setConfirmCallback(msgSendConfirmCallBack());
return template;
}
/**
* 訊息確認機制
* Confirms給客戶端一種輕量級的方式,能夠跟蹤哪些訊息被broker處理,
* 哪些可能因為broker宕掉或者網路失敗的情況而重新發布。
* 確認並且保證訊息被送達,提供了兩種方式:釋出確認和事務。(兩者不可同時使用)
* 在channel為事務時,不可引入確認模式;同樣channel為確認模式下,不可使用事務。
* @return
*/
@Bean
public MsgSendConfirmCallBack msgSendConfirmCallBack() {
return new MsgSendConfirmCallBack();
}
}
訊息回撥
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info("MsgSendConfirmCallBack , 回撥id:" + correlationData);
if (ack) {
logger.info("success");
} else {
logger.error("fail" + cause+"\n重新發送");
}
}
}
生產者
@Service
public class SendService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 傳送訊息
* @param uuid
* @param message 訊息
*/
public void send(String uuid,Object message) {
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY2,
message, correlationId);
}
}
消費者
@Component
public class Consumer {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitListener(queues = {"first-queue", "second-queue"},
containerFactory = "rabbitListenerContainerFactory")
public void handleMessage(String message) throws Exception {
logger.info("Consumer handleMessage : " + message);
}
}
Controller
@RestController
public class SendController {
@Autowired
private SendService sendService;
@GetMapping("/send/{message}")
public String send(@PathVariable String message){
String uuid = UUID.randomUUID().toString();
sendService.send(uuid,message);
return uuid;
}
}
測試
產看控制檯
恭喜!你剛才已經學會了如何通過spring raabitmq去構建一個訊息傳送和訂閱的程式。