
How to send ordered messages with Kafka

Under normal conditions, data written by a single producer to a single Kafka partition is stored in the order it was sent.

Producer configuration example:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.*;
    Properties props = new Properties();
    // props.put("bootstrap.servers", "10.4.4.91:9092,10.4.4.92:9092,10.4.4.93:9092");
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    props.put(ProducerConfig.ACKS_CONFIG, "all");                 // "acks"
    props.put(ProducerConfig.RETRIES_CONFIG, 120000);             // "retries"
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);           // "batch.size"
    props.put(ProducerConfig.LINGER_MS_CONFIG, 0);                // "linger.ms"
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);     // "buffer.memory"
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);   // "request.timeout.ms"
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "drtest");         // "client.id"
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");    // "compression.type"
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,         // "key.serializer"
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,       // "value.serializer"
            "org.apache.kafka.common.serialization.StringSerializer");
    // "max.in.flight.requests.per.connection": at most one unacknowledged request per connection
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
    Producer<String, String> producer = new KafkaProducer<>(props);
    // synchronous send: get() blocks until the broker acknowledges the record
    RecordMetadata recordMetadata = producer.send(new ProducerRecord<>("test", "testV")).get();
    ...

What does the max.in.flight.requests.per.connection parameter in this configuration do?

max.in.flight.requests.per.connection defaults to 5. The configuration above sets it to 1:

    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); // max.in.flight.requests.per.connection

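Internally, KafkaProducer maintains a send buffer, implemented by the RecordAccumulator class.

When user code calls the producer's send method, the record is written into this in-memory buffer through the accumulator's append method, which adds it to the deque of batches kept for its topic partition: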

    public final class RecordAccumulator {
        ...
        private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
        ...
    }

When KafkaProducer is initialized, it creates the accumulator:

    this.accumulator = new RecordAccumulator(logContext,
            config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
            this.totalMemorySize,
            this.compressionType,
            config.getLong(ProducerConfig.LINGER_MS_CONFIG),
            retryBackoffMs,
            metrics,
            time,
            apiVersions,
            transactionManager);
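To make the role of batches more concrete, here is a much-simplified model of a per-partition send buffer: one deque of batches per partition, where append adds a record to the newest batch if it still has room and otherwise opens a new batch, and the sending side drains the oldest batch first. This is a sketch under stated assumptions, not Kafka's RecordAccumulator: the class SimpleAccumulator and its methods are hypothetical, partitions are plain strings such as "test-0", and a batch is capped by record count, whereas the real batch.size limit is measured in bytes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of a per-partition send buffer (not Kafka's RecordAccumulator).
// Real Kafka caps a batch by bytes (batch.size); this sketch caps it by record count.
class SimpleAccumulator {
    private final int maxRecordsPerBatch;
    // One deque of batches per partition, in the spirit of RecordAccumulator.batches.
    private final ConcurrentMap<String, Deque<List<String>>> batches = new ConcurrentHashMap<>();

    SimpleAccumulator(int maxRecordsPerBatch) {
        this.maxRecordsPerBatch = maxRecordsPerBatch;
    }

    // Add the record to the newest batch of its partition; open a new batch when that one is full.
    // synchronized because ArrayDeque and ArrayList are not thread-safe.
    synchronized void append(String partition, String record) {
        Deque<List<String>> deque = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        List<String> last = deque.peekLast();
        if (last == null || last.size() >= maxRecordsPerBatch) {
            last = new ArrayList<>();
            deque.addLast(last);
        }
        last.add(record);
    }

    // Remove and return the oldest batch (null if none), so batches leave in append order.
    synchronized List<String> drainOldest(String partition) {
        Deque<List<String>> deque = batches.get(partition);
        return deque == null ? null : deque.pollFirst();
    }

    synchronized int batchCount(String partition) {
        Deque<List<String>> deque = batches.get(partition);
        return deque == null ? 0 : deque.size();
    }

    public static void main(String[] args) {
        SimpleAccumulator acc = new SimpleAccumulator(2);
        for (String r : new String[] {"m1", "m2", "m3"}) {
            acc.append("test-0", r);
        }
        System.out.println(acc.drainOldest("test-0")); // [m1, m2]
        System.out.println(acc.drainOldest("test-0")); // [m3]
    }
}
```

Because each partition has its own deque and batches are drained from the front, records of one partition leave the buffer in the order they were appended.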

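The KafkaProducer constructor also instantiates the Sender, and this is where max.in.flight.requests.per.connection comes in: the expression maxInflightRequests == 1 is passed to Sender as its guaranteeMessageOrder flag.

guaranteeMessageOrder marks whether this producer must send data to the broker in strict order.

The Sender class implements the Runnable interface and is run on a dedicated I/O thread: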

    public class Sender implements Runnable {
        ....
    }

    // In the KafkaProducer constructor:
    this.sender = new Sender(logContext, client, this.metadata, this.accumulator,
            maxInflightRequests == 1, // guaranteeMessageOrder
            config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
            acks, retries, metricsRegistry.senderMetrics, Time.SYSTEM,
            this.requestTimeoutMs,
            config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
            this.transactionManager, apiVersions);
    String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
    this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
    this.ioThread.start();
    ....

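When the Sender runs, it reads the data stored in the accumulator and checks whether the broker node each batch is destined for is ready; nodes that are not ready are removed from this round's set of ready nodes.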

One of these readiness checks is the canSendMore method of the InFlightRequests object:
    public boolean canSendMore(String node) {
        Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
        // one condition for sending more: the number of requests queued for this node
        // must be below max.in.flight.requests.per.connection
        return queue == null || queue.isEmpty() ||
               (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
    }

    final class InFlightRequests {

        private final int maxInFlightRequestsPerConnection; // this is max.in.flight.requests.per.connection
        private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();
        /** Thread safe total number of in flight requests. */
        private final AtomicInteger inFlightRequestCount = new AtomicInteger(0);
        ...
    }
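The gating rule in canSendMore can be reproduced with a small model. This is a hedged sketch, not Kafka code: InFlightTracker, Req, send and completeNext are hypothetical names, a request is just a string, and the send.completed() check (whether the newest request has been fully written to the socket) is modelled as a boolean that this sketch sets immediately. New requests are pushed to the front of the deque, so peekFirst() is the newest one, and a response completes the oldest one at the back.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the canSendMore() rule in InFlightRequests.
class InFlightTracker {
    // A request awaiting a broker response; 'written' stands in for send.completed().
    static final class Req {
        final String name;
        boolean written;
        Req(String name) { this.name = name; }
    }

    private final int maxInFlightRequestsPerConnection;
    private final Map<String, Deque<Req>> requests = new HashMap<>();

    InFlightTracker(int maxInFlightRequestsPerConnection) {
        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
    }

    // Same shape as canSendMore: nothing outstanding, or the newest request is fully
    // written and the number of unanswered requests is below the limit.
    boolean canSendMore(String node) {
        Deque<Req> queue = requests.get(node);
        return queue == null || queue.isEmpty()
                || (queue.peekFirst().written && queue.size() < maxInFlightRequestsPerConnection);
    }

    // Register a request as in flight (newest at the front); the sketch marks it written at once.
    void send(String node, String name) {
        Req r = new Req(name);
        r.written = true;
        requests.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(r);
    }

    // A broker response completes the oldest outstanding request (assumes one exists).
    String completeNext(String node) {
        return requests.get(node).pollLast().name;
    }

    public static void main(String[] args) {
        InFlightTracker t = new InFlightTracker(1);
        t.send("broker-1", "req-1");
        System.out.println(t.canSendMore("broker-1")); // false: one request is still unanswered
        t.completeNext("broker-1");
        System.out.println(t.canSendMore("broker-1")); // true: the response has arrived
    }
}
```

With a limit of 1 the node is reported as not ready until the previous response arrives, which is exactly what removes it from the ready set in the loop that follows.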

The loop that removes nodes that are not ready:

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }

Before sending data, the client builds an InFlightRequest object and adds it to the requests map of InFlightRequests. This happens in NetworkClient's doSend method:

    Send send = request.toSend(destination, header);
    InFlightRequest inFlightRequest = new InFlightRequest(
            clientRequest,
            header,
            isInternalRequest,
            request,
            send,
            now);
    this.inFlightRequests.add(inFlightRequest);
    selector.send(send);

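max.in.flight.requests.per.connection limits how many requests the client may send on one connection before it has received the broker's responses. Setting it to 1 guarantees that messages stay in order even when a failed send is retried. With a value greater than 1, a failed request that is retried can land behind requests sent after it, so messages may end up reordered. A large value also uses more memory.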
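Why a retry reorders messages only when more than one request is in flight can be illustrated with a small simulation. It is a hypothetical model, not Kafka code: each request carries one batch, the broker answers the oldest outstanding request first and appends the batch to the partition log on success, and the first attempt of one chosen batch fails and is sent again behind the requests already in flight. RetryOrderDemo and simulate are names made up for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation of retries under different max-in-flight limits.
class RetryOrderDemo {
    // Returns the order in which batches end up in the partition log.
    // The first attempt of failingBatch fails once with a retriable error.
    static List<String> simulate(List<String> batches, int maxInFlight, String failingBatch) {
        List<String> log = new ArrayList<>();
        Deque<String> pending = new ArrayDeque<>(batches); // not yet sent
        Deque<String> inFlight = new ArrayDeque<>();       // sent, awaiting a response (oldest first)
        boolean failed = false;
        while (!pending.isEmpty() || !inFlight.isEmpty()) {
            // fill the connection up to maxInFlight outstanding requests
            while (!pending.isEmpty() && inFlight.size() < maxInFlight) {
                inFlight.addLast(pending.pollFirst());
            }
            // the broker answers the oldest outstanding request
            String batch = inFlight.pollFirst();
            if (batch.equals(failingBatch) && !failed) {
                failed = true;
                inFlight.addLast(batch); // retried behind requests already in flight
            } else {
                log.add(batch);          // success: appended to the partition log
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> batches = List.of("b1", "b2", "b3");
        System.out.println(simulate(batches, 1, "b1")); // [b1, b2, b3]
        System.out.println(simulate(batches, 5, "b1")); // [b2, b3, b1]
    }
}
```

With a limit of 1, b2 is not sent until b1 has been acknowledged, so b1's retry is the only request on the connection and the original order is kept. With a limit of 5, b2 and b3 are already in flight when b1 fails, and they are written first.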

To guarantee strict message ordering:

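1. The producer client that generates the ordered messages must send them to a fixed topic partition, by one of these means:

  - make the topic a single-partition topic

  - specify the partition explicitly with the Kafka client (for example the partition argument of ProducerRecord)

  - implement the Partitioner interface with a custom partitioning strategy

  - specify a message key, so that messages with the same key are routed to the same partition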

2. Set max.in.flight.requests.per.connection to 1.
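The "specify a key" option works because a partitioner maps equal keys to the same partition, so all messages with one key share one ordered partition. A minimal sketch of the idea follows. It uses String.hashCode() purely for illustration, whereas Kafka's default partitioner hashes the serialized key with murmur2, so the partition numbers produced here will not match Kafka's. KeyPartitionDemo and partitionFor are hypothetical names; in a real producer this logic would live in a class implementing org.apache.kafka.clients.producer.Partitioner.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KeyPartitionDemo {
    // Hypothetical key-to-partition mapping (Kafka's default partitioner uses murmur2 instead).
    static int partitionFor(String key, int numPartitions) {
        // mask the sign bit so the result is never negative (Math.abs fails for Integer.MIN_VALUE)
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        String[][] events = {{"order-42", "created"}, {"order-7", "created"},
                             {"order-42", "paid"}, {"order-42", "shipped"}};
        // per-partition logs, each appended in send order
        Map<Integer, List<String>> partitions = new LinkedHashMap<>();
        for (String[] e : events) {
            int p = partitionFor(e[0], numPartitions);
            partitions.computeIfAbsent(p, k -> new ArrayList<>()).add(e[0] + ":" + e[1]);
        }
        // every event of order-42 sits in one partition, in the order it was sent
        System.out.println(partitions);
    }
}
```

Ordering is then guaranteed per key rather than across the whole topic, which lets the topic keep several partitions.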

Impact on throughput

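1. Ordered messaging costs a lot of throughput, because the ordered messages share one partition and only one request per connection can be in flight. To compensate, several messages can be merged and submitted as a single message.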

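The message size can be tuned for the actual environment. Data can also be serialized at the producer and deserialized at the consumer to reduce the volume transferred.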
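Merging several messages into one on the producer side, and splitting them again on the consumer side, could look like the sketch below. The length-prefixed format written with DataOutputStream is an assumption chosen for illustration (any serialization both sides agree on would do), and MessagePacker, pack and unpack are hypothetical names. The resulting byte[] would be sent as a single record value, for example with ByteArraySerializer on the producer and ByteArrayDeserializer on the consumer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class MessagePacker {
    // Producer side: write the message count, then each message with writeUTF
    // (a 2-byte length prefix plus modified UTF-8; each message must encode to at most 65535 bytes).
    static byte[] pack(List<String> messages) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(messages.size());
            for (String m : messages) {
                out.writeUTF(m);
            }
        }
        return bytes.toByteArray();
    }

    // Consumer side: read the count, then the messages in their original order.
    static List<String> unpack(byte[] payload) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            int count = in.readInt();
            List<String> messages = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                messages.add(in.readUTF());
            }
            return messages;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = pack(List.of("m1", "m2", "m3"));
        System.out.println(payload.length);  // 16: 4-byte count + 3 * (2-byte length + 2 bytes)
        System.out.println(unpack(payload)); // [m1, m2, m3]
    }
}
```

Since the merged messages travel inside one record, their relative order is preserved regardless of the in-flight setting; only the order between merged records still depends on it.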