RabbitMq 集成 spring boot 消息隊列 入門Demo
阿新 • • 發佈:2017-07-04
spring boot rabbitmq 入門集成
spring boot 集成 RabbitMq還是很方便的。現在來一個簡單的例子來集成rabbitmq。入門demo。
主要概念:
其中比較重要的概念有 4 個,分別為:虛擬主機,交換機,隊列,和綁定。
虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定。為什麽需要多個虛擬主機呢?很簡單,RabbitMQ當中,用戶只能在虛擬主機的粒度進行權限控制。 因此,如果需要禁止A組訪問B組的交換機/隊列/綁定,必須為A和B分別創建一個虛擬主機。每一個RabbitMQ服務器都有一個默認的虛擬主機“/”。
交換機:Exchange 用於轉發消息,但是它不會做存儲 ,如果沒有 Queue bind 到 Exchange 的話,它會直接丟棄掉 Producer 發送過來的消息。 這裏有一個比較重要的概念:路由鍵 。消息到交換機的時候,交互機會轉發到對應的隊列中,那麽究竟轉發到哪個隊列,就要根據該路由鍵。
綁定:也就是交換機需要和隊列相綁定,這其中如上圖所示,是多對多的關系。
首先是配制文件。
#spring.application.name=spring-boot-rabbitmq spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
發送者:
package com.basic.rabbitmq.send; import com.basic.rabbitmq.configuration.RabbitMqConfig2; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import java.util.Date; /** * Created by sdc on 2017/6/17. */ @Service("helloSender") public class HelloSender { @Autowired private AmqpTemplate amqpTemplate; // private Rabbitt public void send() { String contenxt = "order_queue_message"; this.amqpTemplate.convertAndSend(RabbitMqConfig2.QUEUE_EXCHANGE_NAME,"order_queue_routing",contenxt); // this.amqpTemplate.conver } }
配制信息:
package com.basic.rabbitmq.configuration; import com.basic.rabbitmq.receiver.Receiver; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.MessageListenerContainer; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; /** * Created by sdc on 2017/6/17. */ @Configuration public class RabbitMqConfig2 { public static final String QUEUE_NAME = "order_queue"; public static final String QUEUE_EXCHANGE_NAME = "topic_exchange_new"; public static final String routing_key = "order_queue_routing"; @Bean public Queue queue() { //是否持久化 boolean durable = false; //僅創建者可以使用該隊列,斷開後自動刪除 boolean exclusive = false; //當所有消費者都斷開連接後,是否刪除隊列 boolean autoDelete = false; return new Queue(QUEUE_NAME, durable, exclusive, autoDelete); } @Bean public TopicExchange exchange() { //是否持久化 boolean durable = false; //當所有消費者都斷開連接後,是否刪除隊列 boolean autoDelete = false; return new TopicExchange(QUEUE_EXCHANGE_NAME, durable, autoDelete); } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(exchange()).with(routing_key); } @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1",5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); /** 如果要進行消息回調,則這裏必須要設置為true */ connectionFactory.setPublisherConfirms(true); // 必須要設置 // connectionFactory.setPublisherReturns(); return connectionFactory; } @Bean SimpleMessageListenerContainer container() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory()); // container.setQueueNames(QUEUE_NAME); container.setQueues(queue()); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("收到消息 : " + new String(body)); channel.queueDeclare(QUEUE_NAME, true, false, false, null); // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費 // channel.basicAck(); //應答 // channel.basicReject();//拒絕 // channel.basicRecover(); //恢復 // channel.basicQos(); // channel.addConfirmListener(new ConfirmListener() { // @Override // public void handleAck(long deliveryTag, boolean multiple) throws IOException { // //失敗重發 // } // // @Override // public void handleNack(long deliveryTag, boolean multiple) throws IOException { // //確認ok // } // }); } }); return container; } // @Bean // MessageListenerAdapter listenerAdapter(Receiver receiver) { // return new MessageListenerAdapter(receiver, "receiveMessage"); // } }
測試類:
package com.rabbit.test; import com.basic.rabbitmq.send.HelloSender; import com.basic.system.Application; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import javax.annotation.Resource; /** * Created by sdc on 2017/6/17. */ @RunWith(SpringRunner.class) @SpringBootTest(classes = Application.class) public class RabbitMqTest { @Autowired public HelloSender helloSender; @Test public void helloword() throws Exception { helloSender.send(); } }
這只是一個demo,學習的時候會測試各種的事情,在這基礎上更改就可以了,心中的疑慮測試沒了就可以寫一些項目了。
本文出自 “10093778” 博客,請務必保留此出處http://10103778.blog.51cto.com/10093778/1944218
RabbitMq 集成 spring boot 消息隊列 入門Demo