【Spring Boot】--整合RabbitMQ
阿新 • • 發佈:2018-12-04
目錄
0、前言
需要已經安裝RabbitMQ,並且啟動、配置好使用者。參考《基於CentOS6.5安裝RabbitMQ》,《基於CentOS6.5使用RabbitMQ(二)》,《基於CentOS6.5使用RabbitMQ(三)》。
1、訊息流程
生產者,消費者,訊息
內部訊息:Exchange,Binding,Queues
2、新增依賴
在pom.xml中新增rabbitmq的依賴:
<!-- 引入rabbitmq依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3、新增配置
# rabbitmq配置 mq.rabbit.host=**.**.**.** mq.rabbit.port=5672 mq.rabbit.virtualHost=/ mq.rabbit.username=admin mq.rabbit.password=admin
4、新建配置類
RabbitMQConfig.java
/** * @Auther: chisj [email protected] * @Date: 2018-11-12 15:35 * @Description: */ @Component @Slf4j @ConfigurationProperties public class RabbitMQConfig { @Value("${mq.rabbit.host}") public String mqRabbitHost; @Value("${mq.rabbit.port}") public int mqRabbitPort; @Value("${mq.rabbit.username}") public String mqRabbitUserName; @Value("${mq.rabbit.password}") public String mqRabbitPassword; @Value("${mq.rabbit.virtualHost}") public String mqRabbitVirtualHost; public static String EXCHANGE_NAME = "exchange-name"; public static String QUEUE_NAME = "queue-name"; public static String ROUTING_KEY = "routing-key"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(this.mqRabbitHost,this.mqRabbitPort); connectionFactory.setUsername(this.mqRabbitUserName); connectionFactory.setPassword(this.mqRabbitPassword); connectionFactory.setVirtualHost(this.mqRabbitVirtualHost); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE_NAME); } @Bean public Queue queue() { return new Queue(QUEUE_NAME, true); } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(ROUTING_KEY); } @Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception { byte[] body = message.getBody(); log.info("消費端接收到訊息 : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }); return container; } }
5、新建生產者介面
ProductService.java
/**
* @Auther: chisj [email protected]
* @Date: 2018-11-12 15:37
* @Description:
*/
public interface ProductService {
/**
* 儲存
*/
public int save(String productName);
}
6、新建生產者實現類
ProductServiceImpl.java
/**
* @Auther: chisj [email protected]
* @Date: 2018-11-12 15:39
* @Description:
*/
@Service("productServiceImpl")
@Slf4j
public class ProductServiceImpl implements ProductService, RabbitTemplate.ConfirmCallback {
private RabbitTemplate rabbitTemplate;
public ProductServiceImpl(RabbitTemplate rabbitTemplate){
this.rabbitTemplate=rabbitTemplate;
this.rabbitTemplate.setConfirmCallback(this);
}
/**
* 儲存
*
* @param productName
*/
@Override
public int save(String productName) {
//執行儲存
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, productName, correlationId);
return 0;
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("訊息id:" + correlationData);
if (ack) {
log.info("訊息傳送確認成功");
} else {
log.info("訊息傳送確認失敗:" + cause);
}
}
}
7、新建生產者控制器類
ProductController.java
/**
* @Auther: chisj [email protected]
* @Date: 2018-11-12 15:42
* @Description:
*/
@RestController
@RequestMapping("/Product")
public class ProductController {
@Resource
private ProductService productService;
@RequestMapping("/save")
public String save(String productName) {
productService.save(productName);
return "product save success.";
}
}
8、測試
在postman中請求{{url}}/Product/save這個端子
檢視後臺log