Spring整合Kafka中的事務
原文連結:https://docs.spring.io/spring-kafka/reference/htmlsingle/#transactions
事務Transactions
Kafka0.11.0.0版本客戶端提供了事務支援。Spring for Apache Kafka通過如下幾種方式提供事務支援:
KafkaTransactionManager
-和普通的Spring事務支援一起使用(@Transactional,TransactionTemplate等等)。- 事務性的
KafkaMessageListenerContainer
。 - 通過
KafkaTemplate
實現本地事務。
通過給DefaultKafkaProducerFactory
提供一個事務id字首transactionIdPrefix
開啟事務。開啟事務後,生產者工廠快取一些事務性的生產者(transactional producers),而不是管理一個共享的生產者Producer
。當用戶使用close()
方法關閉一個生產者時,它並沒有真正被關閉,而是被放回快取中複用。每個生產者的transactional.id
transactionIdPrefix
+n
,n從0開始,每個生產者自增。
Kafka事務管理器KafkaTransactionManager
KafkaTransactionManager
是Spring框架中的平臺事務管理器PlatformTransactionManager
介面的實現,在KafkaTransactionManager
構造器中需要提供一個生產者工廠引用。如果你提供一個自定義的生產者工廠,它必須支援事務,參考ProducerFactory.transactionCapable()
。
你可以和Spring事務支援(@Transactional,TransactionTemplate等等)一起使用KafkaTransactionManager
KafkaTemplate
操作都將使用這個事務內的生產者Producer
。事務管理器將根據成功或失敗來決定提交還是回滾事務。注意KafkaTemplate
必須和事務管理器使用同樣的生產者工廠ProducerFactory
。
事務性的監聽器容器Transactional Listener Container
你可以給監聽器容器(listener container)提供一個KafkaTransactionManager
例項,當這麼配置的時候,容器在呼叫監聽器之前會開啟事務。如果監聽器成功處理一條記錄(或者一批記錄,當使用BatchMessageListener
的時候),容器將在事務管理器提交事務前,使用producer.sendOffsetsToTransaction())
給事務傳送偏移量(offset(s)
)。如果監聽器丟擲異常,事務會回滾,下次拉取(poll
)的時候消費者仍可以消費到之前出錯的記錄。
事務同步Transaction Synchronization
如果你需要用其他事務來同步Kafka事務,只需要簡單地給監聽器容器(listener container)配置合適的事務管理器(一個支援同步的事務管理器,比如DataSourceTransactionManager
)。