HadoopConsumer——消費kafka中若干topic的訊息,追加儲存至hdfs的不同檔案內
在kafka原始碼提供的hadoopconsumer的基礎上進行開發,該程式可消費多個topic的訊息,追加至hdfs檔案中。
本程式的輸入輸出檔案有:
配置檔案:topics.properties,指定要消費的topic列表,broker列表,以及程式被呼叫的時 間間隔Interval、每次消費的上限limit、以及三個輸入輸出檔案的父目錄
Offset.dat:輸入檔案,儲存leader broker以及offset,根目錄:input
Message.txt:輸出檔案,將消費的消費追加寫入,根目錄:outputfilepath
Output:輸出資料夾,mapreduce的輸出資料夾,不使用
DateGenerator
初始化操作,為每一個topic的每一個partition建立相關的資料夾以及檔案,僅呼叫一次:
1) 建立offset.dat儲存topic、partition、leader broker IP、Port、以及offset值;
offset初始值為-1,之後儲存上一次消費過的offset,下次hadoop-consumer被呼叫則繼 續消費;
2) 為map-reduce的輸出建立資料夾,他儲存的檔名預設為part-00000;
3) 程式實現了直接向hdfs的檔案追加訊息的功能,所以還需建立進行追加的檔案message.txt;
增加的功能:
首先依據配置檔案獲取topic
建立offset.dat時要先獲取該partition此刻的leader,然後寫入檔案;
KafkatoHDFSMain
增加的功能:
使用執行緒池,依據topic的個數建立執行緒,一個topic開啟一個執行緒去消費訊息;
每個執行緒內的run函式裡面sleep一定時間間隔後,無限次地呼叫SimpleKafkaETLJob的excute方法,不斷地消費訊息;
SimpleKafkaETLJob
建構函式中需指定topic,然後為該topic的每一個分割槽建立一個hadoop的job,消費訊息並存儲;
之後就是一個個job在執行;
runningJob.isSuccessful()會判斷此job是否執行成功,如果不成功,會輸出資訊;
在KafkaETLJob的createJobConf()函式中給JobConf增加一項配置:partition_id,原來只有topic;
KafkaETLRecordReader
通過KafkaETLInputFormat BytesWritable獲取offset.dat檔案的記錄作為輸入,消費指定topic、指定partition、指定offset的訊息,然後先寫入本地檔案,之後將檔案追加至message.txt
增加的功能:
Next()函式消費訊息,通過limit指定消費的資料量;
消費完之後輸出消費的總量System.out.println("_readBytes"+_readBytes);
在本地新建檔案,將context中得到訊息的ArrayList一條一條寫入其中,之後將整個檔案追加至hdfs的message.txt檔案;
而不是之前的一條一條訊息追加;
最後重寫offset.dat檔案,記錄本次消費過的offset值,便於下次繼續消費;
KafkaETLContext
使用simpleconsumer的API,獲取offset值,實時獲取leader broker,一條一條消費訊息,並存儲至ArrayList中。
增加的功能:
the_last_offset:建立一個變數,記錄上一次被呼叫消費到的offset值;
message_list:消費的訊息先儲存於此,之後將list中的訊息寫入本地檔案,再批量追加至hdfs;
獲取此刻該partition的leader broker,使用新的leader建立request和consumer,開始消費;
_limit = props.getInt("kafka.request.limit", -1);
_offset = _offsetRange[0];
if((_offsetRange[0]+_limit) < _offsetRange[1])
the_last_offset = _offsetRange[0]+_limit;
else
the_last_offset = _offsetRange[1];
如果訊息量大於limit,則只消費limit條資料,如果不足,則全部消費完;
容錯處理:
1) 獲取offset值getLastOffset:
if (response.hasError()),表明有錯,列印錯誤,返回-1,
2) 獲取offset,決定本次消費的範圍getOffsetRange:
range[0]:最早的訊息的offset,應該是0,
range[1]:此刻為止最新訊息的offset,
由於我們是接著上一次繼續消費,所以基本不會使用range[0],而是消費offset.dat中的offset和range[1]之間的訊息;所以最重要的是防止range[1]的值出錯;
所以:if (range[1] == -1) ,說明offset值有誤,那麼再次呼叫getLastOffset函式,重新嘗試一次;
一般情況下不會,因為建立context物件,在建構函式中,實時獲取了此刻的leader,之後就呼叫getoffset,所以短時間內應該不會變化,或有異常;
3) fetchMore:
增加了容錯,
_response = _consumer.fetch(fetchRequest);
/* simpleconsumer自己沒有處理leader broker壞掉的情況,所以需要自己處理 */
if (_response.hasError()) {
Offset無效的處理,選擇最多消費一次(出現錯誤則丟掉部分資料,從最新的開始消費),或者至少消費一次(從offset.dat檔案中的offset處繼續消費,可能重複消費)的機制:
4) Get()獲取每一條訊息
//add to message list
message_list.add(new String(messagebytes, "UTF-8") + "\n");
新增換行符,加入message list中;
總結:
至此,此程式可以消費任意多個topic的訊息,並追加至以topic+partition命名的hdfs檔案內。