1. 程式人生 > >Kafka consumer處理大訊息資料問題

Kafka consumer處理大訊息資料問題

案例分析

處理kafka consumer的程式的時候,發現如下錯誤:

ERROR [2017-01-12 07:16:02,466] com.flow.kafka.consumer.main.KafkaConsumer: Unexpected Error Occurred
! kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic codeTopic partition 3 at fetch offset 94. Increase the fetch size, or decrease the maximum message size the broker will allow.
! at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:91) ~[pip-kafka-consumer.jar:na]
! at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) ~[pip-kafka-consumer.jar:na]
! at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) ~[pip-kafka-consumer.jar:na]
! at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) ~[pip-kafka-consumer.jar:na]
! at com.flow.kafka.consumer.main.KafkaConsumer$KafkaRiverFetcher.run(KafkaConsumer.java:291) ~[original-pip-kafka-consumer.jar:na]
! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_51]
! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_51]
! at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]

如上log可以看出,問題就是有一個較大的訊息資料在codeTopic的partition 3上,然後consumer未能消費,提示我可以減小broker允許進入的訊息資料的大小,或者增大consumer程式消費資料的大小。

從log上來看一目瞭然,如果要解決當前問題的話,

  1. 減小broker訊息體大小(設定message.max.bytes引數);
  2. 增大consumer獲取資料資訊大小(設定fetch.message.max.bytes引數)。預設broker訊息體大小為1000000位元組即為1M大小。

消費者方面:fetch.message.max.bytes——>這將決定消費者可以獲取的資料大小。
broker方面:replica.fetch.max.bytes——>這將允許broker的副本傳送訊息在叢集並確保訊息被正確地複製。如果這是太小,則訊息不會被複制,因此,消費者永遠不會看到的訊息,因為訊息永遠不會承諾(完全複製)。
broker方面:message.max.bytes——>可以接受資料生產者最大訊息資料大小。

由我的場景來看較大的訊息體已經進入到了kafka,我這裡要解決這個問題,只需要增加consumer的fetch.message.max.bytes數值就好。我單獨把那條資料消費出來,寫到一個檔案中發現那條訊息大小為1.5M左右,為了避免再次發生這種問題我把consumer程式的fetch.message.max.bytes引數調節為了3072000即為3M,重啟consumer程式,檢視log一切正常,解決這個消費錯誤到此結束,下面介紹一下kafka針對大資料處理的思考。

kafka的設計初衷

Kafka設計的初衷是迅速處理小量的訊息,一般10K大小的訊息吞吐效能最好(可參見LinkedIn的kafka效能測試)。但有時候,我們需要處理更大的訊息,比如XML文件或JSON內容,一個訊息差不多有10-100M,這種情況下,Kakfa應該如何處理?

針對這個問題,可以參考如下建議:

  • 最好的方法是不直接傳送這些大的資料。如果有共享儲存,如NAS, HDFS, S3等,可以把這些大的檔案存放到共享儲存,然後使用Kafka來傳送檔案的位置資訊。

  • 第二個方法是,將大的訊息資料切片或切塊,在生產端將資料切片為10K大小,使用分割槽主鍵確保一個大訊息的所有部分會被髮送到同一個kafka分割槽(這樣每一部分的拆分順序得以保留),如此以來,當消費端使用時會將這些部分重新還原為原始的訊息。

  • 第三,Kafka的生產端可以壓縮訊息,如果原始訊息是XML,當通過壓縮之後,訊息可能會變得不那麼大。在生產端的配置引數中使用compression.codeccommpressed.topics可以開啟壓縮功能,壓縮演算法可以使用GZipSnappy

不過如果上述方法都不是你需要的,而你最終還是希望傳送大的訊息,那麼,則可以在kafka中設定下面一些引數:

broker 配置

message.max.bytes (預設:1000000) – broker能接收訊息的最大位元組數,這個值應該比消費端的fetch.message.max.bytes更小才對,否則broker就會因為消費端無法使用這個訊息而掛起。
log.segment.bytes (預設: 1GB) – kafka資料檔案的大小,確保這個數值大於一個訊息的長度。一般說來使用預設值即可(一般一個訊息很難大於1G,因為這是一個訊息系統,而不是檔案系統)。
replica.fetch.max.bytes (預設: 1MB) – broker可複製的訊息的最大位元組數。這個值應該比message.max.bytes大,否則broker會接收此訊息,但無法將此訊息複製出去,從而造成資料丟失。

Consumer 配置

fetch.message.max.bytes (預設 1MB) – 消費者能讀取的最大訊息。這個值應該大於或等於message.max.bytes。所以,如果你一定要選擇kafka來傳送大的訊息,還有些事項需要考慮。要傳送大的訊息,不是當出現問題之後再來考慮如何解決,而是在一開始設計的時候,就要考慮到大訊息對叢集和主題的影響。

效能: 根據前面提到的效能測試,kafka在訊息為10K時吞吐量達到最大,更大的訊息會降低吞吐量,在設計叢集的容量時,尤其要考慮這點。
可用的記憶體和分割槽數:Brokers會為每個分割槽分配replica.fetch.max.bytes引數指定的記憶體空間,假設replica.fetch.max.bytes=1M,且有1000個分割槽,則需要差不多1G的記憶體,確保 分割槽數最大的訊息不會超過伺服器的記憶體,否則會報OOM錯誤。同樣地,消費端的fetch.message.max.bytes指定了最大訊息需要的記憶體空間,同樣,分割槽數最大需要記憶體空間 不能超過伺服器的記憶體。所以,如果你有大的訊息要傳送,則在記憶體一定的情況下,只能使用較少的分割槽數或者使用更大記憶體的伺服器。
垃圾回收:到現在為止,我在kafka的使用中還沒發現過此問題,但這應該是一個需要考慮的潛在問題。更大的訊息會讓GC的時間更長(因為broker需要分配更大的塊),隨時關注GC的日誌和伺服器的日誌資訊。如果長時間的GC導致kafka丟失了zookeeper的會話,則需要配置zookeeper.session.timeout.ms引數為更大的超時時間。

相關推薦

Kafka consumer處理訊息資料問題

案例分析 處理kafka consumer的程式的時候,發現如下錯誤: ERROR [2017-01-12 07:16:02,466] com.flow.kafka.consumer.main.KafkaConsumer: Unexpected Error Occurred ! kafka.common.Me

kafka實戰 - 處理檔案需要注意的配置引數

概述   kafka配置引數有很多,可以做到高度自定義。但是很多使用者拿到kafka的配置檔案後,基本就是配置一些host,port,id之類的資訊,其他的配置項採用預設配置,就開始使用了。這些預設配置是經過kafka官方團隊經過嚴謹寬泛的測試之後,求到的最優值。在單條資訊很小,大部分場景下都能得到優異的效

kafka實戰 - 處理文件需要註意的配置參數

了解 自定義 等於 副本 lead lar 做到 0.10 新的 概述   kafka配置參數有很多,可以做到高度自定義。但是很多用戶拿到kafka的配置文件後,基本就是配置一些host,port,id之類的信息,其他的配置項采用默認配置,就開始使用了。這些默認配置是經過k

kafka處理超大訊息的一些考慮

    時間:2015-02-01 00:38:26      閱讀:5854      評論:0      收藏:0&n

Spring+Hibernate處理批量資料

原文:http://blog.csdn.net/ye1992/article/details/9291237 關於使用spring+hibernate進行大批量資料的插入和更新,它的效能和使用JDBC  PreparedStatement的batch批量操作以及資料庫的儲

C# 多執行緒+佇列處理批量資料,進而縮短處理時間

public void DealData(){                int pageSize = 200; //建立佇列                         var queue = new MessageQueueManager<Model>

mybatis 處理批量資料。使用mysql的LOAD DATA INFILE

使用mybatis實現,檔案使用的是csv檔案 xml <select id="loadTest"> LOAD DATA INFILE 'E:/load_test.csv' ign

kafka consumer不能消費訊息及其處理辦法

我這裡的Kafka Consumer程式碼是拷貝網上的,就是開一個執行緒監聽kafka topic,一有訊息就處理。開始的程式碼是這樣的: public void kafkaStart() { final String topic = hipchatActi

Kafka consumer在項目中的多線程處理方式

-m all ade exec 線程處理 rop while log 安全 對於KafkaConsumer而言,它不像KafkaProducer,不是線程安全的,狀態是在consumer中維護的,所以實現時要註意多線程的使用,一般有2種使用方法: 1:每個Consume

DKhadoop資料處理平臺監控資料介紹

標題:DKhadoop大資料處理平臺監控資料介紹 2018年國內大資料公司50強榜單排名已經公佈了出來,大快以黑馬之姿闖入50強,並摘得多項桂冠。Hanlp自然語言處理技術也榮膺了“2018中國資料星技術”獎。對這份榜單感興趣的可以找一下看看。本篇承接上一篇《DKM平臺監控引數說明》,繼續就

乾貨!處理資料的技術盤點

資訊科技蓬勃發展,每天都有新產品問世,同時不斷地形成新的趨勢。這種不斷的變化使得資訊科技和軟體專業人員、開發人員、科學家以及投資者都不敢怠慢,並引發了新的職業機會和有意義的工作。然而,競爭是激烈的,與最新的技術和趨勢保持同步是永恆的要求。對於專業人士來說,在全球IT行業中,入行、生存和成長都變得

物件(LOB)、批處理與元資料

1.大物件LOB LOB,即Large Objects(大物件),是用來儲存大量的二進位制和文字資料的一種資料型別(一個LOB欄位可儲存可多達4GB的資料)。分為BLOB 和CLOB。 大文字CLOB: CLOB(Character Large Object) – 用於儲存大量的文字資料

Python環境安裝及資料基本預處理-資料ML樣本集案例實戰

版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。QQ郵箱地址:[email protected],如有任何學術交流,可隨時聯絡。 1 Python環

玩轉MySQL -----處理資料物件

一、大資料物件簡介  1.LOB(Large Object,大型物件)型別的欄位現在用得越來越多了。因為這種型別的欄位,容量大(最多能容納4GB的資料),且一個表中可以有多個這種型別的欄位,很靈活,適用於資料量非常大的業務領域(如圖象、檔案等)。  2.LOB型別分為BLO

Mysql資料庫文字資料處理

資料庫大文字資料處理 目標:把 mp3檔案儲存到資料庫中! 在my.ini中新增如下配置: max_allowed_packet=10485760 1 什麼是大文字資料 所謂大文字資料,就是大的位元組資料,或大的字元資料。標準SQL中提供瞭如下型別來

利用feather快速處理資料

Feather是一個快速、輕量級的儲存框架,可以在應用在pandas的Dataframe資料結構中。 讀寫資料 import feather import pandas as pd def read_csv_feature(file_in): # 讀 f = ope

Hibernate處理Oracle文字資料Clob

近日做了一個專案,頁面上有一個textarea框,資料量比較大,剛開始,沒考慮資料量超大的情況,pojo類中textarea框對應的欄位用string,XX.hbm.xml裡對應的欄位也用string,Oracle資料庫對應欄位用Clob,儲存完全沒問題。直到測試人員直接貼上

利用POI框架的SAX方式處理資料2007版Excel(xlsx)【第2版】

【第1版】地址 針對老早寫的POI處理Excel的大資料讀取問題,看到好多人關注,感覺自己還是更新一版,畢竟雖然是自己備份,但是如果新手能少走彎路,也算欣慰。下面的版本是我的專案迭代過程中個人認為畢竟穩定和健壯的,算作【第2版】吧,裡面修復了【第1版】的很多bug,諸如

Java處理資料小技巧總結

眾所周知,java在處理資料量比較大的時候,載入到記憶體必然會導致記憶體溢位,而在一些資料處理中我們不得不去處理海量資料,在做資料處理中,我們常見的手段是分解,壓縮,並行,臨時檔案等方法。 例如,我們要將資料庫(不論是什麼資料庫)的資料匯出到一個檔案,一般是Ex

MongoTemplate使用Cursor處理數量的資料

對於資料量巨大的情況下,使用mongoTemplate.find()方法返回一個列表,如果不分頁的話恐怕比較麻煩。mongoTemplate提供了另外一種方法,使用遊標逐個獲取資料,同時可以指定只獲取