1. 程式人生 > >HadoopConsumer——消費kafka中若干topic的訊息,追加儲存至hdfs的不同檔案內

HadoopConsumer——消費kafka中若干topic的訊息,追加儲存至hdfs的不同檔案內

在kafka原始碼提供的hadoopconsumer的基礎上進行開發,該程式可消費多個topic的訊息,追加至hdfs檔案中。

本程式的輸入輸出檔案有:

配置檔案:topics.properties,指定要消費的topic列表,broker列表,以及程式被呼叫的時     間間隔Interval、每次消費的上限limit、以及三個輸入輸出檔案的父目錄

Offset.dat:輸入檔案,儲存leader broker以及offset,根目錄:input

Message.txt:輸出檔案,將消費的消費追加寫入,根目錄:outputfilepath

Output:輸出資料夾,mapreduce的輸出資料夾,不使用

DateGenerator

初始化操作,為每一個topic的每一個partition建立相關的資料夾以及檔案,僅呼叫一次:

1) 建立offset.dat儲存topicpartitionleader broker IPPort、以及offset值;

offset初始值為-1,之後儲存上一次消費過的offset,下次hadoop-consumer被呼叫則繼 續消費;

2) 為map-reduce的輸出建立資料夾,他儲存的檔名預設為part-00000

3) 程式實現了直接向hdfs的檔案追加訊息的功能,所以還需建立進行追加的檔案message.txt

增加的功能:

首先依據配置檔案獲取topic

列表,以及broker列表,然後獲取每個topic的分割槽數,為每個分割槽建立以上的三個檔案(資料夾);

建立offset.dat時要先獲取該partition此刻的leader,然後寫入檔案;

KafkatoHDFSMain

增加的功能:

使用執行緒池,依據topic的個數建立執行緒,一個topic開啟一個執行緒去消費訊息;

每個執行緒內的run函式裡面sleep一定時間間隔後,無限次地呼叫SimpleKafkaETLJobexcute方法,不斷地消費訊息;

SimpleKafkaETLJob

建構函式中需指定topic,然後為該topic每一個分割槽建立一個hadoopjob,消費訊息並存儲;

之後就是一個個job在執行;

runningJob.isSuccessful()會判斷此job是否執行成功,如果不成功,會輸出資訊;

KafkaETLJobcreateJobConf()函式中給JobConf增加一項配置:partition_id,原來只有topic

KafkaETLRecordReader

通過KafkaETLInputFormat BytesWritable獲取offset.dat檔案的記錄作為輸入,消費指定topic、指定partition、指定offset的訊息,然後先寫入本地檔案,之後將檔案追加至message.txt

增加的功能:

Next()函式消費訊息,通過limit指定消費的資料量;

消費完之後輸出消費的總量System.out.println("_readBytes"+_readBytes);

在本地新建檔案,將context中得到訊息的ArrayList一條一條寫入其中,之後將整個檔案追加至hdfsmessage.txt檔案;

而不是之前的一條一條訊息追加;

最後重寫offset.dat檔案,記錄本次消費過的offset值,便於下次繼續消費;

KafkaETLContext

使用simpleconsumerAPI,獲取offset值,實時獲取leader broker,一條一條消費訊息,並存儲至ArrayList中。

增加的功能:

the_last_offset:建立一個變數,記錄上一次被呼叫消費到的offset值;

message_list:消費的訊息先儲存於此,之後將list中的訊息寫入本地檔案,再批量追加至hdfs

獲取此刻該partitionleader broker,使用新的leader建立requestconsumer,開始消費;

_limit = props.getInt("kafka.request.limit", -1);

_offset = _offsetRange[0];

if((_offsetRange[0]+_limit) < _offsetRange[1])

the_last_offset = _offsetRange[0]+_limit;

else

the_last_offset = _offsetRange[1];

如果訊息量大於limit,則只消費limit條資料,如果不足,則全部消費完;

容錯處理:

1) 獲取offsetgetLastOffset

if (response.hasError()),表明有錯,列印錯誤,返回-1

2) 獲取offset,決定本次消費的範圍getOffsetRange

range[0]:最早的訊息的offset,應該是0

range[1]:此刻為止最新訊息的offset

由於我們是接著上一次繼續消費,所以基本不會使用range[0],而是消費offset.dat中的offsetrange[1]之間的訊息;所以最重要的是防止range[1]的值出錯;

所以:if (range[1] == -1) ,說明offset值有誤,那麼再次呼叫getLastOffset函式,重新嘗試一次;

一般情況下不會,因為建立context物件,在建構函式中,實時獲取了此刻的leader,之後就呼叫getoffset,所以短時間內應該不會變化,或有異常;

3) fetchMore

增加了容錯,

_response = _consumer.fetch(fetchRequest);

/* simpleconsumer自己沒有處理leader broker壞掉的情況,所以需要自己處理 */

if (_response.hasError()) {

Offset無效的處理,選擇最多消費一次(出現錯誤則丟掉部分資料,從最新的開始消費),或者至少消費一次(從offset.dat檔案中的offset處繼續消費,可能重複消費)的機制:

4) Get()獲取每一條訊息

//add to message list

message_list.add(new String(messagebytes, "UTF-8") + "\n");

新增換行符,加入message list中;

總結:

至此,此程式可以消費任意多個topic的訊息,並追加至以topic+partition命名的hdfs檔案內。