通過Spring Boot Webflux實現Reactor Kafka
我們將使用兩個小型示例應用程序,Paymentprocessor Gateway和PaymentValidator。這些應用程序的代碼可以在這裏找到。
Paymentprocessor網關提供了一個小網頁,可以生成一個隨機的信用卡號碼(顯然是偽造的),以及支付金額。當用戶單擊提交按鈕時,表單將提交給網關的API。API具有針對Kafka群集上的未確認事務主題的反應流,這個未確認事務的主題的另外一邊消費者是PaymentValidator,監聽要驗證的傳入消息。然後,這些消息通過響應管道,驗證方法將其打印到命令行。
通過Reactive Streams向Kafka發送消息
我們的應用程序構建在Spring 5和Spring Boot 2之上,使我們能夠快速設置和使用Project Reactor。
Gateway應用程序的目標是設置從Web控制器到Kafka集群的Reactive流。這意味著我們需要特定的依賴關系來彈簧webflux和reactor-kafka。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> <dependency> <groupId>io.projectreactor.kafka</groupId> <artifactId>reactor-kafka</artifactId> <version>1.1.0.RELEASE</version> </dependency>
Spring Webflux RestController提供支付API,為paymentGateway類的doPayment方法創建一個Reactive流。
/ ** *調用返回的Mono將被發送到Spring Webflux,後者依賴於multi-reactor 事件循環和NIO *以非阻塞方式處理請求,從而實現更多的並發請求。結果將 通過一個名為Server Sent Events 發送。 ** / @PostMapping(value = "/payment") public Mono<Void> doPayment(@RequestBody CreatePaymentCommand payment) { / ** 當調用doPayment方法時,我們發送付款信息,獲得Mono <Void>作為響應。 當我們的付款成功發送事件到Kafka主題 ** / return paymentGateway.doPayment(payment); }
paymentGateway需要一個kafkaProducer,它使我們能夠將消息作為管道的一部分放在Kafka主題中。它可以使用KafkaSender.create方法輕松創建,傳遞許多生產者選項。
public PaymentGatewayImpl() {
final Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
final SenderOptions<Integer, String> producerOptions = SenderOptions.create(producerProps);
kafkaProducer = KafkaSender.create(producerOptions);
}
創建之後,kafkaProducer可以用來輕松地將我們的消息發送到選擇的Kafka主題,成為控制器中啟動的管道的一部分。因為消息是以非阻塞方式發送到Kafka集群的,所以我們可以使用項目Reactor的事件循環接收並將來自Web API的大量並發消息路由到Kafka。
@Override
public Mono<Void> doPayment(final CreatePaymentCommand createPayment) {
final PaymentEvent payment = new PaymentEvent(createPayment.getId(), createPayment.getCreditCardNumber(), createPayment.getAmount(), gatewayName);
String payload = toBinary(payment);
SenderRecord<Integer, String, Integer> message = SenderRecord.create(new ProducerRecord<>("unconfirmed-transactions", payload), 1);
return kafkaProducer.send(Mono.just(message)).next();
}
private String toBinary(Object object) {
try {
return objectMapper.writeValueAsString(object);
}
catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
}
從Kafka主題創建反應流
當沒有消費者監聽時,向主題發送消息沒有多大意義,因此我們的第二個應用程序將使用一個反應管道來監聽未確認的事務主題。為此,使用KafkaReceiver.create方法創建kafkaReceiver對象,類似於我們之前創建kafkaProducer的方法。
通過使用kafkaReceiver.receive方法,我們可以獲得receiverRecords的Flux。進入我們讀取的主題中每條消息都放入receiverRecord中。流入應用程序後,它們會進一步通過反應管道。然後,這些消息傳遞processEvent方法,該方法調用paymentValidator,該方法將一些信息輸出到控制臺。最後,在receiverOffset上調用acknowledge方法,向Kafka集群發送一條消息已被處理的確認。
public PaymentValidatorListenerImpl(PaymentValidator paymentValidator) {
this.paymentValidator = paymentValidator;
final Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-validator-1");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-validator");
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
ReceiverOptions<Object, Object> consumerOptions = ReceiverOptions.create(consumerProps)
.subscription(Collections.singleton("unconfirmed-transactions"))
.addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
.addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
kafkaReceiver = KafkaReceiver.create(consumerOptions);
/**
* We create a receiver for new unconfirmed transactions
*/
((Flux<ReceiverRecord>) kafkaReceiver.receive())
.doOnNext(r -> {
/**
* Each unconfirmed payment we receive, we convert to a PaymentEvent and process it
*/
final PaymentEvent paymentEvent = fromBinary((String) r.value(), PaymentEvent.class);
processEvent(paymentEvent);
r.receiverOffset().acknowledge();
}
)
.subscribe();
}
private void processEvent(PaymentEvent paymentEvent) {
paymentValidator.calculateResult(paymentEvent);
}
private <T> T fromBinary(String object, Class<T> resultType) {
try {
return objectMapper.readValue(object, resultType);
}
catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
通過Spring Boot Webflux實現Reactor Kafka