rabbitMQ應用,laravel生產廣播消息,springboot消費消息
阿新 • • 發佈:2018-06-18
-- lis bind 交換 oot server 官方 edi 圖片
最近做一個新需求,用戶發布了動態,前臺需要查詢,為了用戶讀取信息響應速度更快(MySQL很難實現或者說實現起來很慢),所以在用戶動態發布成功後,利用消息機制異步構建 redis緩存 和 elasticsearch索引 。
開發環境
rabbitMQ服務端,docker安裝
拉取rabbit-mq鏡像 docker pull hub.c.163.com/library/rabbitmq:3.6.10-management 運行鏡像 docker run -d --name rabbitmq --publish 5671:5671 --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 hub.c.163.com/library/rabbitmq:3.6.10-management 後臺地址: http://192.168.1.8:15672
消息生產端(PHP):
composer 安裝 rabbitmq客戶端 composer require php-amqplib/php-amqplib 生產廣播消息官方demo https://github.com/php-amqplib/php-amqplib/blob/master/demo/amqp_publisher_fanout.php
應用中代碼
<?php /** * User: [email protected] * Date: 2018/6/18 * Time: 下午1:54 */ namespace App\ThirdParty\Message; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class AmqpPublisher { public function send($content) { $exchange = ‘message.fanout.exchange‘;// 創建連接 $connection = new AMQPStreamConnection( config(‘app.mq_host‘), config(‘app.mq_port‘), config(‘app.mq_user‘), config(‘app.mq_pass‘), config(‘app.mq_vhost‘) ); $channel = $connection->channel(); /* name: $exchange type: fanout passive: false // don‘t check is an exchange with the same name exists durable: false // the exchange won‘t survive server restarts auto_delete: true //the exchange will be deleted once the channel is closed. */ $channel->exchange_declare($exchange, ‘fanout‘, false, true, false); $messageBody = $content; $message = new AMQPMessage($messageBody, array(‘content_type‘ => ‘text/plain‘)); $channel->basic_publish($message, $exchange); // 關閉通道 $channel->close(); // 關閉連接 $connection->close(); } }
消息消費端(Java):
引入maven依賴 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置廣播隊列信息
package cn.taxiong.release.config; import cn.taxiong.release.constant.QueueConstants; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQFanout模式配置 * * @author [email protected] * @create 2018-06-18 下午4:04 **/ @Slf4j @Configuration public class RabbitMQFanoutConfig { @Bean public Queue createFanoutQueueCache() { log.info( "創建了FanoutQueue cache 緩存 隊列" ); return new Queue(QueueConstants.MESSAGE_QUEUE_RELEASE_CACHE_NAME); } @Bean public Queue createFanoutQueueIndex() { log.info( "創建了FanoutQueue index 緩存 隊列" ); return new Queue(QueueConstants.MESSAGE_QUEUE_RELEASE_INDEX_NAME); } @Bean public FanoutExchange fanoutExchangeRelease() { log.info( "創建了fanoutExchange交換機" ); return new FanoutExchange( QueueConstants.MESSAGE_FANOUT_EXCHANGE); } @Bean public Binding fanoutExchangeCacheQueueBinding() { log.info( "將FanoutQueue cache 隊列綁定到交換機fanoutExchange" ); return BindingBuilder.bind( createFanoutQueueCache() ).to( fanoutExchangeRelease() ); } @Bean public Binding fanoutExchangeIndexQueueBinding() { log.info( "將FanoutQueue index 隊列綁定到交換機fanoutExchange" ); return BindingBuilder.bind( createFanoutQueueIndex() ).to( fanoutExchangeRelease() ); } }
隊列常量信息
package cn.taxiong.release.constant; /** * 隊列常量 * * @author [email protected] * @create 2018-06-14 下午7:02 **/ public interface QueueConstants {/** * 消息交換 */ String MESSAGE_FANOUT_EXCHANGE = "message.fanout.exchange"; /** * 發布緩存消息隊列名稱 */ String MESSAGE_QUEUE_RELEASE_CACHE_NAME = "message.release.cache.queue"; /** * 發布索引消息隊列名稱 */ String MESSAGE_QUEUE_RELEASE_INDEX_NAME = "message.release.index.queue"; }
緩存(cache)服務消費消息:
package cn.taxiong.release.message; import cn.taxiong.release.constant.QueueConstants; import cn.taxiong.release.service.OperateReleaseService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; /** * 消息消費 * * @author [email protected] * @create 2018-06-14 下午7:14 **/ @Slf4j @Component @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_RELEASE_CACHE_NAME) public class MessageConsumer { @Autowired private OperateReleaseService operateReleaseService; @RabbitHandler public void handler(@Payload String message) { // operateReleaseService.storeReleaseRedisCache(message); log.info("緩存cache消息消費1:{}", message); } }
索引(index)服務消費消息:
package cn.taxiong.release.message; import cn.taxiong.release.constant.QueueConstants; import cn.taxiong.release.service.OperateReleaseService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; /** * 消息消費 * * @author [email protected] * @create 2018-06-14 下午7:14 **/ @Slf4j @Component @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_RELEASE_INDEX_NAME) public class MessageConsumer2 { @Autowired private OperateReleaseService operateReleaseService; @RabbitHandler public void handler(@Payload String message) { log.info("索引消息 index 消費2:{}", message); } }
rabbitMQ應用,laravel生產廣播消息,springboot消費消息