1. 程式人生 > >rabbitMQ應用,laravel生產廣播消息,springboot消費消息

rabbitMQ應用,laravel生產廣播消息,springboot消費消息

-- lis bind 交換 oot server 官方 edi 圖片

最近做一個新需求,用戶發布了動態,前臺需要查詢,為了用戶讀取信息響應速度更快(MySQL很難實現或者說實現起來很慢),所以在用戶動態發布成功後,利用消息機制異步構建 redis緩存 和 elasticsearch索引 。

開發環境

rabbitMQ服務端,docker安裝

拉取rabbit-mq鏡像
docker pull hub.c.163.com/library/rabbitmq:3.6.10-management

運行鏡像
docker run -d --name rabbitmq --publish 5671:5671 --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 hub.c.163.com/library/rabbitmq:3.6.10-management

後臺地址: http://192.168.1.8:15672

消息生產端(PHP):

composer 安裝 rabbitmq客戶端
composer require php-amqplib/php-amqplib


生產廣播消息官方demo
https://github.com/php-amqplib/php-amqplib/blob/master/demo/amqp_publisher_fanout.php

應用中代碼

<?php
/**
 * User: [email protected]
 * Date: 2018/6/18
 * Time: 下午1:54
 */

namespace App\ThirdParty\Message;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Publishes plain-text messages to the RabbitMQ fanout exchange
 * "message.fanout.exchange".
 */
class AmqpPublisher
{
    /**
     * Publish a single message to the fanout exchange.
     *
     * A new connection and channel are opened for every call; both are closed
     * again before the method returns, including when declaring the exchange
     * or publishing throws.
     *
     * @param string $content Message body; sent with content_type "text/plain".
     * @return void
     */
    public function send($content)
    {
        $exchange = 'message.fanout.exchange';

        // Open the connection using the broker settings from the application config.
        $connection = new AMQPStreamConnection(
            config('app.mq_host'),
            config('app.mq_port'),
            config('app.mq_user'),
            config('app.mq_pass'),
            config('app.mq_vhost')
        );

        try {
            $channel = $connection->channel();

            try {
                /*
                    name: $exchange
                    type: fanout
                    passive: false     // don't just check whether an exchange with the same name exists
                    durable: true      // the exchange survives server restarts
                    auto_delete: false // the exchange is kept once it is no longer in use
                */
                // NOTE(review): these flags presumably have to match the declaration made by
                // every other client of this exchange - confirm against the consumers.
                $channel->exchange_declare($exchange, 'fanout', false, true, false);

                $message = new AMQPMessage($content, array('content_type' => 'text/plain'));

                // No routing key is passed: the message is published to the exchange only.
                $channel->basic_publish($message, $exchange);
            } finally {
                // Close the channel.
                $channel->close();
            }
        } finally {
            // Close the connection.
            $connection->close();
        }
    }
}

技術分享圖片

消息消費端(Java):

引入maven依賴

<!-- Spring Boot starter for AMQP messaging; the consumer code relies on it for the
     org.springframework.amqp classes. No <version> is given here - presumably it is
     managed by the Spring Boot parent POM / BOM (TODO confirm). -->
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
    

配置廣播隊列信息

package cn.taxiong.release.config;

import cn.taxiong.release.constant.QueueConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * RabbitMQ fanout-mode configuration: declares two queues (cache and index),
 * one fanout exchange, and a binding from each queue to that exchange.
 *
 * @author [email protected]
 * @create 2018-06-18 4:04 PM
 **/
@Slf4j
@Configuration
public class RabbitMQFanoutConfig {

    /**
     * Declares the queue named by
     * {@link QueueConstants#MESSAGE_QUEUE_RELEASE_CACHE_NAME}.
     *
     * @return the cache queue
     */
    @Bean
    public Queue createFanoutQueueCache() {
        log.info( "創建了FanoutQueue cache 緩存 隊列" );
        final Queue cacheQueue = new Queue(QueueConstants.MESSAGE_QUEUE_RELEASE_CACHE_NAME);
        return cacheQueue;
    }

    /**
     * Declares the queue named by
     * {@link QueueConstants#MESSAGE_QUEUE_RELEASE_INDEX_NAME}.
     *
     * @return the index queue
     */
    @Bean
    public Queue createFanoutQueueIndex() {
        log.info( "創建了FanoutQueue index 緩存 隊列" );
        final Queue indexQueue = new Queue(QueueConstants.MESSAGE_QUEUE_RELEASE_INDEX_NAME);
        return indexQueue;
    }

    /**
     * Declares the fanout exchange named by
     * {@link QueueConstants#MESSAGE_FANOUT_EXCHANGE}.
     *
     * @return the fanout exchange
     */
    @Bean
    public FanoutExchange fanoutExchangeRelease() {
        log.info( "創建了fanoutExchange交換機" );
        final FanoutExchange exchange = new FanoutExchange(QueueConstants.MESSAGE_FANOUT_EXCHANGE);
        return exchange;
    }

    /**
     * Binds the cache queue to the fanout exchange.
     *
     * @return the binding between the cache queue and the exchange
     */
    @Bean
    public Binding fanoutExchangeCacheQueueBinding() {
        log.info( "將FanoutQueue cache 隊列綁定到交換機fanoutExchange" );
        final Queue cacheQueue = createFanoutQueueCache();
        final FanoutExchange exchange = fanoutExchangeRelease();
        return BindingBuilder.bind(cacheQueue).to(exchange);
    }

    /**
     * Binds the index queue to the fanout exchange.
     *
     * @return the binding between the index queue and the exchange
     */
    @Bean
    public Binding fanoutExchangeIndexQueueBinding() {
        log.info( "將FanoutQueue index 隊列綁定到交換機fanoutExchange" );
        final Queue indexQueue = createFanoutQueueIndex();
        final FanoutExchange exchange = fanoutExchangeRelease();
        return BindingBuilder.bind(indexQueue).to(exchange);
    }
}

隊列常量信息

package cn.taxiong.release.constant;

/**
 * Queue constants: the names of the RabbitMQ exchange and queues.
 *
 * @author [email protected]
 * @create 2018-06-14 7:02 PM
 **/
public interface QueueConstants {

    /** Name of the message exchange. */
    String MESSAGE_FANOUT_EXCHANGE = "message.fanout.exchange";

    /** Name of the queue that carries release cache messages. */
    String MESSAGE_QUEUE_RELEASE_CACHE_NAME = "message.release.cache.queue";

    /** Name of the queue that carries release index messages. */
    String MESSAGE_QUEUE_RELEASE_INDEX_NAME = "message.release.index.queue";
}


緩存(cache)服務消費消息:

package cn.taxiong.release.message;

import cn.taxiong.release.constant.QueueConstants;
import cn.taxiong.release.service.OperateReleaseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * Message consumer listening on the queue named by
 * {@link QueueConstants#MESSAGE_QUEUE_RELEASE_CACHE_NAME}.
 *
 * @author [email protected]
 * @create 2018-06-14 7:14 PM
 **/
@Slf4j
@Component
@RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_RELEASE_CACHE_NAME)
public class MessageConsumer {

    /** Injected service; no active code in this class calls it at the moment. */
    @Autowired
    private OperateReleaseService operateReleaseService;

    /**
     * Handles one message from the cache queue by writing it to the log.
     *
     * @param message the message payload
     */
    @RabbitHandler
    public void handler(@Payload String message) {
        // The cache write is currently disabled:
        // operateReleaseService.storeReleaseRedisCache(message);
        final String logTemplate = "緩存cache消息消費1:{}";
        log.info(logTemplate, message);
    }
}


索引(index)服務消費消息:

package cn.taxiong.release.message;

import cn.taxiong.release.constant.QueueConstants;
import cn.taxiong.release.service.OperateReleaseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * Message consumer listening on the queue named by
 * {@link QueueConstants#MESSAGE_QUEUE_RELEASE_INDEX_NAME}.
 *
 * @author [email protected]
 * @create 2018-06-14 7:14 PM
 **/
@Slf4j
@Component
@RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_RELEASE_INDEX_NAME)
public class MessageConsumer2 {

    /** Injected service; no code in this class calls it at the moment. */
    @Autowired
    private OperateReleaseService operateReleaseService;

    /**
     * Handles one message from the index queue by writing it to the log.
     *
     * @param message the message payload
     */
    @RabbitHandler
    public void handler(@Payload String message) {
        final String logTemplate = "索引消息 index 消費2:{}";
        log.info(logTemplate, message);
    }
}

技術分享圖片

rabbitMQ應用,laravel生產廣播消息,springboot消費消息