1. 程式人生 > >Kafka Producer 傳送資料

Kafka Producer 傳送資料

Kafka Producer 傳送資料

(1)生產者概覽

(1)不同的應用場景對訊息有不同的需求,即是否允許訊息丟失、重複、延遲以及吞吐量的要求。不同場景對Kafka生產者的API使用和配置會有直接的影響。
(2)Kafka傳送訊息的主要步驟
訊息格式:每個訊息是一個ProducerRecord物件,必須指定訊息所屬的Topic和訊息值Value,此外還可以指定訊息所屬的Partition以及訊息的Key。

1:序列化ProducerRecord有多個構造器,這裡使用了三個引數的,topic、key、value

2:如果ProducerRecord中指定了Partition,則Partitioner不做任何事情;否則,Partitioner根據訊息的key得到一個Partition。這是生產者就知道向哪個Topic下的哪個Partition傳送這條訊息。

3:訊息被新增到相應的batch中,獨立的執行緒將這些batch傳送到Broker上

4:broker收到訊息會返回一個響應。如果訊息成功寫入Kafka,則返回RecordMetaData物件,該物件包含了Topic資訊、Patition資訊、訊息在Partition中的Offset資訊;若失敗,返回一個錯誤在這裡插入圖片描述
(3)Kafka的順序保證。Kafka保證同一個partition中的訊息是有序的,即如果生產者按照一定的順序傳送訊息,broker就會按照這個順序把他們寫入partition,消費者也會按照相同的順序讀取他們。

(2)建立Kafka生產者

Properties props = new Properties();
props.put(“bootstrap.servers”, “broker:port”);
props.put(“acks”, “all”);// acks=0 配置適用於實現非常高的吞吐量 , acks=all 這是最安全的模式
//傳送到同一個partition的訊息會被先儲存在batch中,該引數指定一個batch可以使用的記憶體大小,單位是 byte。不一定需要等到batch被填滿才能傳送
props.put(“batch.size”, 16384);// 預設16384=16KB

//生產者在傳送訊息前等待linger.ms,從而等待更多的訊息加入到batch中。如果batch被填滿或者linger.ms達到上限,就把batch中的訊息傳送出去
props.put(“linger.ms”, 1);// 當linger.ms>0時,延時性會增加,但會提高吞吐量,因為會減少訊息傳送頻率
// props.put(“client.id”, 1);//用於標識傳送訊息的客戶端,通常用於日誌和效能指標以及配額
props.put(“buffer.memory”, 33554432);// 32MB
// Snappy壓縮技術是Google開發的,它可以在提供較好的壓縮比的同時,減少對CPU的使用率並保證好的效能,所以建議在同時考慮效能和頻寬的情況下使用。
// Gzip壓縮技術通常會使用更多的CPU和時間,但會產生更好的壓縮比,所以建議在網路頻寬更受限制的情況下使用
props.put(“compression.type”, “Gzip”);// 預設不壓縮,該引數可以設定成snappy、gzip或lz4對傳送給broker的訊息進行壓縮
// 預設值為0,當設定為大於零的值,客戶端會重新發送任何傳送失敗的訊息。
// 注意,此重試與客戶端收到錯誤時重新發送訊息是沒有區別的。
// 在配置max.in.flight.requests.per.connection不等於1的情況下,允許重試可能會改變訊息的順序
// 因為如果兩個批次的訊息被髮送到同一個分割槽,第一批訊息傳送失敗但第二批成功,而第一批訊息會被重新發送,則第二批訊息會先被寫入。
props.put(“retries”, 1);
props.put(“max.in.flight.requests.per.connection”, 2);// 生產者在收到伺服器響應之前可以傳送的訊息個數
props.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
Producer<String, String> producer = new KafkaProducer<String, String>(props);

Kafka的生產者有如下三個必選的屬性:

(1)bootstrap.servers,指定broker的地址清單

(2)key.serializer必須是一個實現org.apache.kafka.common.serialization.Serializer介面的類,將key序列化成位元組陣列。注意:key.serializer必須被設定,即使訊息中沒有指定key。

(3)value.serializer,將value序列化成位元組陣列

(3)傳送訊息到Kafka

1)同步傳送訊息

ProducerRecord<String, String> record = new ProducerRecord<>(“Kafka”, “Kafka_Products”, “測試”);//Topic Key Value
try{
Future future = producer.send(record);
future.get();//不關心是否傳送成功,則不需要這行。
} catch(Exception e) {
e.printStackTrace();//連線錯誤、No Leader錯誤都可以通過重試解決;訊息太大這類錯誤kafkaProducer不會進行任何重試,直接丟擲異常
}
或者
for (int i = 0; i < 100; i++) {
//ProducerRecord有多個構造器,這裡使用了三個引數的,topic、key、value。
producer.send(new ProducerRecord<String, String>(“topic”, Integer.toString(i), Integer.toString(i)));
}
producer.close();

2)非同步傳送訊息

ProducerRecord<String, String> record = new ProducerRecord<>(“Kafka”, “Kafka_Products”, “測試”);//Topic Key Value
producer.send(record, new DemoProducerCallback());//傳送訊息時,傳遞一個回撥物件,該回調物件必須實現org.apahce.kafka.clients.producer.Callback介面
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {//如果Kafka返回一個錯誤,onCompletion方法丟擲一個non null異常。
e.printStackTrace();//對異常進行一些處理,這裡只是簡單打印出來
}
}
}

(4)生產者的配置

上面我們只配置了bootstrap.servers和序列化類,其實生產者還有很多配置,上面只是使用了預設值。下面來看下這些配置引數。

acks

acks控制多少個副本必須寫入訊息後生產者才能認為寫入成功,這個引數對訊息丟失可能性有很大影響。這個引數有三種取值:

acks=0:生產者把訊息傳送到broker即認為成功,不等待broker的處理結果。這種方式的吞吐最高,但也是最容易丟失訊息的。
acks=1:生產者會在該分割槽的群首(leader)寫入訊息並返回成功後,認為訊息傳送成功。如果群首寫入訊息失敗,生產者會收到錯誤響應並進行重試。這種方式能夠一定程度避免訊息丟失,但如果群首宕機時該訊息沒有複製到其他副本,那麼該訊息還是會丟失。另外,如果我們使用同步方式來發送,延遲會比前一種方式大大增加(至少增加一個網路往返時間);如果使用非同步方式,應用感知不到延遲,吞吐量則會受非同步正在傳送中的數量限制。
acks=all:生產者會等待所有副本成功寫入該訊息,這種方式是最安全的,能夠保證訊息不丟失,但是延遲也是最大的。

buffer.memory

這個引數設定生產者緩衝傳送的訊息的記憶體大小,如果應用呼叫send方法的速度大於生產者傳送的速度,那麼呼叫會阻塞或者丟擲異常,具體行為取決於block.on.buffer.full(這個引數在0.9.0.0版本被max.block.ms代替,允許丟擲異常前等待一定時間)引數。

compresstion.type

預設情況下訊息是不壓縮的,這個引數可以指定使用訊息壓縮,引數可以取值為snappy、gzip或者lz4。snappy壓縮演算法由Google研發,這種演算法在效能和壓縮比取得比較好的平衡;相比之下,gzip消耗更多的CPU資源,但是壓縮效果也是最好的。通過使用壓縮,我們可以節省網路頻寬和Kafka儲存成本。

retries

當生產者傳送訊息收到一個可恢復異常時,會進行重試,這個引數指定了重試的次數。在實際情況中,這個引數需要結合retry.backoff.ms(重試等待間隔)來使用,建議總的重試時間比叢集重新選舉群首的時間長,這樣可以避免生產者過早結束重試導致失敗。

batch.size

當多條訊息傳送到一個分割槽時,生產者會進行批量傳送,這個引數指定了批量訊息的大小上限(以位元組為單位)。當批量訊息達到這個大小時,生產者會一起傳送到broker;但即使沒有達到這個大小,生產者也會有定時機制來發送訊息,避免訊息延遲過大。

linger.ms

這個引數指定生產者在傳送批量訊息前等待的時間,當設定此引數後,即便沒有達到批量訊息的指定大小,到達時間後生產者也會發送批量訊息到broker。預設情況下,生產者的傳送訊息執行緒只要空閒了就會發送訊息,即便只有一條訊息。設定這個引數後,傳送執行緒會等待一定的時間,這樣可以批量傳送訊息增加吞吐量,但同時也會增加延遲。

client.id

這個引數可以是任意字串,它是broker用來識別訊息是來自哪個客戶端的。在broker進行列印日誌、衡量指標或者配額限制時會用到。

max.in.flight.requests.per.connection

這個引數指定生產者可以傳送多少訊息到broker並且等待響應,設定此引數較高的值可以提高吞吐量,但同時也會增加記憶體消耗。另外,如果設定過高反而會降低吞吐量,因為批量訊息效率降低。設定為1,可以保證傳送到broker的順序和呼叫send方法順序一致,即便出現失敗重試的情況也是如此。

timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms

這些引數控制生產者等待broker的響應時間。request.timeout.ms指定傳送資料的等待響應時間,metadata.fetch.timeout.ms指定獲取元資料(例如獲取分割槽的群首資訊)的等待響應時間。timeout.ms則指定broker在返回結果前等待其他副本(與acks引數相關)響應的時間,如果時間到了但其他副本沒有響應結果,則返回訊息寫入失敗。

max.block.ms

這個引數指定應用呼叫send方法或者獲取元資料方法(例如partitionFor)時的阻塞時間,超過此時間則丟擲timeout異常。

max.request.size

這個引數限制生產者傳送資料包的大小,資料包的大小與訊息的大小、訊息數相關。如果我們指定了最大資料包大小為1M,那麼最大的訊息大小為1M,或者能夠最多批量傳送1000條訊息大小為1K的訊息。另外,broker也有message.max.bytes引數來控制接收的資料包大小。在實際中,建議這些引數值是匹配的,避免生產者傳送了超過broker限定的資料大小。

receive.buffer.bytes, send.buffer.bytes

這兩個引數設定用來發送/接收資料的TCP連線的緩衝區,如果設定為-1則使用作業系統自身的預設值。如果生產者與broker在不同的資料中心,建議提高這個值,因為不同資料中心往往延遲比較大。

最後討論下順序保證。Kafka保證分割槽的順序,也就是說,如果生產者以一定的順序傳送訊息到Kafka的某個分割槽,那麼Kafka在分割槽內部保持此順序,而且消費者也按照同樣的順序消費。但是,應用呼叫send方法的順序和實際傳送訊息的順序不一定是一致的。舉個例子,如果retries引數不為0,而max.in.flights.requests.per.session引數大於1,那麼有可能第一個批量訊息寫入失敗,但是第二個批量訊息寫入成功,然後第一個批量訊息重試寫入成功,那麼這個順序亂序的。因此,如果需要保證訊息順序,建議設定max.in.flights.requests.per.session為1,這樣可以在第一個批量訊息傳送失敗重試時,第二個批量訊息需要等待。

(5)序列化器

上面提到了Kafka自帶的序列化類,現在來看下如何使用其他的序列化策略

自定義序列化

帶尺寸的
如果我們傳送的訊息不是整數或者字串時,我們需要自定義序列化策略或者使用通用的Avro、Thrift或者Protobuf這些序列化方案。下面來看下如何使用自定義的序列化方案,以及存在的問題。

假如我們要傳送的訊息物件是這麼一個Customer:

class Custom {
    private int customID;
    private String customerName;
    
    public Custom(int customID, String customerName) {
        super();
        this.customID = customID;
        this.customerName = customerName;
    }

    public int getCustomID() {
        return customID;
    }

    public String getCustomerName() {
        return customerName;
    }
}

那麼,自定義的序列化類實現樣例如下:

import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CustomerSerializer implements Serializer<Customer> {
    @Override
    public void configure(Map configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    /**
     We are serializing Customer as:
     4 byte int representing customerId
     4 byte int representing length of customerName in UTF-8 bytes (0 if name is
     Null)
     N bytes representing customerName in UTF-8
     */
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serializedName;
            int stringSize;
            if (data == null)
                return null;
            else {
                if (data.getName() != null) {
                    serializeName = data.getName().getBytes("UTF-8");
                    stringSize = serializedName.length;
                } else {
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getID());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error when serializing Customer to byte[] " + e);
        } 
    }

    @Override
    public void close() {
        // nothing to close
    } 
}

我們將Customer的ID和名字進行了序列化,通過這個序列化物件,我們可以傳送Customer的物件訊息。但這樣的序列化存在很多問題,比如想要將ID升級為Long型或者增加新的Customer域時,我們需要相容新老訊息。尤其是公司內多個團隊同時消費Customer資料時,他們需要同時修改程式碼進行相容。

因此,建議使用JSON、Apache Avro、Thrift或者Protobuf這些成熟的序列化/反序列化方案。下面來看下如何使用Avro來進行序列化
使用Avro序列化
Apache Avro是一個語言無關的序列化方案,使用Avro的資料使用語言無關的結構來描述,例如JSON。Avro一般序列化成位元組檔案,當然也可以序列化成JSON形式。Kafka使用Avro的好處是,當寫入訊息需要升級協議時,消費者讀取訊息可以不受影響。

//例如,原始的協議可能是這樣的:
{
  "namespace": "customerManagement.avro",
  "type": "record",
  "name": "Customer",
  "fields": [
      {"name": "id", "type": "int"},
      {"name": "name",  "type": "string""},
      {"name": "faxNumber", "type": ["null", "string"], "default": "null"}
  ] 
}

生成Avro物件傳送到Kafka

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);

String topic = "customerContacts";
int wait = 500;
Producer<String, Customer> producer = new KafkaProducer<String, Customer>(props);

// We keep producing new events until someone ctrl-c
while (true) {
    Customer customer = CustomerGenerator.getNext();
    System.out.println("Generated customer " + customer.toString());
    ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getId(), customer);
    producer.send(record);//將customer作為訊息的值傳送出去,KafkaAvroSerializer會處理剩下的事情
}

其中,
我們使用KafkaAvroSerializer來序列化物件,注意它可以處理原子型別,上面程式碼中使用其來序列化訊息的key。
schema.registry.url引數指定了註冊中心的地址,我們將資料的結構儲存在該註冊中心。
Customer是生成的物件,生產者傳送的訊息型別也為Customer。
我們建立一個包含Customer的記錄併發送,序列化的工作由KafkaAvroSerializer來完成。
當然,我們也可以使用通用的Avrod物件而不是生成的Avro物件,對於這種情況,我們需要提供資料的結構:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);

String schemaString = "{\"namespace\": \"customerManagement.avro\",
                        \"type\": \"record\", " +
                        "\"name\": \"Customer\"," +
                        "\"fields\": [" +
                            "{\"name\": \"id\", \"type\": \"int\"}," +
                             "{\"name\": \"name\", \"type\": \"string\"}," +
                             "{\"name\": \"email\", \"type\": [\"null\",\"string\"], \"default\":\"null\" }" +
                       "]}";

Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);

for (int nCustomers = 0; nCustomers < customers; nCustomers++) {
    String name = "exampleCustomer" + nCustomers;
    String email = "example " + nCustomers + "@example.com";
    GenericRecord customer = new GenericData.Record(schema);
    customer.put("id", nCustomer);
    customer.put("name", name);
    customer.put("email", email);
   
    ProducerRecord<String, GenericRecord> data = new ProducerRecord<String,GenericRecord>("customerContacts",name, customer);
    
    producer.send(data);
}

分割槽

我們建立訊息的時候,必須要提供主題和訊息的內容,而訊息的key是可選的,當不指定key時預設為null。訊息的key有兩個重要的作用:1)提供描述訊息的額外資訊;2)用來決定訊息寫入到哪個分割槽,所有具有相同key的訊息會分配到同一個分割槽中。

如果key為null,那麼生產者會使用預設的分配器,該分配器使用輪詢(round-robin)演算法來將訊息均衡到所有分割槽。

如果key不為null而且使用的是預設的分配器,那麼生產者會對key進行雜湊並根據結果將訊息分配到特定的分割槽。注意的是,在計算訊息與分割槽的對映關係時,使用的是全部的分割槽數而不僅僅是可用的分割槽數。這也意味著,如果某個分割槽不可用(雖然使用複製方案的話這極少發生),而訊息剛好被分配到該分割槽,那麼將會寫入失敗。另外,如果需要增加額外的分割槽,那麼訊息與分割槽的對映關係將會發生改變,因此儘量避免這種情況。

自定義分配器

現在來看下如何自定義一個分配器,下面將key為Banana的訊息單獨放在一個分割槽,與其他的訊息進行分割槽隔離:

public class BananaPartitioner implements Partitioner {
    public void configure(Map<String, ?> configs) {}
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if ((keyBytes == null) || (!(key instanceOf String)))
        throw new InvalidRecordException("We expect all messages to have customer name as key")
    if (((String) key).equals("Banana"))
        return numPartitions; // Banana will always go to last partition
   
     // Other records will get hashed to the rest of the partitions
    return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
    }
    
    public void close() {}
 
}