1. 程式人生 > 其它 >execute與executeUpdate的區別

execute與executeUpdate的區別

技術標籤:kafkajava分散式zookeeper大資料

1、Producer的攔截器interceptor,和consumer端的攔截器interceptor是在kafka0.10版本被引入的,主要用於實現clients端的定製化控制邏輯,生產者攔截器可以用在訊息傳送前做一些準備工作,使用場景,如下所示:

  1)、按照某個規則過濾掉不符合要求的訊息。
  2)、修改訊息的內容。
  3)、統計類需求。

 1 package com.demo.kafka.listener;
 2 
 3 import java.util.Map;
 4 
 5 import org.apache.kafka.clients.producer.ProducerInterceptor;
 6 import org.apache.kafka.clients.producer.ProducerRecord;
 7 import org.apache.kafka.clients.producer.RecordMetadata;
 8 
 9 /**
10  * 生產者攔截器
11  * 
12  * @author 生產者攔截器
13  *
14  */
15 
16 public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> {
17 
18     // 傳送成功計數
19     private volatile long sendSuccess = 0;
20 
21     // 傳送失敗計數
22     private volatile long sendFailure = 0;
23 
24     /**
25      * 
26      */
27     @Override
28     public void configure(Map<String, ?> configs) {
29 
30     }
31 
32     /**
33      * 傳送訊息已經操作訊息的方法
34      */
35     @Override
36     public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
37         String modifiedValue = "字首prefix : " + record.value();
38         ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
39                 record.topic(), // 主題
40                 record.partition(), // 分割槽
41                 record.timestamp(), // 時間戳
42                 record.key(), // key值
43                 modifiedValue,  // value值
44                 record.headers()); // 訊息頭
45         return producerRecord;
46     }
47 
48     /**
49      * ack確認的方法
50      */
51     @Override
52     public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
53         if(exception == null) {
54             sendSuccess++;
55         }else {
56             sendFailure++;
57         }
58     }
59 
60     /**
61      * 關閉的方法,傳送成功之後會將攔截器關閉,呼叫此方法
62      */
63     @Override
64     public void close() {
65         double successRation = (double)sendSuccess / (sendSuccess + sendFailure);
66         System.out.println("【INFO 】 傳送成功率: " + String.format("%f", successRation * 100) + "%");
67     }
68 
69 }

生產者客戶端要配置一下Producer的攔截器interceptor,如下所示:

 1 package com.demo.kafka.producer;
 2 
 3 import java.util.Properties;
 4 import java.util.concurrent.ExecutionException;
 5 
 6 import org.apache.kafka.clients.producer.KafkaProducer;
 7 import org.apache.kafka.clients.producer.ProducerConfig;
 8 import org.apache.kafka.clients.producer.ProducerRecord;
 9 import org.apache.kafka.clients.producer.RecordMetadata;
10 import org.apache.kafka.common.serialization.StringSerializer;
11 
12 import com.demo.kafka.listener.ProducerInterceptorPrefix;
13 
14 public class KafkaProducerSimple {
15 
16     // 設定伺服器地址
17     private static final String brokerList = "192.168.110.142:9092";
18 
19     // 設定主題
20     private static final String topic = "topic-demo";
21 
22     public static void main(String[] args) {
23         Properties properties = new Properties();
24         // 設定key的序列化器
25         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
26 
27         // 設定重試次數
28         properties.put(ProducerConfig.RETRIES_CONFIG, 10);
29 
30         // 設定值的序列化器
31         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
32 
33         // 列印輸出序列化器的路徑資訊
34         System.err.println(StringSerializer.class.getName());
35 
36         // 設定叢集地址
37         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
38 
39         // 自定義攔截器使用,可以計算髮送成功率或者失敗率,進行訊息的拼接或者過濾操作
40         properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
41 
42         // 將引數配置到生產者物件中
43         KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
44 
45         for (int i = 0; i < 100000; i++) {
46             // 生產者訊息記錄
47             ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "hello world!!!" + i);
48             // 同步獲取訊息
49 //            RecordMetadata recordMetadata = producer.send(record).get();
50             producer.send(record);
51         }
52 
53         // 關閉
54         producer.close();
55     }
56 
57 }

消費者程式碼,如下所示:

 1 package com.demo.kafka.consumer;
 2 
 3 import java.time.Duration;
 4 import java.util.Collections;
 5 import java.util.Properties;
 6 
 7 import org.apache.kafka.clients.consumer.ConsumerConfig;
 8 import org.apache.kafka.clients.consumer.ConsumerRecord;
 9 import org.apache.kafka.clients.consumer.ConsumerRecords;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.ProducerConfig;
12 import org.apache.kafka.common.serialization.StringDeserializer;
13 
14 public class KafkaConsumerSimple {
15 
16     // 設定伺服器地址
17     private static final String bootstrapServer = "192.168.110.142:9092";
18 
19     // 設定主題
20     private static final String topic = "topic-demo";
21 
22     // 設定消費者組
23     private static final String groupId = "group.demo";
24 
25     public static void main(String[] args) {
26         Properties properties = new Properties();
27         // 設定反序列化key引數資訊
28         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
29         // 設定反序列化value引數資訊
30         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
31 
32         // 設定伺服器列表資訊
33         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
34 
35         // 設定消費者組資訊
36         properties.put("group.id", groupId);
37 
38         // 將引數設定到消費者引數中
39         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
40 
41         // 訊息訂閱
42         consumer.subscribe(Collections.singletonList(topic));
43 
44         while (true) {
45             // 每隔一秒監聽一次
46             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
47             // 獲取到訊息資訊
48             for (ConsumerRecord<String, String> record : records) {
49                 System.err.println(record.toString());
50             }
51         }
52 
53     }
54 
55 }

2、生產者的acks引數,這個引數用來指定分割槽中必須有多少副本來收到這條訊息,之後生產者才會認為這條訊息寫入成功的。acks是生產者客戶端中非常重要的一個引數,它涉及到訊息的可靠性和吞吐量之間的權衡。

  1)、ack等於0,生產者在成功寫入訊息之前不會等待任何來自伺服器的響應。如果出現問題生產者是感知不到的,訊息就丟失了,不過因為生產者不需要等待伺服器響應,所以他可以以網路能夠支援的最大速度傳送訊息,從而達到很高的吞吐量。  2)、acks等於1,預設值為1,只要叢集的首領節點收到訊息,生產者就會收到一個來自伺服器的成功響應。如果訊息無法達到首領節點,比如首領節點崩潰,新的首領節點還沒有被選舉出來,生產者會收到一個錯誤響應,為了避免資料丟失,生產者會重發訊息。但是這樣還有可能會導致資料丟失,如果收到寫成功通知,此時首領節點還沒有來的及同步資料到follower節點,首領節點崩潰,就會導致資料丟失。  3)、acks等於-1,只有當所有參與複製的節點收到訊息時候,生產者會收到一個來自伺服器額成功響應,這種模式 最安全的,他可以保證不止一個伺服器收到訊息。

  注意,acks引數配置的是一個字串型別,而不是整數型別,如果配置為整數型別會丟擲異常資訊。

3、kafka消費者訂閱主題和分割槽,建立完消費者後我們便可以訂閱主題了,只需要呼叫subscribe方法即可,這個方法會接受一個主題列表,如下所示:

  另外,我們也可以使用正則表示式來匹配多個主題,而且訂閱之後如果又有匹配的新主題,那麼這個消費組立即對其進行消費。正則表示式在連線kafka與其他系統非常有用。比如訂閱所有的測試主題。

 1 package com.demo.kafka.consumer;
 2 
 3 import java.time.Duration;
 4 import java.util.Arrays;
 5 import java.util.Collections;
 6 import java.util.Properties;
 7 import java.util.regex.Pattern;
 8 
 9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 
17 public class KafkaConsumerSimple {
18 
19     // 設定伺服器地址
20     private static final String bootstrapServer = "192.168.110.142:9092";
21 
22     // 設定主題
23     private static final String topic = "topic-demo";
24 
25     // 設定主題
26     private static final String topic2 = "topic-demo2";
27 
28     // 設定消費者組
29     private static final String groupId = "group.demo";
30 
31     public static void main(String[] args) {
32         Properties properties = new Properties();
33         // 設定反序列化key引數資訊
34         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
35         // 設定反序列化value引數資訊
36         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
37 
38         // 設定伺服器列表資訊,必填引數,該引數和生產者相同,,制定連結kafka叢集所需的broker地址清單,可以設定一個或者多個
39         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
40 
41         // 設定消費者組資訊,消費者隸屬的消費組,預設為空,如果設定為空,則會丟擲異常,這個引數要設定成具有一定業務含義的名稱
42         properties.put("group.id", groupId);
43 
44         // 制定kafka消費者對應的客戶端id,預設為空,如果不設定kafka消費者會自動生成一個非空字串。
45         properties.put("client.id", "consumer.client.id.demo");
46 
47         // 將引數設定到消費者引數中
48         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
49 
50         // 訊息訂閱
51         consumer.subscribe(Collections.singletonList(topic));
52         // 可以訂閱多個主題
53         consumer.subscribe(Arrays.asList(topic, topic2));
54         // 可以使用正則表示式進行訂閱
55         consumer.subscribe(Pattern.compile("topic-demo*"));
56 
57         // 指定訂閱的分割槽
58         consumer.assign(Arrays.asList(new TopicPartition(topic, 0)));
59 
60         while (true) {
61             // 每隔一秒監聽一次
62             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
63             // 獲取到訊息資訊
64             for (ConsumerRecord<String, String> record : records) {
65                 System.err.println(record.toString());
66             }
67         }
68 
69     }
70 
71 }