Flume RegexHbaseEventSerializer自定義rowKey
String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(), randomKey, nonce.getAndIncrement());
如果要自定義rowkey,修改原始碼是唯一的辦法,RegexHbaseEventSerializer.java就是我們要修改的檔案。我們可以新建一個類來繼承,原始檔案不要去修改。不過今天我測試的時候是直接修改原始檔的。
我的需要的rowkey是:B13612145#1529637655#3#1530085147015#54
String rowKey = String.format("%s#%s#%s#%s#%s", machineNo, fileTimeStamp , fileNo , cal.getTimeInMillis(), nonce.getAndIncrement());
前3個欄位都在檔名中,這就意味著,我必須解析檔名獲得這3個欄位,那麼配置檔案必須新增header:
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/flume/r1/data
a1.sources.r1.batchSize = 100
#a1.sources.r1.fileHeader = true
a1.sources.r1.basenameHeader = true
a1.sources.r1.channels = c1
OK,來整理一下思路,我需要解析檔名來獲取3個欄位作為rowkey組成部分,那麼配置需要新增header,然後把原始碼自動生成rowkey的規則替換成我們自己的規則,就是這麼簡單。
1. 新建解析檔名的方法:
public String splitFileName() {
for (Map.Entry<String, String> entry : headers.entrySet()) {
return entry.getValue();
}
return null;
}
既然是要解析檔名,很顯然要知道怎麼獲取檔名,從程式碼可以知道headers.entrySet就是獲取header的方法,因為我就一個header,所以一次迴圈就return結果。
2. 替換預設rowkey生成規則
protected byte[] getRowKey(Calendar cal) { /* * NOTE: This key generation strategy has the following properties: * * 1) Within a single JVM, the same row key will never be duplicated. 2) * Amongst any two JVM's operating at different time periods (according * to their respective clocks), the same row key will never be * duplicated. 3) Amongst any two JVM's operating concurrently * (according to their respective clocks), the odds of duplicating a * row-key are non-zero but infinitesimal. This would require * simultaneous collision in (a) the timestamp (b) the respective nonce * and (c) the random string. The string is necessary since (a) and (b) * could collide if a fleet of Flume agents are restarted in tandem. * * Row-key uniqueness is important because conflicting row-keys will * cause data loss. */ this.fileName = splitFileName(); this.machineNo = fileName.split("_")[1]; this.fileTimeStamp = fileName.split("_")[2]; this.fileNo = fileName.split("_")[3].split("\\.")[0]; String rowKey = String.format("%s#%s#%s#%s#%s", machineNo, fileTimeStamp , fileNo , cal.getTimeInMillis(), nonce.getAndIncrement()); //String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(), randomKey, nonce.getAndIncrement()); return rowKey.getBytes(charset); }
註釋掉的那行就是預設的規則,新的是我自己要的規則。
就這樣完成了,打個包替換之前的包,消費一個檔案來測試,結果正如我們所期望的:
B13612145#1529637655#3#1530085147016#55 column=cf:cnc_exeprgname, timestamp=1530085147229, value=418
B13612145#1529637655#3#1530085147016#55 column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0
B13612145#1529637655#3#1530085147016#55 column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=7,7,93,0
B13612145#1529637655#3#1530085147016#55 column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3
B13612145#1529637655#3#1530085147016#55 column=cf:ext_toolno, timestamp=1530085147229, value=30
B13612145#1529637655#3#1530085147017#56 column=cf:cnc_exeprgname, timestamp=1530085147229, value=418
B13612145#1529637655#3#1530085147017#56 column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0
B13612145#1529637655#3#1530085147017#56 column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=6,11,92,0
B13612145#1529637655#3#1530085147017#56 column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3
B13612145#1529637655#3#1530085147017#56 column=cf:ext_toolno, timestamp=1530085147229, value=30
B13612145#1529637655#3#1530085147018#57 column=cf:cnc_exeprgname, timestamp=1530085147229, value=418
B13612145#1529637655#3#1530085147018#57 column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0
B13612145#1529637655#3#1530085147018#57 column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=6,7,93,0
B13612145#1529637655#3#1530085147018#57 column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3
B13612145#1529637655#3#1530085147018#57 column=cf:ext_toolno, timestamp=1530085147229, value=30
B13612145#1529637655#3#1530085147018#58 column=cf:cnc_exeprgname, timestamp=1530085147229, value=418
B13612145#1529637655#3#1530085147018#58 column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0
B13612145#1529637655#3#1530085147018#58 column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=5,8,93,0
B13612145#1529637655#3#1530085147018#58 column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3
B13612145#1529637655#3#1530085147018#58 column=cf:ext_toolno, timestamp=1530085147229, value=30
B13612145#1529637655#3#1530085147019#59 column=cf:cnc_exeprgname, timestamp=1530085147229, value=418
B13612145#1529637655#3#1530085147019#59 column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0
B13612145#1529637655#3#1530085147019#59 column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=7,8,93,0
B13612145#1529637655#3#1530085147019#59 column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3
B13612145#1529637655#3#1530085147019#59 column=cf:ext_toolno, timestamp=1530085147229, value=30
今天測試的時候碰到2個問題:
1. 消費檔案有幾次出現檔名已經修改為.COMPLETE,但是我HBASE資料沒有任何增加,而且沒有報任何錯誤,。給我的感覺就是沒有消費。測試了幾次,都是如此,很是困惑,後來突然想起來之前有人提到過如果一個很大的檔案需要放到spooldir目錄會發生錯誤,因為檔案一進去就會消費,但是檔案又在拷貝過程。後來我改成先把原始檔名新增.COMPLETE,拷貝完成之後,再修改檔名去掉.COMPLETE.
2. 時間衝突
rowkey的規則裡有時間,我有一個檔案60行資料,消費之後只有48條,因為之前我同過spark 消費也出現過這個問題,因此很容易知道這是因為rowkey衝突了,導致資料覆蓋了,因此把原始檔的nonce.getAndIncrement()加到ROWKEY即可。
簡單說就是迴圈的過程cal.getTimeInMillis()這個玩意會可能重複,很多人覺得微秒級別不應該出現重複,事實上我碰到過2次,因此現在對通過時間作為rowkey格外小心。