1. 程式人生 > >kafka (搜索) 在idea api操作(官方apihttp://kafka.apache.org/documentation/#producerapi)

kafka (搜索) 在idea api操作(官方apihttp://kafka.apache.org/documentation/#producerapi)

減少 tails ray valueof 記錄 org 代理 交互 為我

https://blog.csdn.net/isea533/article/details/73822881 這個不推薦,可以看一下(https://www.cnblogs.com/biehongli/p/8335538.html)

Kafka API 簡單用法

本篇會用到以下依賴:(本人包和這個不同,去maven裏查找)

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>

</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.2.0</version>
</dependency>

生產者API
參考官方文檔中 KafkaProducer 的介紹。

Kafka客戶端用於向 Kafka 集群發布記錄。生產者是線程安全的,跨線程共享一個生產者實例通常比擁有多個實例要快。這是一個簡單的例子,使用生產者發送包含序列號的字符串作為鍵/值對的記錄,代碼如下。

package com.github.abel533.kafka.api;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerApi {

  public static void main(String[] args) {

    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.16.150:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++) {
      producer.send(new ProducerRecord<String, String>(
      "t1", Integer.toString(i), Integer.toString(i)));
    }
    producer.close();
  }
}

生產者包括一個緩沖區池,它保存尚未發送到服務器的記錄,以及一個後臺I/O線程,負責將這些記錄轉換為請求並將其傳輸到集群。使用後未能關閉生產者將泄漏這些資源。

send()方法是異步的。當被調用時,它將記錄添加到待處理記錄發送的緩沖區並立即返回。這允許生產者將各個記錄收集在一起以獲得效率。

acks配置其請求被視為完整性的標準。"all"意味著領導者將等待完整的同步副本來確認記錄。只要至少有一個同步復制品仍然存在,這將保證記錄不會丟失。這是最強大的保證。這相當於設置acks = -1。

如果請求失敗,生產者可以自動重試,但是由於我們指定retries0,所以不會重試。啟用重試還會產生重復的可能性(有關詳細信息,請參閱有關消息傳遞語義的文檔 )。

生產者維護每個分區的未發送出去的緩沖區。這些緩沖區的大小由batch.size指定。使此更大可以緩存更多,但需要更多的內存(因為我們通常會為每個活動分區提供緩沖區)。

默認情況下,即使緩沖區中存在額外的未使用空間,緩沖區也可立即發送。但是,如果要減少請求數可以設置linger.ms為大於0 的毫秒數。這將指示生產者在發送請求之前等待該毫秒數,這樣將有更多記錄到達緩沖區。這類似於Nagle在TCP中的算法。例如,在上面的代碼片段中,可能所有100條記錄都將在單個請求中發送,因為我們將延遲時間設置為1毫秒。但是,如果我們沒有填滿緩沖區,則此設置會為我們的請求增加1毫秒的延遲,以便等待更多記錄到達。在重負荷下 ,即使linger.ms=0,在時間上緊接在一起的記錄也將一起批量處理。將其設置為大於0的值可能會讓請求更少和更高效,而不是在最大負載下以少量延遲為代價。

buffer.memory控制生產者可用於緩沖的總內存量。如果記錄的發送速度比可以傳輸到服務器的速度快,那麽這個緩沖空間就會耗盡。當緩沖區空間耗盡時,附加的發送呼叫將被阻塞。max.block.ms決定阻塞時間的閾值,超出此時間時,會引發TimeoutException。

key.serializervalue.serializer指導如何將用戶提供的ProducerRecord的鍵和值轉換成字節。您可以使用提供的ByteArraySerializer或 StringSerializer用於簡單的字符串或字節類型。

該客戶端可以與0.10.0版本或更高版本的broker進行通信。舊的或較新的broker可能不支持某些功能。當調用運行的broker程序版本不可用的API時,會產生UnsupportedVersionException異常。

有關生產者更多的配置屬性可以參考Producer Configs。

消費者API
參考官方文檔中的 KafkaConsumer 介紹。

從Kafka集群中消費記錄的客戶端。

這個客戶端透明地處理卡夫卡經紀人的失敗,並透明地適應作為在集群中遷移的主題分區。該客戶端還與代理商進行交互,以允許消費群體使用消費者群體來負載平衡消費。

消費者保持TCP連接到必要的經紀人以獲取數據。使用後未能關閉消費者將泄漏這些連接。消費者不是線程安全的。有關詳細信息,請參閱多線程處理。

自動提交偏移
下面這個例子使用了自動提交,設定了每1000ms提交一次偏移(就是當前已讀取消息的位置)。

package com.github.abel533.kafka.api;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerAOC {
  public static void main(String[] args) {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.16.150:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("t1"));
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(1000);
      for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n",
          record.offset(), record.key(), record.value());
    }

  }
}


首先通過bootstrap.servers設置要連接的Broker,多個可以使用逗號隔開。通過group.id設置了當前的分組id,同一個分組id中的多個消費者可以通過負載均衡處理消息(消費者數量多於主題的分區數時,多出來的消費者不會被分配任何消息)。

通過設置enable.auto.commit為true開啟自動提交,自動提交的頻率由 auto.commit.interval.ms 設置。

後面兩個 deserializer 用於序列化 key 和 value。

通過 consumer.subscribe 定義了主題 t1,一個消費者可以訂閱多個主題。通過consumer.poll獲取消息,參數1000(毫秒)的含義是,當緩沖區中沒有可用消息時,以此時間進行輪訓等待。當設置為0時,理解返回當前可用的消息或者返回空。

手動提交偏移
消費者不是必須自動提交偏移。用戶也可以手動控制提交偏移來決定消息是否已被消費。當消息需要經過一些特殊邏輯進行處理時,手動提交就非常有必要,沒有經過處理的消息不應該當成已消費。

package com.github.abel533.kafka.api;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ConsumerManual {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.16.150:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("t1", "t2"));
    final int minBatchSize = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
      }
      if (buffer.size() >= minBatchSize) {
        //邏輯處理,例如保存到數據庫
        consumer.commitSync();
        buffer.clear();
      }
    }
  }
}

在這個例子中,我們將enable.auto.commit設置為false,這是因為這個值默認情況下是true,只有手動設置為false後才能進行手動提交。

每當buffer的大小超過設置的批量大小後就會通過consumer.commitSync()進行提交。

在某些情況下,您可能希望通過明確指定偏移量來更精確地控制已經提交的記錄。在下面的例子中,我們在完成處理每個分區中的記錄之後提交偏移量。

package com.github.abel533.kafka.api;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class ConsumerManualPartition {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.16.150:9092");
    props.put("group.id", "test2");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("t1"));

    try {
      while(true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {
          List<ConsumerRecord<String, String>> partitionRecords =
          records.records(partition);
          for (ConsumerRecord<String, String> record : partitionRecords) {
            System.out.println(partition.partition() + ": "
            + record.offset() + ": "
            + record.value());
          }
          long lastOffset = partitionRecords.get(
          partitionRecords.size() - 1).offset();
          consumer.commitSync(
          Collections.singletonMap(partition,
          new OffsetAndMetadata(lastOffset + 1)));
        }
      }
    } finally {
      consumer.close();
    }
  }
}

因為每個主題可能存在多個分區,每個分區都維護了一個索引,因此上面針對不同的分區進行處理。

消費者API中還存在很多有用的方法,可以通過查看官方的API文檔了解更多。

流API
參考官方文檔中 KafkaStreams 的介紹。

流API允許對來自一個或多個主題的消息進行連續計算,並將結果發送到零個,一個或多個主題中。

可以從Kafka獲取某個主題的消息,經過處理後輸出到另一個主題。相當於是對主題做了一個加工。下面是一個示例,這個示例從t1主題獲取消息,然後計算數字的平方後發送消息到t2主題中。

package com.github.abel533.kafka.api;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.HashMap;
import java.util.Map;

public class StreamApi {
  public static void main(String[] args) {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.16.150:9092");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    StreamsConfig config = new StreamsConfig(props);

    KStreamBuilder builder = new KStreamBuilder();
    builder.stream("t1").mapValues(value -> {
      Integer i = Integer.parseInt((String)value);
      return String.valueOf(i * i);
    }).to("t2");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
  }
}

在例子中StreamsConfig.APPLICATION_ID_CONFIG用於設置當前流處理的ID,具有相同流ID的應用會根據輸入主題的分區來分配任務。當流處理應用的數量大於主題的分區數時,超出部分的流處理不會被分配任何消息。

kafka (搜索) 在idea api操作(官方apihttp://kafka.apache.org/documentation/#producerapi)