
Kafka Study Notes: Analyzing the Kafka Exception "Magic v0 does not support record headers"

0x00 Overview

Recently a tester told me that one of our applications could not consume transaction messages. I logged into the Kafka Broker, checked the logs, and found it was repeatedly reporting this error:

java.lang.IllegalArgumentException: Magic v0 does not support record headers
    at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
    at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
    at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
    at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
    at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
    at scala.Option.map(Option.scala:146)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
    at scala.Option.flatMap(Option.scala:171)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
    at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
    at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2012)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
    at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
    at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
    at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
    at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
    at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
    at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
    at java.lang.Thread.run(Thread.java:748)
I asked the developers and learned that a recent release had started adding a LogId to the Kafka message headers for transaction tracing (record headers were introduced with message format v2 in Kafka 0.11). Combined with the error above, which says the consumer's API version is too old to support record headers, this explained the failure. Once the developers removed the header, the consumer could consume messages normally.

0x01 Reproducing the Issue

1.1 Kafka version: 0.11.0

Producer code:

I wrote an interceptor that adds a LOG_ID entry to the headers of every message:

import java.util.Map;
import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}

    @Override
    public void close() {}

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        // Attach a random LOG_ID header to every outgoing record
        String uuid = UUID.randomUUID().toString();
        record.headers().add("LOG_ID", uuid.getBytes());
        return record;
    }

}

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.alibaba.fastjson.JSON; // AccessInfo is a simple POJO from the author's project

public class App
{
    public static void main( String[] args ) throws InterruptedException
    {
        Properties props = new Properties();
        // broker address
        props.put("bootstrap.servers", "localhost:9092");

        // wait for acknowledgement from all in-sync replicas
        props.put("acks", "all");

        // retry once on failure
        props.put("retries", 1);

        // size of the in-memory send buffer
        props.put("buffer.memory", 33554432);

        // key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // register the header-adding interceptor
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.df.KafkaTest.KafkaProducerInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 10; i < 20; i++) {
            try {
                AccessInfo ai = new AccessInfo();
                ai.setAccessId("123456");
                ai.setAccessName("原始碼婆媳" + Integer.toString(i));
                ai.setBusScope("01");
                ai.setIconUrl("http://www.baidu.com");
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("testTopic", 0, "H" + Integer.toString(i), JSON.toJSONString(ai));
                producer.send(record).get();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        System.out.println("Message sent successfully");
        producer.close();
    }
}

Consumer code

A client using the newer API:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

public class KafkaConsumerAsync {

    public static void main(String[] args) throws InterruptedException {

        // 1. Prepare the configuration
        String kafkas = "127.0.0.1:9092";
        Properties props = new Properties();
        // Kafka connection info
        props.put("bootstrap.servers", kafkas);
        // consumer group id
        props.put("group.id", "testTopic-002");
        // whether to auto-commit offsets
        props.put("enable.auto.commit", "false");
        // offset reset policy when no committed offset exists
        props.put("auto.offset.reset", "earliest");
        // auto-commit interval
        props.put("auto.commit.interval.ms", "1000");
        // cap the data returned by a single fetch request at 1 KB
        props.put("fetch.max.bytes", "1024");
        // key deserializer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value deserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        String topic = "testTopic";
        // 2. Create the KafkaConsumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 3. Subscribe without a rebalance listener
        consumer.subscribe(Collections.singleton(topic));

        try {
            // commit only after at least 100 records have been processed
            int minCommitSize = 100;
            // counter
            int icount = 0;
            // 4. Poll for data
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("topic = %s,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    icount++;
                }

                // commit the offsets once the business logic has succeeded
                if (icount >= minCommitSize) {
                    // at least 100 records consumed, commit asynchronously
                    consumer.commitAsync(new OffsetCommitCallback() {
                        @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                            if (exception == null) {
                                System.out.println("commit success");
                            } else {
                                // commit failed, handle accordingly
                                System.out.println("commit failed");
                            }
                        }
                    });

                    // reset the counter
                    icount = 0;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // close the consumer
            consumer.close();
        }
    }
}

Output:

[28 10:20:08,923 INFO ] [main] internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] Discovered group coordinator xxxxx.com:9092 (id: 2147483647 rack: null)
[28 10:20:08,931 INFO ] [main] internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] Revoking previously assigned partitions []
[28 10:20:08,931 INFO ] [main] internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] (Re-)joining group
[28 10:20:08,958 INFO ] [main] internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] Successfully joined group with generation 1
[28 10:20:08,960 INFO ] [main] internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] Setting newly assigned partitions [testTopic-0]
[28 10:20:08,978 INFO ] [main] internals.Fetcher - [Consumer clientId=consumer-1, groupId=testTopic-002] Resetting offset for partition testTopic-0 to offset 0.
topic = testTopic,partition = 0,offset = 0, key = H10, value = {"accessId":"123456","accessName":"原始碼婆媳10","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 1, key = H11, value = {"accessId":"123456","accessName":"原始碼婆媳11","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 2, key = H12, value = {"accessId":"123456","accessName":"原始碼婆媳12","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 3, key = H13, value = {"accessId":"123456","accessName":"原始碼婆媳13","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 4, key = H14, value = {"accessId":"123456","accessName":"原始碼婆媳14","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 5, key = H15, value = {"accessId":"123456","accessName":"原始碼婆媳15","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 6, key = H16, value = {"accessId":"123456","accessName":"原始碼婆媳16","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 7, key = H17, value = {"accessId":"123456","accessName":"原始碼婆媳17","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 8, key = H18, value = {"accessId":"123456","accessName":"原始碼婆媳18","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 9, key = H19, value = {"accessId":"123456","accessName":"原始碼婆媳19","busScope":"01","iconUrl":"http://www.baidu.com"}
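
As an aside, the 0.11+ consumer API can also read the header back. A minimal sketch that could be dropped into the poll loop above (Header comes from org.apache.kafka.common.header):

// Read back the LOG_ID header our interceptor attached (new consumer API only)
org.apache.kafka.common.header.Header logId = record.headers().lastHeader("LOG_ID");
if (logId != null) {
    System.out.println("LOG_ID = " + new String(logId.value()));
}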

A client using the old (0.8.2.2) API:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class SimpleConsumerExample {

    private static kafka.javaapi.consumer.ConsumerConnector consumer;

    public static void consume() {

        Properties props = new Properties();
        // ZooKeeper connection (the old consumer coordinates through ZooKeeper)
        props.put("zookeeper.connect", "127.0.0.1:2181");

        // consumer group id
        props.put("group.id", "jd-group");

        // ZooKeeper session timeout
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        // serializer class
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);

        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("testTopic", new Integer(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,
                keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get("testTopic").get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext())
            System.out.println(it.next().message());
    }

    public static void main(String[] args) {
        consume();
    }
}

After starting, this consumer never receives any messages:

[28 09:51:41,590 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Connected to xxxxxx.xx.com:9092 for producing
[28 09:51:41,591 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Disconnecting from xxxxxx.xx.com:9092
[28 09:51:41,592 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1566957087042] Added fetcher for partitions ArrayBuffer([[testTopic,0], initOffset 0 to broker id:0,host:xxxxxx.xx.com,port:9092] )
[28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Verifying properties
[28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property client.id is overridden to jd-group
[28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property metadata.broker.list is overridden to xxxxxx.xx.com:9092
[28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
[28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] client.ClientUtils$ - Fetching metadata from broker id:0,host:xxxxxx.xx.com,port:9092 with correlation id 65 for 1 topic(s) Set(testTopic)
[28 09:51:41,799 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Connected to xxxxxx.xx.com:9092 for producing
[28 09:51:41,824 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Disconnecting from xxxxxx.xx.com:9092
[28 09:51:41,825 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1566957087042] Added fetcher for partitions ArrayBuffer([[testTopic,0], initOffset 0 to broker id:0,host:xxxxxx.xx.com,port:9092] )
[28 09:51:42,032 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Verifying properties
[28 09:51:42,032 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property client.id is overridden to jd-group
[28 09:51:42,032 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property metadata.broker.list is overridden to xxxxxx.xx.com:9092
[28 09:51:42,032 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
[28 09:51:42,033 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] client.ClientUtils$ - Fetching metadata from broker id:0,host:xxxxxx.xx.com,port:9092 with correlation id 66 for 1 topic(s) Set(testTopic)
[28 09:51:42,035 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Connected to xxxxxx.xx.com:9092 for producing
[28 09:51:42,041 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Disconnecting from xxxxxx.xx.com:9092
[28 09:51:42,041 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1566957087042] Added fetcher for partitions ArrayBuffer([[testTopic,0], initOffset 0 to broker id:0,host:xxxxxx.xx.com,port:9092] )
[28 09:51:42,251 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Verifying properties
[28 09:51:42,252 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property client.id is overridden to jd-group
[28 09:51:42,252 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property metadata.broker.list is overridden to xxxxxx.xx.com:9092
[28 09:51:42,252 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
[28 09:51:42,253 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] client.ClientUtils$ - Fetching metadata from broker id:0,host:xxxxxx.xx.com,port:9092 with correlation id 67 for 1 topic(s) Set(testTopic)
[28 09:51:42,254 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Connected to xxxxxx.xx.com:9092 for producing

Meanwhile, the Kafka Broker's server.log keeps flooding with the same error:

[2019-08-28 09:51:42,045] ERROR [KafkaApi-0] Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=testTopic,partitions=[{partition=0,fetch_offset=0,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v0 does not support record headers
    at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
    at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
    at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
    at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
    at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
    at scala.Option.map(Option.scala:146)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
    at scala.Option.flatMap(Option.scala:171)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
    at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
    at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2012)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
    at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
    at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
    at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
    at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
    at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
    at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
    at java.lang.Thread.run(Thread.java:748)
Reading the error log alongside the source code shows what happens: after the Broker fetches the messages, it calls the fetchResponseCallback callback to build the response. While building it, the Broker checks the consumer's API version, and if that version is lower than the message format on the Broker, it down-converts the messages:
      def fetchResponseCallback(bandwidthThrottleTimeMs: Int) {
        def createResponse(requestThrottleTimeMs: Int): RequestChannel.Response = {
          val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
          fetchedPartitionData.asScala.foreach { case (tp, partitionData) =>
            convertedData.put(tp, convertedPartitionData(tp, partitionData))
          }
          val response = new FetchResponse(convertedData, 0)
          val responseStruct = response.toStruct(versionId)

          trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.")
          response.responseData.asScala.foreach { case (topicPartition, data) =>
            // record the bytes out metrics only when the response is being sent
            brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
          }

          val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs,
            request.connectionId, request.header)
          RequestChannel.Response(request, responseSend)
        }

        if (fetchRequest.isFromFollower)
          sendResponseExemptThrottle(createResponse(0))
        else
          sendResponseMaybeThrottle(request, request.header.clientId, requestThrottleMs =>
            requestChannel.sendResponse(createResponse(requestThrottleMs)))
      }



    def convertedPartitionData(tp: TopicPartition, data: FetchResponse.PartitionData) = {


      replicaManager.getMagic(tp).flatMap { magic =>
        val downConvertMagic = {
          if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
            Some(RecordBatch.MAGIC_VALUE_V0)
          else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
            Some(RecordBatch.MAGIC_VALUE_V1)
          else
            None
        }

        downConvertMagic.map { magic =>
          trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
          // this is where the messages are down-converted
          val converted = data.records.downConvert(magic, fetchRequest.fetchData.get(tp).fetchOffset)
          new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET,
            data.logStartOffset, data.abortedTransactions, converted)
        }

      }.getOrElse(data)
    }
The conversion ultimately calls MemoryRecordsBuilder's appendWithOffset, which runs a series of validations and throws if any of them fails. "Magic v0 does not support record headers" is thrown from this method, because the Magic v0 and Magic v1 message formats do not support headers:
private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
                                  ByteBuffer value, Header[] headers) {
        try {
            if (isControlRecord != isControlBatch)
                throw new IllegalArgumentException("Control records can only be appended to control batches");

            if (lastOffset != null && offset <= lastOffset)
                throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s " +
                        "(Offsets must increase monotonically).", offset, lastOffset));

            if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
                throw new IllegalArgumentException("Invalid negative timestamp " + timestamp);

            if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
                throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");

            if (baseTimestamp == null)
                baseTimestamp = timestamp;

            if (magic > RecordBatch.MAGIC_VALUE_V1) {
                appendDefaultRecord(offset, timestamp, key, value, headers);
                return null;
            } else {
                return appendLegacyRecord(offset, timestamp, key, value);
            }
        } catch (IOException e) {
            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
        }
    }
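
This validation can be tripped in isolation, without a broker. A minimal sketch, assuming kafka-clients 0.11.0 on the classpath (the record classes below are Kafka internals, so treat this purely as an illustration):

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;

public class MagicV0HeaderDemo {
    public static void main(String[] args) {
        // A record carrying a header, like the ones our interceptor produces
        Header[] headers = { new RecordHeader("LOG_ID", "some-uuid".getBytes()) };
        SimpleRecord record = new SimpleRecord(System.currentTimeMillis(),
                "H10".getBytes(), "{...}".getBytes(), headers);

        // Building a v0-format batch from it goes through appendWithOffset and
        // should throw: IllegalArgumentException: Magic v0 does not support record headers
        MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE, record);
    }
}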

1.2 Broker Kafka version: 1.0

Running the old-version consumer:
[28 14:26:23,068 INFO ] [jd-group_xx-1566973572731-a5b3105a-leader-finder-thread] consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1566973572960] Added fetcher for partitions ArrayBuffer([[testTopic,0], initOffset -1 to broker id:0,host:xx.xx.com,port:9092] )
{"accessId":"123456","accessName":"原始碼婆媳10","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"原始碼婆媳11","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"原始碼婆媳12","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"原始碼婆媳13","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"原始碼婆媳14","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"原始碼婆媳15","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"原始碼婆媳16","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"原始碼婆媳17","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"原始碼婆媳18","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"原始碼婆媳19","busScope":"01","iconUrl":"http://www.baidu.com"}

Strange: why does simply upgrading the Broker let the consumer work again? Wasn't the point that the v0 message format does not support headers?

Looking at the 1.0 source code, the down-conversion no longer goes through MemoryRecordsBuilder's header-checking append path but through RecordsUtil's convertRecordBatch. When the target format is v0 or v1, the headers are simply ignored, so the consumer can consume messages normally:

    private static MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
        RecordBatch batch = recordBatchAndRecords.batch;
        final TimestampType timestampType = batch.timestampType();
        long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;

        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(),
                timestampType, recordBatchAndRecords.baseOffset, logAppendTime);
        for (Record record : recordBatchAndRecords.records) {
            // Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported
            if (magic > RecordBatch.MAGIC_VALUE_V1)
                builder.append(record);
            else
                builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value());
        }

        builder.close();
        return builder;
    }

0x02 Summary

Based on the analysis above, there are three ways to resolve this error:

1) Upgrade the consumer API to a newer version that supports headers.

2) Upgrade the Broker to 1.0 or later, which ignores header information during down-conversion.

3) The simplest option: stop adding header information on the producer side (see the sketch below). Since LOG_ID is not yet mandatory in our project, this is the approach we chose; once all consumers have been upgraded, we will add the headers back.
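
For option 3 it is enough not to register the interceptor. A minimal sketch of the changed producer setup:

// Option 3 sketch: same producer as before, but without the header-adding interceptor
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// ProducerConfig.INTERCEPTOR_CLASSES_CONFIG is deliberately left unset, so records
// go out without the LOG_ID header and old-API consumers can read them
Producer<String, String> producer = new KafkaProducer<>(props);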

0x03 Reposted from

https://www.jianshu.com/p/80ca3ade8fb2

https://zhuanlan.zhihu.com/p/205676507?utm_source=wechat_session