1. 程式人生 > >Micronaut 微服務中使用 Kafka

Micronaut 微服務中使用 Kafka

今天,我們將通過Apache Kafkatopic構建一些彼此非同步通訊的微服務。我們使用Micronaut框架,它為與Kafka整合提供專門的庫。讓我們簡要介紹一下示例系統的架構。我們有四個微型服務:訂單服務行程服務司機服務乘客服務。這些應用程式的實現非常簡單。它們都有記憶體儲存,並連線到同一個Kafka例項。

我們系統的主要目標是為客戶安排行程。訂單服務應用程式還充當閘道器。它接收來自客戶的請求,儲存歷史記錄並將事件傳送到orderstopic。所有其他微服務都在監聽orders這個topic,並處理order-service傳送的訂單。每個微服務都有自己的專用topic,其中傳送包含更改資訊的事件。此類事件由其他一些微服務接收。架構如下圖所示。

在閱讀本文之前,有必要熟悉一下Micronaut框架。您可以閱讀之前的一篇文章,該文章描述了通過REST API構建微服務通訊的過程:使用microaut框架構建微服務的快速指南。

1. 執行Kafka

要在本地機器上執行Apache Kafka,我們可以使用它的Docker映像。最新的映象是由https://hub.docker.com/u/wurstmeister共享的。在啟動Kafka容器之前,我們必須啟動kafka所用使用的ZooKeeper伺服器。如果在Windows上執行Docker,其虛擬機器的預設地址是192.168.99.100。它還必須設定為Kafka容器的環境。

Zookeeper

Kafka容器都將在同一個網路中啟動。在docker中執行Zookeeper以zookeeper的名稱提供服務,並在暴露2181埠。Kafka容器需要在環境變數使用KAFKA_ZOOKEEPER_CONNECT的地址。

$ docker network create kafka
$ docker run -d --name zookeeper --network kafka -p 2181:2181 wurstmeister/zookeeper
$ docker run -d --name kafka -p 9092:9092 --network kafka --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 wurstmeister/kafka

2. 引入Micronaut Kafka依賴

使用Kafka構建的microaut應用程式可以在HTTP伺服器存在的情況下啟動,也可以在不存在HTTP伺服器的情況下啟動。要啟用Micronaut Kafka,需要新增micronaut-kafka庫到依賴項。如果您想暴露HTTP API,您還應該新增micronaut-http-server-netty:

<dependency>
    <groupId>io.micronaut.configuration</groupId>
    <artifactId>micronaut-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-http-server-netty</artifactId>
</dependency>

3. 構建訂單微服務

訂單微服務是唯一一個啟動嵌入式HTTP伺服器並暴露REST API的應用程式。這就是為什麼我們可以為Kafka提供內建Micronaut健康檢查。要做到這一點,我們首先應該新增micronaut-management依賴:

<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-management</artifactId>
</dependency>

為了方便起見,我們將通過在application.yml中定義以下配置來啟用所有管理端點並禁用它們的HTTP身份驗證。

endpoints:
  all:
    enabled: true
    sensitive: false

現在,可以在地址http://localhost:8080/health下使用health check。我們的示例應用程式還將暴露新增新訂單列出所有以前建立的訂單的簡單REST API。下面是暴露這些端點的Micronaut控制器實現:

@Controller("orders")
public class OrderController {

    @Inject
    OrderInMemoryRepository repository;
    @Inject
    OrderClient client;

    @Post
    public Order add(@Body Order order) {
        order = repository.add(order);
        client.send(order);
        return order;
    }

    @Get
    public Set<Order> findAll() {
        return repository.findAll();
    }

}

每個微服務都使用記憶體儲存庫實現。以下是訂單微服務(Order-Service)中的儲存庫實現:

@Singleton
public class OrderInMemoryRepository {

    private Set<Order> orders = new HashSet<>();

    public Order add(Order order) {
        order.setId((long) (orders.size() + 1));
        orders.add(order);
        return order;
    }

    public void update(Order order) {
        orders.remove(order);
        orders.add(order);
    }

    public Optional<Order> findByTripIdAndType(Long tripId, OrderType type) {
        return orders.stream().filter(order -> order.getTripId().equals(tripId) && order.getType() == type).findAny();
    }

    public Optional<Order> findNewestByUserIdAndType(Long userId, OrderType type) {
        return orders.stream().filter(order -> order.getUserId().equals(userId) && order.getType() == type)
                .max(Comparator.comparing(Order::getId));
    }

    public Set<Order> findAll() {
        return orders;
    }

}

記憶體儲存庫儲存Order物件例項。Order物件還被髮送到名為orders的Kafkatopic。下面是Order類的實現:

public class Order {

    private Long id;
    private LocalDateTime createdAt;
    private OrderType type;
    private Long userId;
    private Long tripId;
    private float currentLocationX;
    private float currentLocationY;
    private OrderStatus status;

    // ... GETTERS AND SETTERS
}

4. 使用Kafka非同步通訊

現在,讓我們想一個可以通過示例系統實現的用例——新增新的行程

我們建立了OrderType.NEW_TRIP型別的新訂單。在此之後,(1)訂單服務建立一個訂單並將其傳送到orderstopic。訂單由三個微服務接收:司機服務乘客服務行程服務
(2)所有這些應用程式都處理這個新訂單。乘客服務應用程式檢查乘客帳戶上是否有足夠的資金。如果沒有,它就取消了行程,否則什麼也做不了。司機服務正在尋找最近可用的司機,(3)行程服務建立和儲存新的行程。司機服務行程服務都將事件傳送到它們的topic(drivers, trips),其中包含相關更改的資訊。

每一個事件可以被其他microservices訪問,例如,(4)行程服務偵聽來自司機服務的事件,以便為行程分配一個新的司機

下圖說明了在新增新的行程時,我們的微服務之間的通訊過程。
現在,讓我們繼續討論實現細節。

4.1. 傳送訂單

首先,我們需要建立Kafka 客戶端,負責向topic傳送訊息。我們建立的一個介面,命名為OrderClient,為它新增@KafkaClient並宣告用於傳送訊息的一個或多個方法。每個方法都應該通過@Topic註解設定目標topic名稱。對於方法引數,我們可以使用三個註解@KafkaKey@Body@Header@KafkaKey用於分割槽,這是我們的示例應用程式所需要的。在下面可用的客戶端實現中,我們只使用@Body註解。

@KafkaClient
public interface OrderClient {

    @Topic("orders")
    void send(@Body Order order);

}

4.2. 接收訂單

一旦客戶端傳送了一個訂單,它就會被監聽orderstopic的所有其他微服務接收。下面是司機服務中的監聽器實現。監聽器類OrderListener應該新增@KafkaListener註解。我們可以宣告groupId作為一個註解引數,以防止單個應用程式的多個例項接收相同的訊息。然後,我們宣告用於處理傳入訊息的方法。與客戶端方法相同,應該通過@Topic註解設定目標topic名稱,因為我們正在監聽Order物件,所以應該使用@Body註解——與對應的客戶端方法相同。

@KafkaListener(groupId = "driver")
public class OrderListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private DriverService service;

    public OrderListener(DriverService service) {
        this.service = service;
    }

    @Topic("orders")
    public void receive(@Body Order order) {
        LOGGER.info("Received: {}", order);
        switch (order.getType()) {
            case NEW_TRIP -> service.processNewTripOrder(order);
        }
    }

}

4.3. 傳送到其他topic

現在,讓我們看一下司機服務中的processNewTripOrder方法。DriverService注入兩個不同的Kafka Client
bean: OrderClientDriverClient。當處理新訂單時,它將試圖尋找與傳送訂單的乘客最近的司機。找到他之後,將該司機的狀態更改為UNAVAILABLE,並將帶有Driver物件的事件傳送到driverstopic。

@Singleton
public class DriverService {

    private static final Logger LOGGER = LoggerFactory.getLogger(DriverService.class);

    private DriverClient client;
    private OrderClient orderClient;
    private DriverInMemoryRepository repository;

    public DriverService(DriverClient client, OrderClient orderClient, DriverInMemoryRepository repository) {
        this.client = client;
        this.orderClient = orderClient;
        this.repository = repository;
    }

    public void processNewTripOrder(Order order) {
        LOGGER.info("Processing: {}", order);
        Optional<Driver> driver = repository.findNearestDriver(order.getCurrentLocationX(), order.getCurrentLocationY());
        driver.ifPresent(driverLocal -> {
            driverLocal.setStatus(DriverStatus.UNAVAILABLE);
            repository.updateDriver(driverLocal);
            client.send(driverLocal, String.valueOf(order.getId()));
            LOGGER.info("Message sent: {}", driverLocal);
        });
    }

    // ...
}

這是Kafka Client司機服務中的實現,用於向drivertopic傳送訊息。因為我們需要將DriverOrder 關聯起來,所以我們使用@Header註解 的orderId引數。沒有必要把它包括到Driver類中,將其分配給監聽器端的正確行程。

@KafkaClient
public interface DriverClient {

    @Topic("drivers")
    void send(@Body Driver driver, @Header("Order-Id") String orderId);

}

4.4. 服務間通訊

DriverListener收到@KafkaListener行程服務中宣告。它監聽傳入到triptopic。接收方法的引數和客戶端傳送方法的類似,如下所示:

@KafkaListener(groupId = "trip")
public class DriverListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private TripService service;

    public DriverListener(TripService service) {
        this.service = service;
    }

    @Topic("drivers")
    public void receive(@Body Driver driver, @Header("Order-Id") String orderId) {
        LOGGER.info("Received: driver->{}, header->{}", driver, orderId);
        service.processNewDriver(driver);
    }

}

最後一步,將orderId查詢到的行程TripdriverId關聯,這樣整個流程就結束。

@Singleton
public class TripService {

    private static final Logger LOGGER = LoggerFactory.getLogger(TripService.class);

    private TripInMemoryRepository repository;
    private TripClient client;

    public TripService(TripInMemoryRepository repository, TripClient client) {
        this.repository = repository;
        this.client = client;
    }


    public void processNewDriver(Driver driver, String orderId) {
        LOGGER.info("Processing: {}", driver);
        Optional<Trip> trip = repository.findByOrderId(Long.valueOf(orderId));
        trip.ifPresent(tripLocal -> {
            tripLocal.setDriverId(driver.getId());
            repository.update(tripLocal);
        });
    }

    // ... OTHER METHODS

}

5. 跟蹤

我們可以使用Micronaut Kafka輕鬆地啟用分散式跟蹤。首先,我們需要啟用和配置Micronaut跟蹤。要做到這一點,首先應該新增一些依賴項:

<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-tracing</artifactId>
</dependency>
<dependency>
    <groupId>io.zipkin.brave</groupId>
    <artifactId>brave-instrumentation-http</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-reporter</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.opentracing.brave</groupId>
    <artifactId>brave-opentracing</artifactId>
</dependency>
<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-client</artifactId>
    <version>0.0.16</version>
    <scope>runtime</scope>
</dependency>

我們還需要在application.yml配置檔案中,配置Zipkin 的追蹤的地址等。

tracing:
  zipkin:
    enabled: true
    http:
      url: http://192.168.99.100:9411
    sampler:
      probability: 1

在啟動應用程式之前,我們必須執行Zipkin容器:

$ docker run -d --name zipkin -p 9411:9411 openzipkin/zipkin

6. 總結

在本文中,您將瞭解通過Apache Kafka使用非同步通訊構建微服務架構的過程。我已經向大家展示了Microaut Kafka庫最重要的特性,它允許您輕鬆地宣告Kafkatopic的生產者和消費者,為您的微服務啟用健康檢查分散式跟蹤。我已經為我們的系統描述了一個簡單的場景的實現,包括根據客戶的請求新增一個新的行程。本示例系統的整體實現,請檢視GitHub上的原始碼

原文連結:https://piotrminkowski.wordpress.com/2019/08/06/kafka-in-microservices-with-micronaut/

作者:Piotr's

譯者:李東