rabbitMq與spring boot搭配實現監聽
在我前面有一篇博客說到了rabbitMq實現與zk類似的watch功能,但是那一篇博客沒有代碼實例,後面自己補了一個demo,便於理解。demo中主要利用spring boot的配置方式,
一、消費者(也就是watcher)配置
配置都采用spring的註解進行配置
1、創建連接
[email protected] public ConnectionFactory createConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
//設置rabbitMq的ip和端口 connectionFactory.setAddresses("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; }
2、創建交換機
@Bean public Exchange fanoutExchange() { return new FanoutExchange("ex_rabbit_test"); }
創建一個名為ex_rabbit_test的交換機,交換機的類型為廣播類型(為了實現消息的廣播)
3、創建隊列,並綁定到交換機上
@Bean public Queue queueOne() { return new Queue("queue_one", false, false, true); } @Bean public Binding bindingOne(Queue queueOne, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueOne) .to(fanoutExchange); }
每一個消費者有自己的隊列,只消費自己隊列的消息;將隊列和交換機綁定之後,交換機會將生產者發出的消息放到所有綁定的隊列中,但是僅限廣播模式,其它模式會按照一定的路由規則進行消息路由,比如topic類型的交換機會按照routingKey路由消息。
註意:在廣播模式中,為了實現消息監聽,每個消費者需要各自起一個隊列,而且隊列名不相同,比如現在有另外一個消費者:
@Bean public Queue queueTwo() { return new Queue("queue_two", false, false, true); } @Bean public Binding BingdingTwo(Queue queueTwo, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueTwo) .to(fanoutExchange); }
如此一來,當生產者將消息發到交換機ex_rabbit_test中時,交換機就將消息發給queue_one和queue_two兩個隊列中,兩個消費者分別取兩個隊列的消息進行消費。
4、消費消息
@Bean public SimpleMessageListenerContainer execMessageContainerOne() {
//設置監聽者“容器” SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(createConnectionFactory());
//設置隊列名 container.setQueueNames("queue_one");
//設置監聽者數量,即消費線程數 container.setConcurrentConsumers(1); container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { byte[] body = message.getBody(); if(null != body) { try { String msg = new String(body); String usr = "Consumer one"; consumerService.doProcess(usr, msg);//消費消息 } catch(Exception e) { e.printStackTrace(); } } }); return container; } @Bean public SimpleMessageListenerContainer execMessageContainerTwo() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(createConnectionFactory()); container.setQueueNames("queue_two"); container.setConcurrentConsumers(1); container.setMessageListener((ChannelAwareMessageListener) (message, channel) ->{ byte[] body = message.getBody(); if(null != body) { try { String msg = new String(body); String usr = "Consumer two"; consumerService.doProcess(usr, msg);//消費消息 } catch (Exception e) { e.printStackTrace(); } } }); return container; }
consumerService提供消費消息的服務,執行如下方法
public void doProcess(String usr, String msg) { System.out.println(usr + " receive message from producer:" + msg); }
二、生產者配置
1、與消費者相同的方式建立rabbitMq的連接
2、與消費者相同的方式設置交換機,交換機名稱也為ex_rabbit_test(如果rabbitmq中已經存在這個交換機,可以不用創建)
3、關於是否建立隊列以及將隊列與交換機綁定,我的理解是這樣的:
如果在生產者的代碼裏面建立隊列並將其與交換機綁定,那麽就必須建立所有的消費者的隊列,並將所有隊列與交換機綁定,如果這樣做,消費者中就可以省掉這個配置。事實上,這樣做是有點得不償失的,我不贊同這樣做,這裏只是說明這樣做也可以達到目的。
4、創建rabbit模板(org.springframework.amqp.rabbit.core.RabbitTemplate)
@Bean public RabbitTemplate rabbitTemplateProducer() { RabbitTemplate rabbitTemplate = new RabbitTemplate(this.createConnectionFactory()); rabbitTemplate.setExchange("ex_rabbit_test"); return rabbitTemplate; }
5、實現消息發送
demo中使用spring web的方式啟動消息發送,下面是controller和service的代碼
@Controller @RequestMapping(value="/index") public class ProducerController { @Autowired private ProducerService producerService; @RequestMapping(value = "/send") @ResponseBody public String sendMsg(@RequestParam String msg) { producerService.send(msg); return "Success"; } }
@Service public class ProducerService { @Resource(name = "rabbitTemplateProducer") private RabbitTemplate rabbitTemplate; public void send(String msg) { String message = "Hello, consumer.Sending:" + msg; rabbitTemplate.convertAndSend(message); } }
三、pom文件
在consumer中只需要引入spring ampq的依賴
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>1.5.3.RELEASE</version> </dependency> </dependencies>
在prudocer中需要引入spring ampq的依賴,另外由於是啟動了web 項目,所以需要spring web的依賴
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>1.5.3.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>1.5.3.RELEASE</version> </dependency> </dependencies>
四、啟動項目和測試結果
使用spring boot可以快速啟動項目,首先,在8882端口上啟動producer,然後啟動consumer。通過在controller中定義的訪問地址http://localhost:8882/index/send?msg=hello everybody(此處的msg必須有,[email protected]),可以看到兩個消費者都消費了這條消息
Consumer one receive message from producer:Hello, consumer.Sending:hello everybody
Consumer two receive message from producer:Hello, consumer.Sending:hello everybody
從rabbitMq的後臺(http://localhost:15672 usrname:guest pasword:guest)可以看到剛才創建的交換機和隊列。
當消費者變多,或者為了代碼的統一管理,每個消費者的代碼需要相同,為了實現廣播需求,需要為每個消費者設置不同的隊列名。這種情況下,可以采用UUID的方式,每個消費者可以創建一個唯一的隨機隊列名。UUID方式創建隊列名的代碼可以在ampq的jar包中找到org.springframework.amqp.core.AnonymousQueue
public String generateName() { UUID uuid = UUID.randomUUID(); ByteBuffer bb = ByteBuffer.wrap(new byte[16]); bb.putLong(uuid.getMostSignificantBits()) .putLong(uuid.getLeastSignificantBits()); // TODO: when Spring 4.2.4 is the minimum Spring Framework version, use encodeToUrlSafeString() SPR-13784. return this.prefix + Base64Utils.encodeToString(bb.array()) .replaceAll("\\+", "-") .replaceAll("/", "_") // but this will remain .replaceAll("=", ""); }
可以將UUID方法的返回值加在固定隊列名的後面,這樣就生成了一個唯一的隨機隊列名。關於UUID的描述可以自行百度。
ps:前段時間看了spring cloud,看到其中的一個工具,spring cloud bus也可以用作消息監聽,細察之後發現,spring cloud bus也是封裝了rabbitMq,實現了消息隊列。
rabbitMq與spring boot搭配實現監聽