Kafka Client Operations
I: Consumer API
1. Auto-commit example
This approach is not recommended for real-world use.
```java
// Shared imports for every example in this post; TOPIC_NAME is assumed to be
// a class-level constant, e.g. static final String TOPIC_NAME = "...";
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Simple Kafka consumer with auto-commit.
 * Once a record's offset has been auto-committed, the record will not be consumed again.
 */
public static void helloConsumer() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("auto.commit.interval.ms", "1000");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    // Subscribe to the topic
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
        // Poll with a 1000 ms timeout
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
    }
}
```
2. Manually committing the offset
If processing fails and the offset is not committed, the same records can still be fetched on the next poll.
```java
/**
 * Manual offset commit.
 */
public static void commitedOffset() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("enable.auto.commit", "false");
    // auto.commit.interval.ms is ignored once auto-commit is disabled
    properties.setProperty("auto.commit.interval.ms", "1000");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    // Subscribe to the topic
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
        // Poll with a 1000 ms timeout
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }
        // Manual commit: only reached when the loop above finishes without
        // throwing; otherwise the same records are fetched again on the next poll
        consumer.commitAsync();
    }
}
```
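commitAsync() returns immediately, so a failed commit would otherwise go unnoticed; if that matters, an OffsetCommitCallback can be passed. A minimal sketch (the logging is illustrative):

```java
// Asynchronous commit with a callback that surfaces commit failures
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.err.println("offset commit failed for " + offsets + ": " + exception);
    }
});
```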
3. Consumer groups
Within a consumer group, each partition is consumed by at most one consumer of that group.
A consumer reads a partition in order. Where a brand-new group starts is controlled by auto.offset.reset: the default is latest; set it to earliest to start from the beginning.
A consumer group as a whole consumes the messages of every partition of the topic; the sketch below makes the partition assignment visible.
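To watch the group assign partitions, a ConsumerRebalanceListener can be attached when subscribing. A minimal sketch (in addition to the shared imports above, it needs java.util.Collection and org.apache.kafka.clients.consumer.ConsumerRebalanceListener); run two copies with the same group.id and the printed assignments will split between them:

```java
consumer.subscribe(Arrays.asList(TOPIC_NAME), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before a rebalance takes partitions away from this consumer
        System.out.println("revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after a rebalance hands partitions to this consumer
        System.out.println("assigned: " + partitions);
    }
});
```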
4. Features
5. Processing at the partition level
```java
/**
 * Process and commit at the partition level.
 */
public static void commitedOffsetWithPartition() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("auto.commit.interval.ms", "1000");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    // Subscribe to the topic
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
        // Poll with a 1000 ms timeout
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // The unit of work is now the partition: each one is handled separately
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> pRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : pRecords) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
            long lastOffset = pRecords.get(pRecords.size() - 1).offset();
            // Manually commit this partition's offset (+1 = next offset to read)
            Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
            offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
            consumer.commitSync(offset);
        }
    }
}
```
6. Consuming only a specific partition
```java
/**
 * Subscribe directly to individual partitions of a topic.
 */
public static void commitedOffsetWithTopicPartition() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("auto.commit.interval.ms", "1000");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
    TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);

    // Assign specific partitions; here only p1 is consumed (p0 is left unused)
    consumer.assign(Arrays.asList(p1));
    while (true) {
        // Poll with a 1000 ms timeout
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // Handle each partition separately
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> pRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : pRecords) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
            long lastOffset = pRecords.get(pRecords.size() - 1).offset();
            // Manually commit this partition's offset
            Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
            offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
            consumer.commitSync(offset);
        }
    }
}
```
II: Multi-threaded handling with the Consumer API
1. Approach one
2. Example program
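A minimal sketch of the thread-per-consumer pattern, under the assumption that this is the approach meant here: KafkaConsumer is not thread-safe, so each thread builds and owns its own instance and can therefore commit its own offsets. The class name, thread count, and topic name are illustrative:

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// One KafkaConsumer per thread: consumers in the same group split the partitions
public class ConsumerThread implements Runnable {
    private final Properties props = new Properties();
    private final String topic;

    public ConsumerThread(String brokers, String groupId, String topic) {
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.topic = topic;
    }

    @Override
    public void run() {
        // Create and use the consumer entirely inside this thread
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("thread = %s, partition = %d, offset = %d, value = %s%n",
                        Thread.currentThread().getName(), record.partition(), record.offset(), record.value());
            }
            // Each thread owns its consumer, so it can safely commit its own offsets
            consumer.commitAsync();
        }
    }

    public static void main(String[] args) {
        // Start as many threads as there are partitions (extra threads would sit idle);
        // "topic_demo" is a placeholder topic name
        for (int i = 0; i < 3; i++) {
            new Thread(new ConsumerThread("192.168.19.129:9092", "test", "topic_demo")).start();
        }
    }
}
```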
3. Approach two
With this approach there is no way to commit offsets; it exists purely to consume data quickly.
4. Example program
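A minimal sketch of the single-consumer, worker-pool pattern under the same assumption: one thread polls and hands records to an ExecutorService. Because processing completes out of band, there is no safe point to commit, which is why this approach trades delivery guarantees for throughput. Class name, pool size, and topic name are illustrative:

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// One consumer polls; a worker pool processes records. Once a record is handed
// to the pool, completion is no longer tied to the poll loop, so there is no
// safe point to commit offsets by hand -- fast consumption only.
public class ConsumerWithWorkerPool {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.19.129:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic_demo"));  // placeholder topic name

        ExecutorService workers = Executors.newFixedThreadPool(10);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                // Hand the record off; the poll loop immediately moves on
                workers.submit(() -> {
                    System.out.printf("worker = %s, partition = %d, offset = %d, value = %s%n",
                            Thread.currentThread().getName(), record.partition(), record.offset(), record.value());
                });
            }
        }
    }
}
```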
III: Some other features
1. Manually controlling the offset
```java
/**
 * Manually set the starting offset and manually commit.
 *
 * Why control the starting offset by hand:
 * 1. You decide exactly where consumption begins.
 * 2. If the program fails, the batch can simply be consumed again.
 *
 * Workflow:
 * 1. The first run usually starts from offset 0.
 * 2. After consuming, say, 100 records, store offset 101 in Redis.
 * 3. Before each poll, read the latest offset from Redis.
 * 4. Start consuming from that position.
 *
 * Suggestion: use Redis as the offset store.
 */
public static void controllerOffset() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("auto.commit.interval.ms", "1000");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);

    // Assign the partition directly
    consumer.assign(Arrays.asList(p0));
    while (true) {
        // Set the starting offset before each poll; in a real program this
        // value would come from the external store (e.g. Redis) rather than
        // the hard-coded 5 used for the demo
        consumer.seek(p0, 5);
        // Poll with a 1000 ms timeout
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // Handle each partition separately
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> pRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : pRecords) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
            long lastOffset = pRecords.get(pRecords.size() - 1).offset();
            // Manually commit this partition's offset
            Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
            offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
            consumer.commitSync(offset);
        }
    }
}
```
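A hedged sketch of the Redis side of the workflow described in the comment above, using the Jedis client; the client choice, key layout, and helper names are assumptions, not part of the original:

```java
import org.apache.kafka.common.TopicPartition;
import redis.clients.jedis.Jedis;

// Illustrative helpers for keeping the next offset to read in Redis.
// The key layout ("kafka:offset:<topic>:<partition>") is an assumption.
public class RedisOffsetStore {
    private final Jedis jedis = new Jedis("192.168.19.129", 6379);

    private String key(TopicPartition tp) {
        return "kafka:offset:" + tp.topic() + ":" + tp.partition();
    }

    // Read the stored offset; start from 0 when nothing is stored yet
    public long load(TopicPartition tp) {
        String value = jedis.get(key(tp));
        return value == null ? 0L : Long.parseLong(value);
    }

    // Store the next offset to read (last processed offset + 1)
    public void save(TopicPartition tp, long nextOffset) {
        jedis.set(key(tp), String.valueOf(nextOffset));
    }
}
```

In controllerOffset() above, consumer.seek(p0, store.load(p0)) would then replace the hard-coded 5, and store.save(p0, lastOffset + 1) would run after each successful commit.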
2. Rate limiting
```java
/**
 * Flow control: pause/resume fetching per partition.
 */
public static void controllerLimit() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("auto.commit.interval.ms", "1000");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
    TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
    long totalNum = 100;

    // Assign both partitions
    consumer.assign(Arrays.asList(p0, p1));
    while (true) {
        // Poll with a 1000 ms timeout
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // Handle each partition separately
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> pRecords = records.records(partition);
            long num = 0;
            for (ConsumerRecord<String, String> record : pRecords) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
                /*
                 * Token-bucket idea:
                 * 1. After receiving a record, try to take a token from the bucket.
                 * 2. If a token is available, continue with the business logic.
                 * 3. If not, pause() the partition and wait for tokens.
                 * 4. Once the bucket has enough tokens again, resume() the partition.
                 *
                 * The counters below are a simplified stand-in for that logic.
                 */
                num++;
                if (record.partition() == 0 && num >= totalNum) {
                    // p0 has delivered enough records in this batch: stop fetching it
                    consumer.pause(Arrays.asList(p0));
                }
                if (record.partition() == 1 && num == 40) {
                    // progress on p1 triggers fetching p0 again
                    consumer.resume(Arrays.asList(p0));
                }
            }
            long lastOffset = pRecords.get(pRecords.size() - 1).offset();
            // Manually commit this partition's offset
            Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
            offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
            consumer.commitSync(offset);
        }
    }
}
```
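The comments in the loop above describe token-bucket flow control. A more literal version of that idea, assuming Guava's RateLimiter as the token bucket (Guava is not used in the original; the rate and method name are illustrative):

```java
import java.time.Duration;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import com.google.common.util.concurrent.RateLimiter;

// Pause the partition when tokens run out, resume once the bucket refills
public static void rateLimitedPoll(KafkaConsumer<String, String> consumer, TopicPartition p0) {
    RateLimiter limiter = RateLimiter.create(100);  // 100 records per second
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            if (!limiter.tryAcquire()) {
                // No token available: stop fetching this partition for now
                consumer.pause(Arrays.asList(p0));
                limiter.acquire();               // block until a token arrives
                consumer.resume(Arrays.asList(p0));
            }
            System.out.printf("partition = %d, offset = %d, value = %s%n",
                    record.partition(), record.offset(), record.value());
        }
    }
}
```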