Micronaut 微服務中使用 Kafka
今天,我們將通過Apache Kafka
topic構建一些彼此非同步通訊的微服務。我們使用Micronaut
框架,它為與Kafka
整合提供專門的庫。讓我們簡要介紹一下示例系統的架構。我們有四個微型服務:訂單服務
,行程服務
,司機服務
和乘客服務
。這些應用程式的實現非常簡單。它們都有記憶體儲存,並連線到同一個Kafka
例項。
我們系統的主要目標是為客戶安排行程。訂單服務應用程式還充當閘道器。它接收來自客戶的請求,儲存歷史記錄並將事件傳送到orders
topic。所有其他微服務都在監聽orders
這個topic,並處理order-service
傳送的訂單。每個微服務都有自己的專用topic,其中傳送包含更改資訊的事件。此類事件由其他一些微服務接收。架構如下圖所示。
在閱讀本文之前,有必要熟悉一下Micronaut
框架。您可以閱讀之前的一篇文章,該文章描述了通過REST API構建微服務通訊的過程
:使用microaut框架構建微服務的快速指南。
1. 執行Kafka
要在本地機器上執行Apache Kafka
,我們可以使用它的Docker映像。最新的映象是由https://hub.docker.com/u/wurstmeister共享的。在啟動Kafka
容器之前,我們必須啟動kafka
所用使用的ZooKeeper
伺服器。如果在Windows
上執行Docker
,其虛擬機器的預設地址是192.168.99.100
。它還必須設定為Kafka
容器的環境。
Zookeeper
Kafka
容器都將在同一個網路中啟動。在docker中執行Zookeeper以zookeeper
的名稱提供服務,並在暴露2181
埠。Kafka
容器需要在環境變數使用KAFKA_ZOOKEEPER_CONNECT
的地址。
$ docker network create kafka $ docker run -d --name zookeeper --network kafka -p 2181:2181 wurstmeister/zookeeper $ docker run -d --name kafka -p 9092:9092 --network kafka --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 wurstmeister/kafka
2. 引入Micronaut Kafka依賴
使用Kafka
構建的microaut
應用程式可以在HTTP伺服器存在的情況下啟動,也可以在不存在HTTP伺服器的情況下啟動。要啟用Micronaut Kafka
,需要新增micronaut-kafka
庫到依賴項。如果您想暴露HTTP API
,您還應該新增micronaut-http-server-netty
:
<dependency>
<groupId>io.micronaut.configuration</groupId>
<artifactId>micronaut-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-http-server-netty</artifactId>
</dependency>
3. 構建訂單微服務
訂單微服務
是唯一一個啟動嵌入式HTTP伺服器並暴露REST API
的應用程式。這就是為什麼我們可以為Kafka
提供內建Micronaut
健康檢查。要做到這一點,我們首先應該新增micronaut-management
依賴:
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-management</artifactId>
</dependency>
為了方便起見,我們將通過在application.yml
中定義以下配置來啟用所有管理端點並禁用它們的HTTP身份驗證。
endpoints:
all:
enabled: true
sensitive: false
現在,可以在地址http://localhost:8080/health下使用health check
。我們的示例應用程式還將暴露新增新訂單
和列出所有以前建立的訂單
的簡單REST API
。下面是暴露這些端點的Micronaut
控制器實現:
@Controller("orders")
public class OrderController {
@Inject
OrderInMemoryRepository repository;
@Inject
OrderClient client;
@Post
public Order add(@Body Order order) {
order = repository.add(order);
client.send(order);
return order;
}
@Get
public Set<Order> findAll() {
return repository.findAll();
}
}
每個微服務都使用記憶體儲存庫實現。以下是訂單微服務(Order-Service)
中的儲存庫實現:
@Singleton
public class OrderInMemoryRepository {
private Set<Order> orders = new HashSet<>();
public Order add(Order order) {
order.setId((long) (orders.size() + 1));
orders.add(order);
return order;
}
public void update(Order order) {
orders.remove(order);
orders.add(order);
}
public Optional<Order> findByTripIdAndType(Long tripId, OrderType type) {
return orders.stream().filter(order -> order.getTripId().equals(tripId) && order.getType() == type).findAny();
}
public Optional<Order> findNewestByUserIdAndType(Long userId, OrderType type) {
return orders.stream().filter(order -> order.getUserId().equals(userId) && order.getType() == type)
.max(Comparator.comparing(Order::getId));
}
public Set<Order> findAll() {
return orders;
}
}
記憶體儲存庫儲存Order
物件例項。Order
物件還被髮送到名為orders
的Kafkatopic。下面是Order
類的實現:
public class Order {
private Long id;
private LocalDateTime createdAt;
private OrderType type;
private Long userId;
private Long tripId;
private float currentLocationX;
private float currentLocationY;
private OrderStatus status;
// ... GETTERS AND SETTERS
}
4. 使用Kafka非同步通訊
現在,讓我們想一個可以通過示例系統實現的用例——新增新的行程
。
我們建立了OrderType.NEW_TRIP
型別的新訂單。在此之後,(1)訂單服務
建立一個訂單並將其傳送到orders
topic。訂單由三個微服務接收:司機服務
、乘客服務
和行程服務
。
(2)所有這些應用程式都處理這個新訂單。乘客服務
應用程式檢查乘客帳戶上是否有足夠的資金。如果沒有,它就取消了行程,否則什麼也做不了。司機服務
正在尋找最近可用的司機,(3)行程服務
建立和儲存新的行程。司機服務
和行程服務
都將事件傳送到它們的topic(drivers
, trips
),其中包含相關更改的資訊。
每一個事件可以被其他microservices
訪問,例如,(4)行程服務
偵聽來自司機服務
的事件,以便為行程分配一個新的司機
下圖說明了在新增新的行程時,我們的微服務之間的通訊過程。
現在,讓我們繼續討論實現細節。
4.1. 傳送訂單
首先,我們需要建立Kafka 客戶端,負責向topic傳送訊息。我們建立的一個介面,命名為OrderClient
,為它新增@KafkaClient
並宣告用於傳送訊息的一個或多個方法。每個方法都應該通過@Topic
註解設定目標topic名稱。對於方法引數,我們可以使用三個註解@KafkaKey
、@Body
或@Header
。@KafkaKey
用於分割槽,這是我們的示例應用程式所需要的。在下面可用的客戶端實現中,我們只使用@Body
註解。
@KafkaClient
public interface OrderClient {
@Topic("orders")
void send(@Body Order order);
}
4.2. 接收訂單
一旦客戶端傳送了一個訂單,它就會被監聽orders
topic的所有其他微服務接收。下面是司機服務
中的監聽器實現。監聽器類OrderListener
應該新增@KafkaListener
註解。我們可以宣告groupId
作為一個註解引數,以防止單個應用程式的多個例項接收相同的訊息。然後,我們宣告用於處理傳入訊息的方法。與客戶端方法相同,應該通過@Topic
註解設定目標topic名稱,因為我們正在監聽Order
物件,所以應該使用@Body
註解——與對應的客戶端方法相同。
@KafkaListener(groupId = "driver")
public class OrderListener {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);
private DriverService service;
public OrderListener(DriverService service) {
this.service = service;
}
@Topic("orders")
public void receive(@Body Order order) {
LOGGER.info("Received: {}", order);
switch (order.getType()) {
case NEW_TRIP -> service.processNewTripOrder(order);
}
}
}
4.3. 傳送到其他topic
現在,讓我們看一下司機服務
中的processNewTripOrder
方法。DriverService
注入兩個不同的Kafka Client
bean: OrderClient
和DriverClient
。當處理新訂單時,它將試圖尋找與傳送訂單的乘客最近的司機。找到他之後,將該司機的狀態更改為UNAVAILABLE
,並將帶有Driver
物件的事件傳送到drivers
topic。
@Singleton
public class DriverService {
private static final Logger LOGGER = LoggerFactory.getLogger(DriverService.class);
private DriverClient client;
private OrderClient orderClient;
private DriverInMemoryRepository repository;
public DriverService(DriverClient client, OrderClient orderClient, DriverInMemoryRepository repository) {
this.client = client;
this.orderClient = orderClient;
this.repository = repository;
}
public void processNewTripOrder(Order order) {
LOGGER.info("Processing: {}", order);
Optional<Driver> driver = repository.findNearestDriver(order.getCurrentLocationX(), order.getCurrentLocationY());
driver.ifPresent(driverLocal -> {
driverLocal.setStatus(DriverStatus.UNAVAILABLE);
repository.updateDriver(driverLocal);
client.send(driverLocal, String.valueOf(order.getId()));
LOGGER.info("Message sent: {}", driverLocal);
});
}
// ...
}
這是Kafka Client
在司機服務
中的實現,用於向driver
topic傳送訊息。因為我們需要將Driver
與Order
關聯起來,所以我們使用@Header
註解 的orderId
引數。沒有必要把它包括到Driver
類中,將其分配給監聽器端的正確行程。
@KafkaClient
public interface DriverClient {
@Topic("drivers")
void send(@Body Driver driver, @Header("Order-Id") String orderId);
}
4.4. 服務間通訊
由DriverListener
收到@KafkaListener
在行程服務
中宣告。它監聽傳入到trip
topic。接收方法的引數和客戶端傳送方法的類似,如下所示:
@KafkaListener(groupId = "trip")
public class DriverListener {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);
private TripService service;
public DriverListener(TripService service) {
this.service = service;
}
@Topic("drivers")
public void receive(@Body Driver driver, @Header("Order-Id") String orderId) {
LOGGER.info("Received: driver->{}, header->{}", driver, orderId);
service.processNewDriver(driver);
}
}
最後一步,將orderId
查詢到的行程Trip
與driverId
關聯,這樣整個流程就結束。
@Singleton
public class TripService {
private static final Logger LOGGER = LoggerFactory.getLogger(TripService.class);
private TripInMemoryRepository repository;
private TripClient client;
public TripService(TripInMemoryRepository repository, TripClient client) {
this.repository = repository;
this.client = client;
}
public void processNewDriver(Driver driver, String orderId) {
LOGGER.info("Processing: {}", driver);
Optional<Trip> trip = repository.findByOrderId(Long.valueOf(orderId));
trip.ifPresent(tripLocal -> {
tripLocal.setDriverId(driver.getId());
repository.update(tripLocal);
});
}
// ... OTHER METHODS
}
5. 跟蹤
我們可以使用Micronaut Kafka
輕鬆地啟用分散式跟蹤。首先,我們需要啟用和配置Micronaut
跟蹤。要做到這一點,首先應該新增一些依賴項:
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-tracing</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-instrumentation-http</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.opentracing.brave</groupId>
<artifactId>brave-opentracing</artifactId>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-kafka-client</artifactId>
<version>0.0.16</version>
<scope>runtime</scope>
</dependency>
我們還需要在application.yml
配置檔案中,配置Zipkin 的追蹤的地址等。
tracing:
zipkin:
enabled: true
http:
url: http://192.168.99.100:9411
sampler:
probability: 1
在啟動應用程式之前,我們必須執行Zipkin
容器:
$ docker run -d --name zipkin -p 9411:9411 openzipkin/zipkin
6. 總結
在本文中,您將瞭解通過Apache Kafka
使用非同步通訊構建微服務架構的過程。我已經向大家展示了Microaut Kafka
庫最重要的特性,它允許您輕鬆地宣告Kafka
topic的生產者和消費者,為您的微服務啟用健康檢查
和分散式跟蹤
。我已經為我們的系統描述了一個簡單的場景的實現,包括根據客戶的請求新增一個新的行程。本示例系統的整體實現,請檢視GitHub上的原始碼
原文連結:https://piotrminkowski.wordpress.com/2019/08/06/kafka-in-microservices-with-micronaut/
作者:Piotr's
譯者:李東