kafka和springboot以及如何使用事務
阿新 • • 發佈:2020-08-25
- 1、引入依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.4.RELEASE</version> </dependency>
//這個是kafka的<dependency>
<groupId>org.slf4j</groupId>
</dependency>
//這個是slf4j的 - 2、配置檔案:
server: port: 9001 servlet: context-path: /kafka spring: kafka: producer: bootstrap-servers: 10.203.103.66:9092 //生產者的
consumer:
bootstrap-servers: 10.203.103.66:9092 //這個是消費者的
logging:
root: INFO
//紅色的是kafka的配置,藍色的是本地的地址 - 3、訊息的傳送和訊息的監聽
@RestController @RequestMapping("kafka") public class KafkaController { @Autowired private KafkaTemplate template; private static final String topic="heima";//設定主題topic private static final Logger LOGGER=LoggerFactory.getLogger(KafkaApplication.class
如何使用事務控制回滾(已經發了一個訊息,但是後面的訊息出錯了,然後全部回滾)
- 事務的支援:
spring: kafka: producer: bootstrap-servers: 10.203.103.66:9092 transaction-id-prefix: kafka_tx. consumer: bootstrap-servers: 10.203.103.66:9092
//標紅的是事務的支援 - 程式碼部分:控制事務的方式
//編碼方式 @GetMapping("/send/{input}") public String sendToKafka(@PathVariable String input){ // this.template.send(topic,input); //事務的支援 template.executeInTransaction(t->{ t.send(topic,input); if("error".equals(input)) { throw new RuntimeException("input is error"); } t.send(topic,input+"anthor"); return true; }); return "send success!"+input; } //第二種 (註解方式) @GetMapping("/send2/{input}") @Transactional(rollbackFor = RuntimeException.class) public String sendToKafka2(@PathVariable String input){ // this.template.send(topic,input); //事務的支援 template.send(topic,input); if("error".equals(input)) { throw new RuntimeException("input is error"); } template.send(topic,input+"anthor"); return "send success!"+input; }