kafka使用(1)
阿新 • • 發佈:2020-09-15
kafka使用(1)1ConsumerRecord2建立主題2.1註解方式2.2KafkaAdmin和AdminClient2.3為什麼要分割槽2.4獲得主題的資訊3自定義傳送訊息監聽器4kafka事務管理使用事務結果:
1ConsumerRecord
public class ConsumerDemo {
topics = "topicTEST") (
public void listen (ConsumerRecord<?, ?> record){
System.out.printf("topic is %s, offset is %d, timestamp is %s, value is %s \n", record.topic(), record.offset(), record.timestamp(),record.value());
}
}
假設我們有3個kafka broker分別brokerA、brokerB、brokerC.
-
當我們建立的topic有3個分割槽partition時並且replication-factor為1,基本上一個broker上一個分割槽。擋一個broker宕機了,該topic就無法使用了,因為三個分割槽只有兩個能用,
-
當我們建立的topic有3個分割槽partition時並且replication-factor為2時,可能分割槽資料分佈情況是
節點 | 分割槽 | 分割槽副本 |
---|---|---|
brokerA | partiton0 | partiton1 |
brokerB | partiton1 | partiton2 |
brokerC | partiton2 | partiton0 |
2建立主題
2.1註解方式
//建立TopicName為topic.quick.initial的Topic並設定分割槽數為8以及副本數為1
public NewTopic initialTopic() {
return new NewTopic("topic.quick.initial",8, (short) 1 );
}
2.2KafkaAdmin和AdminClient
public KafkaAdmin kafkaAdmin() {
Map<String, Object> props = new HashMap<>();
//配置Kafka例項的連線地址
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.228.128:9090");
KafkaAdmin admin = new KafkaAdmin(props);
return admin;
}
public AdminClient adminClient() {
return AdminClient.create(kafkaAdmin().getConfigurationProperties());
}
private AdminClient adminClient;
public void testCreateTopic() throws InterruptedException {
NewTopic topic = new NewTopic("topic.quick.initial2", 1, (short) 1);
adminClient.createTopics(Arrays.asList(topic));
Thread.sleep(1000);
}
-
結果:
2.3為什麼要分割槽
分割槽提高系統的吞吐量,但是分割槽的引數只能增大,不能減小。
2.4獲得主題的資訊
/**
* 獲得主題的資訊
*/
public void testSelectTopicInfo() throws ExecutionException, InterruptedException {
DescribeTopicsResult topicTest = adminClient.describeTopics(Arrays.asList("topic.quick.initial"));
topicTest.all().get().forEach((k,v)->{
System.out.println("k: "+k+" ,v: "+v.toString()+"\n");
});
}
k: topic.quick.initial ,v: (name=topic.quick.initial, internal=false, partitions=(partition=0, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=1, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=2, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=3, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=4, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=5, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=6, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=7, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)), authorizedOperations=null)
3自定義傳送訊息監聽器
public class KafkaSendListenerHandler implements ProducerListener {
private static final Logger log = LoggerFactory.getLogger(KafkaSendListenerHandler.class);
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
log.info("監聽訊息傳送成功...");
String key = (String)producerRecord.key();
log.info("key : " + key);
log.info("Message send success : " + producerRecord.toString());
log.info("-----------------------------");
}
public void onError(ProducerRecord producerRecord, Exception exception) {
}
}
4kafka事務管理
-
KafkaTemplate 的 executeInTransaction 方法來宣告事務
public void testExecuteInTransaction() throws InterruptedException {
kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
public Object doInOperations(KafkaOperations kafkaOperations) {
kafkaOperations.send("topicTEST", "test executeInTransaction");
throw new RuntimeException("fail");
//return true;
}
});
}
-
使用@Transactional註解方式使用註解方式開啟事務,首先需要配置KafkaTransactionManager,這個類是Kafka提供事務管理類,需要使用生產者工廠來建立這個事務管理類。需要注意的是,在producerFactory中開啟事務功能,並設定TransactionIdPrefix,TransactionIdPrefix是用來生成Transactional.id的字首。
public ProducerFactory<String, Object> producerFactory() {
DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
factory.transactionCapable();
factory.setTransactionIdPrefix("tran-");
return factory;
}
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.228.128:9090");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<String, Object>(producerFactory());
}
public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory);
return manager;
}
使用事務結果:
org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:422) [kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312) [kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) [kafka-clients-2.5.1.jar:na]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_152]
2020-09-15 18:50:54.064 INFO 14636 --- [ main] o.s.t.c.transaction.TransactionContext : Rolled back transaction for test: ....