execute與executeUpdate的區別
1、Producer的攔截器interceptor,和consumer端的攔截器interceptor是在kafka0.10版本被引入的,主要用於實現clients端的定製化控制邏輯,生產者攔截器可以用在訊息傳送前做一些準備工作,使用場景,如下所示:
1)、按照某個規則過濾掉不符合要求的訊息。
2)、修改訊息的內容。
3)、統計類需求。
1 package com.demo.kafka.listener; 2 3 import java.util.Map; 4 5 import org.apache.kafka.clients.producer.ProducerInterceptor; 6 import org.apache.kafka.clients.producer.ProducerRecord; 7 import org.apache.kafka.clients.producer.RecordMetadata; 8 9 /** 10 * 生產者攔截器 11 * 12 * @author 生產者攔截器 13 * 14 */ 15 16 public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> { 17 18 // 傳送成功計數 19 private volatile long sendSuccess = 0; 20 21 // 傳送失敗計數 22 private volatile long sendFailure = 0; 23 24 /** 25 * 26 */ 27 @Override 28 public void configure(Map<String, ?> configs) { 29 30 } 31 32 /** 33 * 傳送訊息已經操作訊息的方法 34 */ 35 @Override 36 public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { 37 String modifiedValue = "字首prefix : " + record.value(); 38 ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>( 39 record.topic(), // 主題 40 record.partition(), // 分割槽 41 record.timestamp(), // 時間戳 42 record.key(), // key值 43 modifiedValue, // value值 44 record.headers()); // 訊息頭 45 return producerRecord; 46 } 47 48 /** 49 * ack確認的方法 50 */ 51 @Override 52 public void onAcknowledgement(RecordMetadata metadata, Exception exception) { 53 if(exception == null) { 54 sendSuccess++; 55 }else { 56 sendFailure++; 57 } 58 } 59 60 /** 61 * 關閉的方法,傳送成功之後會將攔截器關閉,呼叫此方法 62 */ 63 @Override 64 public void close() { 65 double successRation = (double)sendSuccess / (sendSuccess + sendFailure); 66 System.out.println("【INFO 】 傳送成功率: " + String.format("%f", successRation * 100) + "%"); 67 } 68 69 }
生產者客戶端要配置一下Producer的攔截器interceptor,如下所示:
1 package com.demo.kafka.producer; 2 3 import java.util.Properties; 4 import java.util.concurrent.ExecutionException; 5 6 import org.apache.kafka.clients.producer.KafkaProducer; 7 import org.apache.kafka.clients.producer.ProducerConfig; 8 import org.apache.kafka.clients.producer.ProducerRecord; 9 import org.apache.kafka.clients.producer.RecordMetadata; 10 import org.apache.kafka.common.serialization.StringSerializer; 11 12 import com.demo.kafka.listener.ProducerInterceptorPrefix; 13 14 public class KafkaProducerSimple { 15 16 // 設定伺服器地址 17 private static final String brokerList = "192.168.110.142:9092"; 18 19 // 設定主題 20 private static final String topic = "topic-demo"; 21 22 public static void main(String[] args) { 23 Properties properties = new Properties(); 24 // 設定key的序列化器 25 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 26 27 // 設定重試次數 28 properties.put(ProducerConfig.RETRIES_CONFIG, 10); 29 30 // 設定值的序列化器 31 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 32 33 // 列印輸出序列化器的路徑資訊 34 System.err.println(StringSerializer.class.getName()); 35 36 // 設定叢集地址 37 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); 38 39 // 自定義攔截器使用,可以計算髮送成功率或者失敗率,進行訊息的拼接或者過濾操作 40 properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName()); 41 42 // 將引數配置到生產者物件中 43 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); 44 45 for (int i = 0; i < 100000; i++) { 46 // 生產者訊息記錄 47 ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "hello world!!!" + i); 48 // 同步獲取訊息 49 // RecordMetadata recordMetadata = producer.send(record).get(); 50 producer.send(record); 51 } 52 53 // 關閉 54 producer.close(); 55 } 56 57 }
消費者程式碼,如下所示:
1 package com.demo.kafka.consumer; 2 3 import java.time.Duration; 4 import java.util.Collections; 5 import java.util.Properties; 6 7 import org.apache.kafka.clients.consumer.ConsumerConfig; 8 import org.apache.kafka.clients.consumer.ConsumerRecord; 9 import org.apache.kafka.clients.consumer.ConsumerRecords; 10 import org.apache.kafka.clients.consumer.KafkaConsumer; 11 import org.apache.kafka.clients.producer.ProducerConfig; 12 import org.apache.kafka.common.serialization.StringDeserializer; 13 14 public class KafkaConsumerSimple { 15 16 // 設定伺服器地址 17 private static final String bootstrapServer = "192.168.110.142:9092"; 18 19 // 設定主題 20 private static final String topic = "topic-demo"; 21 22 // 設定消費者組 23 private static final String groupId = "group.demo"; 24 25 public static void main(String[] args) { 26 Properties properties = new Properties(); 27 // 設定反序列化key引數資訊 28 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 29 // 設定反序列化value引數資訊 30 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 31 32 // 設定伺服器列表資訊 33 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); 34 35 // 設定消費者組資訊 36 properties.put("group.id", groupId); 37 38 // 將引數設定到消費者引數中 39 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); 40 41 // 訊息訂閱 42 consumer.subscribe(Collections.singletonList(topic)); 43 44 while (true) { 45 // 每隔一秒監聽一次 46 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); 47 // 獲取到訊息資訊 48 for (ConsumerRecord<String, String> record : records) { 49 System.err.println(record.toString()); 50 } 51 } 52 53 } 54 55 }
2、生產者的acks引數,這個引數用來指定分割槽中必須有多少副本來收到這條訊息,之後生產者才會認為這條訊息寫入成功的。acks是生產者客戶端中非常重要的一個引數,它涉及到訊息的可靠性和吞吐量之間的權衡。
1)、ack等於0,生產者在成功寫入訊息之前不會等待任何來自伺服器的響應。如果出現問題生產者是感知不到的,訊息就丟失了,不過因為生產者不需要等待伺服器響應,所以他可以以網路能夠支援的最大速度傳送訊息,從而達到很高的吞吐量。 2)、acks等於1,預設值為1,只要叢集的首領節點收到訊息,生產者就會收到一個來自伺服器的成功響應。如果訊息無法達到首領節點,比如首領節點崩潰,新的首領節點還沒有被選舉出來,生產者會收到一個錯誤響應,為了避免資料丟失,生產者會重發訊息。但是這樣還有可能會導致資料丟失,如果收到寫成功通知,此時首領節點還沒有來的及同步資料到follower節點,首領節點崩潰,就會導致資料丟失。 3)、acks等於-1,只有當所有參與複製的節點收到訊息時候,生產者會收到一個來自伺服器額成功響應,這種模式 最安全的,他可以保證不止一個伺服器收到訊息。
注意,acks引數配置的是一個字串型別,而不是整數型別,如果配置為整數型別會丟擲異常資訊。
3、kafka消費者訂閱主題和分割槽,建立完消費者後我們便可以訂閱主題了,只需要呼叫subscribe方法即可,這個方法會接受一個主題列表,如下所示:
另外,我們也可以使用正則表示式來匹配多個主題,而且訂閱之後如果又有匹配的新主題,那麼這個消費組立即對其進行消費。正則表示式在連線kafka與其他系統非常有用。比如訂閱所有的測試主題。
1 package com.demo.kafka.consumer;
2
3 import java.time.Duration;
4 import java.util.Arrays;
5 import java.util.Collections;
6 import java.util.Properties;
7 import java.util.regex.Pattern;
8
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16
17 public class KafkaConsumerSimple {
18
19 // 設定伺服器地址
20 private static final String bootstrapServer = "192.168.110.142:9092";
21
22 // 設定主題
23 private static final String topic = "topic-demo";
24
25 // 設定主題
26 private static final String topic2 = "topic-demo2";
27
28 // 設定消費者組
29 private static final String groupId = "group.demo";
30
31 public static void main(String[] args) {
32 Properties properties = new Properties();
33 // 設定反序列化key引數資訊
34 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
35 // 設定反序列化value引數資訊
36 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
37
38 // 設定伺服器列表資訊,必填引數,該引數和生產者相同,,制定連結kafka叢集所需的broker地址清單,可以設定一個或者多個
39 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
40
41 // 設定消費者組資訊,消費者隸屬的消費組,預設為空,如果設定為空,則會丟擲異常,這個引數要設定成具有一定業務含義的名稱
42 properties.put("group.id", groupId);
43
44 // 制定kafka消費者對應的客戶端id,預設為空,如果不設定kafka消費者會自動生成一個非空字串。
45 properties.put("client.id", "consumer.client.id.demo");
46
47 // 將引數設定到消費者引數中
48 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
49
50 // 訊息訂閱
51 consumer.subscribe(Collections.singletonList(topic));
52 // 可以訂閱多個主題
53 consumer.subscribe(Arrays.asList(topic, topic2));
54 // 可以使用正則表示式進行訂閱
55 consumer.subscribe(Pattern.compile("topic-demo*"));
56
57 // 指定訂閱的分割槽
58 consumer.assign(Arrays.asList(new TopicPartition(topic, 0)));
59
60 while (true) {
61 // 每隔一秒監聽一次
62 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
63 // 獲取到訊息資訊
64 for (ConsumerRecord<String, String> record : records) {
65 System.err.println(record.toString());
66 }
67 }
68
69 }
70
71 }