1. 程式人生 > >spring boot整合用docker構建的rabbitmq

spring boot整合用docker構建的rabbitmq

docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

或者不設定密碼

docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 rabbitmq:management

使用 docker inspect rabbit檢視容器的ip地址,
在這裡插入圖片描述
這個留著待會兒用spring boot連線的時候進行配置。

開啟瀏覽器 localhost:15672 來到rabbitmq的web管理介面,剛剛我們的rabbit容器是以預設的方式開啟的,所以賬號密碼都是 guest。
在這裡插入圖片描述

登入進去之後就設計佇列和交換機(類比計算機網路)
我設計了兩個佇列,分別是C.Queue 用來接收spider服務傳過來給web端的,spider_queue spider元件來接收web傳過來的專案。

編號 Exchange RoutingKey Queue 描述
1 C.Exchange C.* C.Queue web服務端監聽,訊息來自於spider服務
2 topicExchange spider.* spider_queue spider服務監聽,訊息來自於web服務端

字首最好設定成一樣的,這樣一旦佇列多了,方便維護,以免眼花
建好之後:
在這裡插入圖片描述

在這裡插入圖片描述

然後用spring boot來呼叫
在pom.xml加入:

<dependency
>
<groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.0.0.RELEASE</version> </dependency>

並在配置檔案中加入配置:
這是web服務的配置

spring.rabbitmq.host=172.17.0.3
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

com.shengqian.demo.rabbitmq.ToSpiderQueueName=C.Queue
com.shengqian.demo.rabbitmq.exchangename=C.Exchange
com.shengqian.demo.rabbitmq.routekey=C.*
com.shengqian.demo.rabbitmq.recv=recvMessage

下面來看看spider服務的配置

spring.rabbitmq.host=172.17.0.3
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

com.shengqian.demo.rabbitmq.ToSpiderQueueName=spider_queue
com.shengqian.demo.rabbitmq.exchangename=topicExchange
com.shengqian.demo.rabbitmq.routekey=spider.*
com.shengqian.demo.rabbitmq.recv=recvMessage

實際上沒什麼區別,唯一的區別就是監聽的佇列引數不一樣,詳細的都在那個表裡面了。

這裡的host就是剛剛我們檢視的docker 容器的ip。之所以能這麼做是因為我在我的路由表中加了一條路由,使得本機發出的172.17.0.0/16的ip能丟到docker的預設network中,有關詳情請參見我的 用springboot連線redis叢集一文,裡面有較為詳細的操作方法。

然後加入rabbitmq配置:

@Configuration
public class RabbitMQConfig {
    @Autowired
    private QueueService queueService;
   
    @Value("${com.shengqian.demo.rabbitmq.ToSpiderQueueName}")
    private String queueName;

    
    @Value("${com.shengqian.demo.rabbitmq.exchangename}")
    private String exchangeName;

   
    @Value("${com.shengqian.demo.rabbitmq.routekey}")
    private String routeKey;

    
    @Value("${com.shengqian.demo.rabbitmq.recv}")
    private String messageHandler;

    
    @Bean
    Queue getQueue() {
        return new Queue(queueName);
    }

   
    @Bean
    TopicExchange getTopicExchange() {
        return new TopicExchange(exchangeName);
    }



   
    @Bean
    Binding getBinding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routeKey);
    }

   
    @Bean
    MessageListenerAdapter listenerAdapter(AmqpMessageService receiver) {
        return new MessageListenerAdapter(receiver, messageHandler);
    }

   
    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        container.setConcurrentConsumers(1); 
        container.setMaxConcurrentConsumers(1);
        container.setPrefetchCount(1);
        container.setExposeListenerChannel(true);
        return container;
    }
}

現在就可以使用了,實現一個簡單的兩個服務之間用訊息佇列進行對話的demo:
在web端

public interface AmqpMessageService {
    /**
     * 接收訊息佇列訊息
     *
     * @param message 訊息內容
     */
    void recvMessage(String message);

    /**
     * 傳送訊息到訊息佇列
     *
     * @param message 訊息內容
     */
    void sendMessage(String routingKey, String message);
}

public interface QueueService extends AmqpMessageService {
    void receive(String message);
}


@Service
public class QueueServiceImpl implements QueueService, InitializingBean {

    @Autowired
    public RabbitTemplate rabbitTemplate;

    private final static Logger LOGGER = LoggerFactory.getLogger(QueueServiceImpl.class);

    public void receive(String message) {
        LOGGER.info("接收到來自C.Queue佇列的訊息:" + message);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        //執行一些初始化操作,如程式重新啟動,需要載入掃描引數
    }

    /**
     * 接收訊息佇列訊息
     *
     * @param message 訊息內容
     */
    @Override
    public void recvMessage(String message) {
        receive(message);
    }

    /**
     * 傳送訊息到訊息佇列
     *
     * @param message 訊息內容
     */
    @Override
    public void sendMessage(String routingKey,String message) {
        rabbitTemplate.convertAndSend("topicExchange", routingKey, JSON.toString(message));
    }

}
//最後定義一個controller
@Controller
public class TestController {

    @Autowired
    private QueueService queueService;

    @ApiOperation(value = "測試rabbit", notes = "測試rabbitmq")
    @RequestMapping(value = Path.TEST_RABBITMQ, method = RequestMethod.GET)
    @ResponseBody
    public Integer send(){
        Map<String, Object> ans = new HashMap<>();
        ans.put("hah", "hello spider");
        queueService.sendMessage("spider.*", JSON.toString(ans));
        return 1;
    }
}

然後在spider服務端:

public interface AmqpMessageService {
    /**
     * 接收訊息佇列訊息
     *
     * @param message 訊息內容
     */
    void recvMessage(String message);

    /**
     * 傳送訊息到訊息佇列
     *
     * @param message 訊息內容
     */
    void sendMessage(String routingKey, String message);
}


public interface QueueService extends AmqpMessageService {
    void receive(String message);
}

@Service
public class QueueServiceImpl implements QueueService, InitializingBean {

    @Autowired
    public RabbitTemplate rabbitTemplate;

    private final static Logger LOGGER = LoggerFactory.getLogger(QueueServiceImpl.class);

    public void receive(String message) {
        LOGGER.info("接收到來自spider_queue佇列的訊息:" + message);
        //處理一段時間然後,返回
        Map<String, Object> ans = new HashMap<String, Object>();
        ans.put("hello",message);
        sendMessage("C.*",JSON.toString(ans));
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        //執行一些初始化操作,如程式重新啟動,需要載入掃描引數
    }

    /**
     * 接收訊息佇列訊息
     *
     * @param message 訊息內容
     */
    @Override
    public void recvMessage(String message) {
        receive(message);
    }

    /**
     * 傳送訊息到訊息佇列
     *
     * @param message 訊息內容
     */
    @Override
    public void sendMessage(String routingKey, String message) {
        rabbitTemplate.convertAndSend("C.Exchange", routingKey, JSON.toString(message));
    }

}

最後分別開啟兩個spring boot服務即可,我的spider專案也是基於spring boot構建的服務,當訂閱了訊息佇列,有訊息來的時候會驅動整個服務繼續執行,所以說該服務是訊息驅動的服務。
開啟兩個服務後,我在web服務中構建了swagger2,
在這裡插入圖片描述
直接點開第三個介面執行就可以了,有關如何在springboot環境下整合swagger請看我前面的部落格。執行結果如下:
在這裡插入圖片描述

在這裡插入圖片描述

可以正常使用。接下來計劃完成spider專案與docker環境下的cockroachDB叢集的互動問題,cockroachDB是新一代newSql。