1. 程式人生 > 其它 >Spring Cloud整合RabbitMQ的使用

Spring Cloud整合RabbitMQ的使用

同步 or 非同步

前言:我們現在有一個用微服務架構模式開發的系統,系統裡有一個商品服務和訂單服務,且它們都是同步通訊的。

目前我們商品服務和訂單服務之間的通訊方式是同步的,當業務擴大之後,如果還繼續使用同步的方式進行服務之間的通訊,會使得服務之間的耦合增大。例如我們登入操作可能需要同步呼叫使用者服務、積分服務、簡訊服務等等,而服務之間可能又依賴別的服務,那麼這樣一個登入過程就會耗費不少的時間,以致使用者的體驗降低。

那我們在微服務架構下要如何對服務之間的通訊進行解耦呢?這就需要使用到訊息中介軟體了,訊息中介軟體可以幫助我們將同步的通訊轉化為非同步通訊,服務之間只需要對訊息佇列進行訊息的釋出、訂閱即可,從而解耦服務之間的通訊依賴。

目前較為主流的訊息中介軟體:

  • RabbitMQ
  • Kafka
  • ActiveMQ

非同步通訊特點:

  • 客戶端請求不會阻塞程序,服務端的響應可以是非即時的

非同步的常見形態:

  • 推送通知
  • 請求/非同步響應
  • 訊息佇列

MQ應用場景:

  • 非同步處理
  • 流量削峰
  • 日誌處理
  • 應用解耦

更多關於訊息中介軟體的描述,可以參考我另一篇文章:

RabbitMQ的基本使用

在上文 Spring Cloud Config - 統一配置中心 中,已經演示過使用Docker安裝RabbitMQ,所以這裡就不再浪費篇幅演示了。

直接進入正題,我們以訂單服務和商品服務示例,首先在訂單服務的專案中,加入mq的依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在配置檔案中增加RabbitMQ的相關配置項:

到訂單服務的專案中,新建一個message包,在該包中建立一個MqReceiver類,我們來看看RabbitMQ的基本操作。程式碼如下:

package org.zero.springcloud.order.server.message;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
/**
 * @program: sell_order
 * @description: 接收訊息,即消費者
 * @author: 01
 * @create: 2018-08-21 22:24
 **/
@Slf4j
@Component
public class MqReceiver {
 
    /**
     * 接收訊息並列印
     *
     * @param message message
     */
    @RabbitListener(queues = "myQueue")
    public void process(String message) {
        // @RabbitListener註解用於監聽RabbitMQ,queues指定監聽哪個佇列
        log.info(message);
    }
}

因為RabbitMQ上還沒有myQueue這個佇列,所以我們還得到RabbitMQ的管理介面上,建立這個佇列,如下:

然後新建一個測試類,用於傳送訊息到佇列中,程式碼如下:

package org.zero.springcloud.order.server;
 
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
 
/**
 * @program: sell_order
 * @description: 傳送訊息,即訊息釋出者
 * @author: 01
 * @create: 2018-08-21 22:28
 **/
@RunWith(SpringRunner.class)
@SpringBootTest
public class MqSenderTest {
 
    @Autowired
    private AmqpTemplate amqpTemplate;
 
    @Test
    public void send() {
        for (int i = 0; i < 100; i++) {
            amqpTemplate.convertAndSend("myQueue", "第" + i + "條訊息");
        }
    }
}

執行該測試類,執行成功後到OrderApplication的控制檯上,看看是否接收並列印了接收到的訊息。正常情況應如下:

基本的消費者和釋出者的程式碼我們都已經編寫過,並且也測試成功了。但有個小問題,我們要監聽一個不存在的佇列時,需要手動去新建這個佇列,感覺每次都手動新建挺麻煩的。有沒有辦法當佇列不存在時,自動建立該佇列呢?答案是有的,依舊使用之前的那個註解,只不過這次的引數要換成queuesToDeclare。示例程式碼如下:

package org.zero.springcloud.order.server.message;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
/**
 * @program: sell_order
 * @description: 接收訊息,即消費者
 * @author: 01
 * @create: 2018-08-21 22:24
 **/
@Slf4j
@Component
public class MqReceiver {
 
    /**
     * 接收並列印訊息
     * 可以當佇列不存在時自動建立佇列
     *
     * @param message message
     */
    @RabbitListener(queuesToDeclare = @Queue("myQueue"))
    public void process2(String message) {
        // @RabbitListener註解用於監聽RabbitMQ,queuesToDeclare可以建立指定的佇列
        log.info(message);
    }
}

以上我們通過示例簡單的介紹了訊息的收發及佇列的建立,本小節則介紹一下exchange 的自動繫結方式。當需要自動繫結 exchange 時,我們也可以通過 bindings 引數完成。示例程式碼如下:

package org.zero.springcloud.order.server.message;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
/**
 * @program: sell_order
 * @description: 接收訊息,即消費者
 * @author: 01
 * @create: 2018-08-21 22:24
 **/
@Slf4j
@Component
public class MqReceiver {
 
    /**
     * 接收並列印訊息
     * 可以當佇列不存在時自動建立佇列,以及自動繫結指定的Exchange
     * @param message message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("myQueue"),
            exchange = @Exchange("myExchange")
    ))
    public void process3(String message) {
        // @RabbitListener註解用於監聽RabbitMQ,bindings可以建立指定的佇列及自動繫結Exchange
        log.info(message);
    }
}

訊息分組我們也是可以通過 bindings 引數完成,例如現在有一個數碼供應商服務和一個水果供應商服務,它們都監聽著同一個訂單服務的訊息佇列。但我希望數碼訂單的訊息被數碼供應商服務消費,而水果訂單的訊息被水果供應商服務消費。所以我們就需要用到訊息分組。示例程式碼如下:

package org.zero.springcloud.order.server.message;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
/**
 * @program: sell_order
 * @description: 接收訊息,即消費者
 * @author: 01
 * @create: 2018-08-21 22:24
 **/
@Slf4j
@Component
public class MqReceiver {
 
    /**
     * 數碼供應商服務 - 接收訊息
     *
     * @param message message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("computerOrder"),
            exchange = @Exchange("myOrder"),
            key = "computer"  // 指定路由的key
    ))
    public void processComputer(String message) {
        log.info("computer message : {}", message);
    }
 
    /**
     * 水果供應商服務 - 接收訊息
     *
     * @param message message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("computerOrder"),
            exchange = @Exchange("myOrder"),
            key = "fruit"  // 指定路由的key
    ))
    public void processFruit(String message) {
        log.info("fruit message : {}", message);
    }
}

測試程式碼如下,通過指定key進行訊息的分組,將訊息傳送到數碼供應商服務:

package org.zero.springcloud.order.server;
 
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
 
/**
 * @program: sell_order
 * @description: 傳送訊息,即訊息釋出者
 * @author: 01
 * @create: 2018-08-21 22:28
 **/
@RunWith(SpringRunner.class)
@SpringBootTest
public class MqSenderTest {
 
    @Autowired
    private AmqpTemplate amqpTemplate;
 
    @Test
    public void sendOrder() {
        for (int i = 0; i < 100; i++) {
            // 第一個引數指定佇列,第二個引數來指定路由的key,第三個引數指定訊息
            amqpTemplate.convertAndSend("myOrder", "computer", "第" + i + "條訊息");
        }
    }
}

重啟專案後,執行以上測試程式碼,控制檯輸出如下,可以看到只有數碼供應商服務才能夠接收到訊息,而水果供應商服務是接收不到的。這就完成了訊息分組:


Spring Cloud Stream的使用

Spring Cloud Stream 是一個用來為微服務應用構建訊息驅動能力的框架。它可以基於Spring Boot 來建立獨立的,可用於生產的Spring 應用程式。他通過使用Spring Integration來連線訊息代理中介軟體以實現訊息事件驅動。Spring Cloud Stream 為一些供應商的訊息中介軟體產品提供了個性化的自動化配置實現,引用了釋出-訂閱、消費組、分割槽的三個核心概念。目前僅支援RabbitMQ、Kafka。

什麼是Spring Integration ? Integration 整合

企業應用整合(EAI)是整合應用之間資料和服務的一種應用技術。四種整合風格:

  1. ​ 檔案傳輸:兩個系統生成檔案,檔案的有效負載就是由另一個系統處理的訊息。該類風格的例子之一是針對檔案輪詢目錄或FTP目錄,並處理該檔案。
  2. 共享資料庫:兩個系統查詢同一個資料庫以獲取要傳遞的資料。一個例子是你部署了兩個EAR應用,它們的實體類(JPA、Hibernate等)共用同一個表。
  3. 遠端過程呼叫:兩個系統都暴露另一個能呼叫的服務。該類例子有EJB服務,或SOAP和REST服務。
  4. 訊息:兩個系統連線到一個公用的訊息系統,互相交換資料,並利用訊息呼叫行為。該風格的例子就是眾所周知的中心輻射式的(hub-and-spoke)JMS架構。

Spring Integration作為一種企業級整合框架,遵從現代經典書籍《企業整合模式》,為開發者提供了一種便捷的實現模式。Spring Integration構建在Spring控制反轉設計模式之上,抽象了訊息源和目標,利用訊息傳送和訊息操作來整合應用環境下的各種元件。訊息和整合關注點都被框架處理,所以業務元件能更好地與基礎設施隔離,從而降低開發者所要面對的複雜的整合職責。

模型圖:

現在我們來看看Spring Cloud Stream的基本使用,到訂單服務專案上,增加如下依賴:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

然後是在配置檔案中,配置rabbitmq的相關資訊,只不過我們之前已經配置過了所以不用配置了。

我們來看看如何使用Spring Cloud Stream傳送和接收訊息,首先建立一個介面,定義input和output方法。程式碼如下:

package org.zero.springcloud.order.server.message;
 
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
 
public interface StreamClient {
 
    // 接收訊息、入口
    @Input("myMessageInput")
    SubscribableChannel input();
 
    // 傳送訊息、
    @Output("myMessageOutput")
    MessageChannel output();
}

建立一個訊息接收者。程式碼如下:

package org.zero.springcloud.order.server.message;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
 
/**
 * @program: sell_order
 * @description: 訊息接收者
 * @author: 01
 * @create: 2018-08-22 22:16
 **/
@Slf4j
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {
 
    @StreamListener("myMessageOutput")
    public void process(String message) {
        log.info("message : {}", message);
    }
}

訊息傳送者,這裡作為一個Controller存在。程式碼如下:

package org.zero.springcloud.order.server.controller;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.zero.springcloud.order.server.message.StreamClient;
 
/**
 * @program: sell_order
 * @description: 訊息傳送者
 * @author: 01
 * @create: 2018-08-22 22:18
 **/
@RestController
public class SendMessageController {
 
    private final StreamClient streamClient;
 
    @Autowired
    public SendMessageController(StreamClient streamClient) {
        this.streamClient = streamClient;
    }
 
    @GetMapping("/send/msg")
    public void send() {
        for (int i = 0; i < 100; i++) {
            MessageBuilder<String> messageBuilder = MessageBuilder.withPayload("這是第" + i + "條訊息");
            streamClient.output().send(messageBuilder.build());
        }
    }
}

因為我們的微服務可能會部署多個例項,若有多個例項需要對訊息進行分組,否則所有的服務例項都會接收到相同的訊息。在配置檔案中,增加如下配置完成訊息的分組:

spring:
  ...
  cloud:
    ...
    stream:
      bindings:
        myMessageOutput:
          group: order
...

重啟專案,訪問http://localhost:9080/send/msg,控制檯輸出如下:

注:Spring Cloud Stream可以在專案啟動的時候自動建立佇列,在專案關閉的時候自動刪除佇列

在實際的開發中,我們一般傳送的訊息通常會是一個java物件而不是字串。所以我們來看看如何傳送物件,其實和傳送字串幾乎是一樣的。訊息傳送者程式碼如下:

package org.zero.springcloud.order.server.controller;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.zero.springcloud.order.server.dto.OrderDTO;
import org.zero.springcloud.order.server.message.StreamClient;
 
/**
 * @program: sell_order
 * @description: 訊息傳送者
 * @author: 01
 * @create: 2018-08-22 22:18
 **/
@RestController
public class SendMessageController {
 
    private final StreamClient streamClient;
 
    @Autowired
    public SendMessageController(StreamClient streamClient) {
        this.streamClient = streamClient;
    }
 
    /**
     * 傳送OrderDTO物件
     */
    @GetMapping("/send/msg")
    public void send() {
        OrderDTO orderDTO = new OrderDTO();
        orderDTO.setOrderId("123465");
 
        MessageBuilder<OrderDTO> messageBuilder = MessageBuilder.withPayload(orderDTO);
        streamClient.output().send(messageBuilder.build());
    }
}

訊息接收者也只需要在方法引數上宣告這個物件的型別即可。程式碼如下:

package org.zero.springcloud.order.server.message;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
import org.zero.springcloud.order.server.dto.OrderDTO;
 
/**
 * @program: sell_order
 * @description: 訊息接收者
 * @author: 01
 * @create: 2018-08-22 22:16
 **/
@Slf4j
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {
 
    /**
     * 接收OrderDTO物件
     * @param message message
     */
    @StreamListener("myMessageOutput")
    public void process(OrderDTO message) {
        log.info("message : {}", message);
    }
}

另外需要提到的一點是,預設情況下,java物件在訊息佇列中是以base64編碼存在的,我們也都知道base64不可讀。為了方便檢視堆積在訊息佇列裡的物件資料,我們希望java物件是以json格式的字串呈現,這樣就方便我們人類閱讀。至於這個問題,我們只需要在配置檔案中,增加一段content-type的配置即可。如下:

spring:
  ...
  cloud:
    ...
    stream:
      bindings:
        myMessageOutput:
          group: order
          content-type: application/json
...

重啟專案,訪問http://localhost:9080/send/msg,控制檯輸出如下:

2018-08-22 23:32:33.704  INFO 12436 --- [nio-9080-exec-4] o.z.s.o.server.message.StreamReceiver    
: message : OrderDTO(orderId=123465, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, 
orderAmount=null, orderStatus=null, payStatus=null, createTime=null, updateTime=null, orderDetailList=null)

當我們接收到訊息的時候,可能會需要返回一段特定的訊息,表示訊息已收到之類的。至於這個功能,我們通過@SendTo註解即可完成。程式碼如下:

package org.zero.springcloud.order.server.message;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
import org.zero.springcloud.order.server.dto.OrderDTO;
 
/**
 * @program: sell_order
 * @description: 訊息接收者
 * @author: 01
 * @create: 2018-08-22 22:16
 **/
@Slf4j
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {
 
    /**
     * 接收OrderDTO物件
     * @param message message
     */
    @StreamListener("myMessageOutput")
    @SendTo("myMessageInput")
    public String process(OrderDTO message) {
        log.info("message : {}", message);
 
        return "success";
    }
 
    @StreamListener("myMessageInput")
    public void success(String message) {
        log.info("message : {}", message);
    }
}

重啟專案,訪問http://localhost:9080/send/msg,控制檯輸出如下:

Spring Cloud Stream 再一次簡化了我們在分散式環境下對訊息中介軟體的操作,配置好訊息中介軟體的連線地址及使用者密碼後,在開發的過程中,我們只需要關注input和output,對訊息中介軟體的操作基本是無感知的。