1. 程式人生 > >logstash 消費資料到kafka異常

logstash 消費資料到kafka異常

報錯 :[logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>1, :failures=>1, :sleep=>0.1}

原因: logstash 日誌報錯生產資料到 kafka 失敗

解決辦法:

         檢視kafka配置,預設單條訊息最大為1M,當單條訊息長度超過1M時,就會出現傳送到broker失敗,從而導致訊息在producer的佇列中一直累積,直到撐爆生產者的記憶體。

於是趕緊修改kafka配置,解決問題。主要修改步驟如下:

         1.修改kafka的broker配置:message.max.bytes(預設:1000000B),這個引數表示單條訊息的最大長度。在使用kafka的時候,應該預估單條訊息的最大長度,不然導致傳送失敗。

       2.修改kafka的broker配置:replica.fetch.max.bytes (預設: 1MB),broker可複製的訊息的最大位元組數。這個值應該比message.max.bytes大,否則broker會接收此訊息,但無法將此訊息複製出去,從而造成資料丟失。

      3.修改消費者程式端配置:fetch.message.max.bytes (預設 1MB) – 消費者能讀取的最大訊息。這個值應該大於或等於message.max.bytes。如果不調節這個引數,就會導致消費者無法消費到訊息,並且不會爆出異常或者警告,導致訊息在broker中累積,此處要注意。

      根據需要,調整上述三個引數的大小。但是否引數調節得越大越好,或者說單條訊息越大越好呢?參考http://www.mamicode.com/info-detail-453907.html的說法:

      1.從效能上考慮:通過效能測試,kafka在訊息為10K時吞吐量達到最大,更大的訊息會降低吞吐量,在設計叢集的容量時,尤其要考慮這點。

      2.可用的記憶體和分割槽數:Brokers會為每個分割槽分配replica.fetch.max.bytes引數指定的記憶體空間,假設replica.fetch.max.bytes=1M,且有1000個分割槽,則需要差不多1G的記憶體,確保 分割槽數最大的訊息不會超過伺服器的記憶體,否則會報OOM錯誤。同樣地,消費端的fetch.message.max.bytes指定了最大訊息需要的記憶體空間,同樣,分割槽數最大需要記憶體空間 不能超過伺服器的記憶體。所以,如果你有大的訊息要傳送,則在記憶體一定的情況下,只能使用較少的分割槽數或者使用更大記憶體的伺服器。

     3.垃圾回收:更大的訊息會讓GC的時間更長(因為broker需要分配更大的塊),隨時關注GC的日誌和伺服器的日誌資訊。如果長時間的GC導致kafka丟失了zookeeper的會話,則需要配置zookeeper.session.timeout.ms引數為更大的超時時間。