
Kafka Usage (1)

Contents: 1 ConsumerRecord · 2 Creating topics (2.1 annotation approach, 2.2 KafkaAdmin and AdminClient, 2.3 why partition, 2.4 getting topic information) · 3 Custom producer send listener · 4 Kafka transaction management

1 ConsumerRecord

@Component
public class ConsumerDemo {

    @KafkaListener(topics = "topicTEST")
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.printf("topic is %s, offset is %d, timestamp is %s, value is %s \n",
                record.topic(), record.offset(), record.timestamp(), record.value());
    }
}
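For the listener above to run, a consumer factory and a listener container factory must be present; Spring Boot auto-configures them from `spring.kafka.*` properties. A minimal manual configuration might look like the following sketch (the group id is an assumption; the broker address reuses the one used elsewhere in this article):

```java
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Broker address as used later in this article; group id is a placeholder assumption.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.228.128:9090");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```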

Suppose we have three Kafka brokers: brokerA, brokerB, and brokerC.

  1. If we create a topic with 3 partitions and a replication-factor of 1, each broker typically hosts one partition. If one broker goes down, the partition on it becomes unavailable and the topic can no longer serve all of its data, because only two of the three partitions remain usable.

  2. If we create a topic with 3 partitions and a replication-factor of 2, the partitions and their replicas might be distributed like this:

Node    | Partition  | Partition replica
brokerA | partition0 | partition1
brokerB | partition1 | partition2
brokerC | partition2 | partition0

2 Creating Topics

2.1 Annotation approach

// Create a topic named topic.quick.initial with 8 partitions and a replication factor of 1
@Bean
public NewTopic initialTopic() {
    return new NewTopic("topic.quick.initial", 8, (short) 1);
}

2.2 KafkaAdmin and AdminClient

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> props = new HashMap<>();
    // Connection address of the Kafka instance
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.228.128:9090");
    return new KafkaAdmin(props);
}

@Bean
public AdminClient adminClient() {
    return AdminClient.create(kafkaAdmin().getConfigurationProperties());
}

@Autowired
private AdminClient adminClient;

@Test
public void testCreateTopic() throws Exception {
    NewTopic topic = new NewTopic("topic.quick.initial2", 1, (short) 1);
    // createTopics is asynchronous; waiting on the returned futures is
    // more reliable than sleeping for a fixed interval.
    adminClient.createTopics(Arrays.asList(topic)).all().get();
}

2.3 Why partition

Partitioning raises the system's throughput by letting producers and consumers work on a topic in parallel, but the partition count can only ever be increased, never decreased.
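One reason partition changes are so restricted is that the default partitioner maps a record key to a partition by hashing it modulo the partition count, so any change in the count remaps existing keys. The sketch below illustrates this; note it is a simplification made only to stay dependency-free: Kafka's real default partitioner hashes the serialized key with murmur2, not `String.hashCode()`.

```java
// Simplified illustration of key-based partition assignment.
// Kafka's default partitioner actually applies murmur2 to the serialized
// key; String.hashCode() here is an assumption for illustration only.
public class PartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit, then take the modulo, mirroring
        // Kafka's toPositive(hash) % numPartitions pattern.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always lands on the same partition...
        System.out.println(partitionFor("order-42", 3) == partitionFor("order-42", 3)); // true
        // ...but changing the partition count can remap the key, which is
        // why even adding partitions breaks per-key ordering for old data.
        System.out.println(partitionFor("order-42", 3));
        System.out.println(partitionFor("order-42", 4));
    }
}
```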

2.4 Getting topic information

/**
 * Get information about a topic
 */
@Test
public void testSelectTopicInfo() throws ExecutionException, InterruptedException {
    DescribeTopicsResult topicTest = adminClient.describeTopics(Arrays.asList("topic.quick.initial"));
    topicTest.all().get().forEach((k, v) ->
            System.out.println("k: " + k + " ,v: " + v.toString() + "\n"));
}
k: topic.quick.initial ,v: (name=topic.quick.initial, internal=false, partitions=
    (partition=0, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),
    (partition=1, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),
    (partition=2, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),
    (partition=3, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),
    (partition=4, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),
    (partition=5, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),
    (partition=6, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),
    (partition=7, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),
    authorizedOperations=null)

3 Custom producer send listener

@Configuration
public class KafkaSendListenerHandler implements ProducerListener<String, Object> {

    private static final Logger log = LoggerFactory.getLogger(KafkaSendListenerHandler.class);

    @Override
    public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
        log.info("Send listener: message sent successfully...");
        String key = producerRecord.key();
        log.info("key : " + key);
        log.info("Message send success : " + producerRecord.toString());
        log.info("-----------------------------");
    }

    @Override
    public void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {
        // Log failures so they are not silently swallowed.
        log.error("Message send failed : " + producerRecord, exception);
    }
}
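Spring Boot's auto-configured KafkaTemplate automatically uses a single ProducerListener bean such as the handler above. A template built by hand does not, so the listener has to be attached explicitly. The bean below is an illustrative variant of a hand-built template, not additional code from this article; the producer factory is assumed to be defined elsewhere (as it is in the next section):

```java
@Bean
public KafkaTemplate<String, Object> listeningKafkaTemplate(
        ProducerFactory<String, Object> producerFactory,
        KafkaSendListenerHandler sendListener) {
    KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory);
    // Without this call, a hand-built template never invokes the listener.
    template.setProducerListener(sendListener);
    return template;
}
```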

4 Kafka transaction management

  • Declaring a transaction with KafkaTemplate's executeInTransaction method

@Test
public void testExecuteInTransaction() throws InterruptedException {
    kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, Object, Object>() {
        @Override
        public Object doInOperations(KafkaOperations<String, Object> kafkaOperations) {
            kafkaOperations.send("topicTEST", "test executeInTransaction");
            // Throwing here aborts the transaction, so the message above is never committed.
            throw new RuntimeException("fail");
            // return true;
        }
    });
}
  • Using the @Transactional annotation. To open transactions declaratively, first configure a KafkaTransactionManager, the transaction manager class that Spring Kafka provides for Kafka; it is built from the producer factory. Note that transactions must be enabled on the producer factory by setting a TransactionIdPrefix, the prefix used to generate each producer's transactional.id.

@Bean
public ProducerFactory<String, Object> producerFactory() {
    DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
    // Setting a transaction id prefix is what enables transactions on the factory;
    // transactionCapable() only reports whether they are enabled.
    factory.setTransactionIdPrefix("tran-");
    return factory;
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.228.128:9090");
    // Transactions require acks=all so writes are confirmed by all in-sync replicas.
    props.put("acks", "all");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return props;
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public KafkaTransactionManager<String, Object> transactionManager(ProducerFactory<String, Object> producerFactory) {
    return new KafkaTransactionManager<>(producerFactory);
}
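With the KafkaTransactionManager in place, a service method can then open a transaction declaratively. The service class below is an illustrative assumption, not code from this article; only the topic name comes from section 1:

```java
@Service
public class TransactionalSender {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public TransactionalSender(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // The KafkaTransactionManager begins a Kafka transaction before this
    // method runs and commits it afterwards; the RuntimeException below
    // triggers a rollback instead, so the message is never visible to
    // consumers reading with isolation.level=read_committed.
    @Transactional
    public void sendInTransaction() {
        kafkaTemplate.send("topicTEST", "test @Transactional");
        throw new RuntimeException("fail");
    }
}
```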
Result of running with the transaction:

org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:422) [kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312) [kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) [kafka-clients-2.5.1.jar:na]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_152]

2020-09-15 18:50:54.064 INFO 14636 --- [ main] o.s.t.c.transaction.TransactionContext : Rolled back transaction for test: ....