HBase資料遷移到Kafka實戰
1.概述
在實際的應用場景中,資料儲存在HBase叢集中,但是由於一些特殊的原因,需要將資料從HBase遷移到Kafka。正常情況下,一般都是源資料到Kafka,再有消費者處理資料,將資料寫入HBase。但是,如果逆向處理,如何將HBase的資料遷移到Kafka呢?今天筆者就給大家來分享一下具體的實現流程。
2.內容
一般業務場景如下,資料來源頭產生資料,進入Kafka,然後由消費者(如Flink、Spark、Kafka API)處理資料後進入到HBase。這是一個很典型的實時處理流程。流程圖如下:
上述這類實時處理流程,處理資料都比較容易,畢竟資料流向是順序處理的。但是,如果將這個流程逆向,那麼就會遇到一些問題。
2.1 海量資料
HBase的分散式特性,叢集的橫向拓展,HBase中的資料往往都是百億、千億級別,或者數量級更大。這類級別的資料,對於這類逆向資料流的場景,會有個很麻煩的問題,那就是取數問題。如何將這海量資料從HBase中取出來?
2.2 沒有資料分割槽
我們知道HBase做資料Get或者List<Get>很快,也比較容易。而它又沒有類似Hive這類資料倉庫分割槽的概念,不能提供某段時間內的資料。如果要提取最近一週的資料,可能全表掃描,通過過濾時間戳來獲取一週的資料。數量小的時候,可能問題不大,而資料量很大的時候,全表去掃描HBase很困難。
3.解決思路
對於這類逆向資料流程,如何處理。其實,我們可以利用HBase Get和List<Get>的特性來實現。因為HBase通過RowKey來構建了一級索引,對於RowKey級別的取數,速度是很快的。實現流程細節如下:
資料流程如上圖所示,下面筆者為大家來剖析每個流程的實現細節,以及注意事項。
3.1 Rowkey抽取
我們知道HBase針對Rowkey取數做了一級索引,所以我們可以利用這個特性來展開。我們可以將海量資料中的Rowkey從HBase表中抽取,然後按照我們制定的抽取規則和儲存規則將抽取的Rowkey儲存到HDFS上。
這裡需要注意一個問題,那就是關於HBase Rowkey的抽取,海量資料級別的Rowkey抽取,建議採用MapReduce來實現。這個得益於HBase提供了TableMapReduceUtil類來實現,通過MapReduce任務,將HBase中的Rowkey在map階段按照指定的時間範圍進行過濾,在reduce階段將rowkey拆分為多個檔案,最後儲存到HDFS上。
這裡可能會有同學有疑問,都用MapReduce抽取Rowkey了,為啥不直接在掃描處理列簇下的列資料呢?這裡,我們在啟動MapReduce任務的時候,Scan HBase的資料時只過濾Rowkey(利用FirstKeyOnlyFilter來實現),不對列簇資料做處理,這樣會快很多。對HBase RegionServer的壓力也會小很多。
Row | Column |
row001 | info:name |
row001 | info:age |
row001 | info:sex |
row001 | info:sn |
這裡舉個例子,比如上表中的資料,其實我們只需要取出Rowkey(row001)。但是,實際業務資料中,HBase表描述一條資料可能有很多特徵屬性(例如姓名、性別、年齡、身份證等等),可能有些業務資料一個列簇下有十幾個特徵,但是他們卻只有一個Rowkey,我們也只需要這一個Rowkey。那麼,我們使用FirstKeyOnlyFilter來實現就很合適了。
/** * A filter that will only return the first KV from each row. * <p> * This filter can be used to more efficiently perform row count operations. */
這個是FirstKeyOnlyFilter的一段功能描述,它用於返回第一條KV資料,官方其實用它來做計數使用,這裡我們稍加改進,把FirstKeyOnlyFilter用來做抽取Rowkey。
3.2 Rowkey生成
抽取的Rowkey如何生成,這裡可能根據實際的數量級來確認Reduce個數。建議生成Rowkey檔案時,切合實際的資料量來算Reduce的個數。儘量不用為了使用方便就一個HDFS檔案,這樣後面不好維護。舉個例子,比如HBase表有100GB,我們可以拆分為100個檔案。
3.3 資料處理
在步驟1中,按照抽取規則和儲存規則,將資料從HBase中通過MapReduce抽取Rowkey並存儲到HDFS上。然後,我們在通過MapReduce任務讀取HDFS上的Rowkey檔案,通過List<Get>的方式去HBase中獲取資料。拆解細節如下:
Map階段,我們從HDFS讀取Rowkey的資料檔案,然後通過批量Get的方式從HBase取數,然後組裝資料傳送到Reduce階段。在Reduce階段,獲取來自Map階段的資料,寫資料到Kafka,通過Kafka生產者回調函式,獲取寫入Kafka狀態資訊,根據狀態資訊判斷資料是否寫入成功。如果成功,記錄成功的Rowkey到HDFS,便於統計成功的進度;如果失敗,記錄失敗的Rowkey到HDFS,便於統計失敗的進度。
3.4 失敗重跑
通過MapReduce任務寫資料到Kafka中,可能會有失敗的情況,對於失敗的情況,我們只需要記錄Rowkey到HDFS上,當任務執行完成後,再去程式檢查HDFS上是否存在失敗的Rowkey檔案,如果存在,那麼再次啟動步驟3,即讀取HDFS上失敗的Rowkey檔案,然後再List<Get> HBase中的資料,進行資料處理後,最後再寫Kafka,以此類推,直到HDFS上失敗的Rowkey處理完成為止。
4.實現程式碼
這裡實現的程式碼量也並不複雜,下面提供一個虛擬碼,可以在此基礎上進行改造(例如Rowkey的抽取、MapReduce讀取Rowkey並批量Get HBase表,然後在寫入Kafka等)。示例程式碼如下:
public class MRROW2HDFS { public static void main(String[] args) throws Exception { Configuration config = HBaseConfiguration.create(); // HBase Config info Job job = Job.getInstance(config, "MRROW2HDFS"); job.setJarByClass(MRROW2HDFS.class); job.setReducerClass(ROWReducer.class); String hbaseTableName = "hbase_tbl_name"; Scan scan = new Scan(); scan.setCaching(1000); scan.setCacheBlocks(false); scan.setFilter(new FirstKeyOnlyFilter()); TableMapReduceUtil.initTableMapperJob(hbaseTableName, scan, ROWMapper.class, Text.class, Text.class, job); FileOutputFormat.setOutputPath(job, new Path("/tmp/rowkey.list")); // input you storage rowkey hdfs path System.exit(job.waitForCompletion(true) ? 0 : 1); } public static class ROWMapper extends TableMapper<Text, Text> { @Override protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) throws IOException, InterruptedException { for (Cell cell : value.rawCells()) { // Filter date range // context.write(...); } } } public static class ROWReducer extends Reducer<Text,Text,Text,Text>{ private Text result = new Text(); @Override protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException { for(Text val:values){ result.set(val); context.write(key, result); } } } }
5.總結
整個逆向資料處理流程,並不算複雜,實現也是很基本的MapReduce邏輯,沒有太複雜的邏輯處理。在處理的過程中,需要幾個細節問題,Rowkey生成到HDFS上時,可能存在行位空格的情況,在讀取HDFS上Rowkey檔案去List<Get>時,最好對每條資料做個過濾空格處理。另外,就是對於成功處理Rowkey和失敗處理Rowkey的記錄,這樣便於任務失敗重跑和資料對賬。可以知曉資料遷移進度和完成情況。同時,我們可以使用Kafka Eagle監控工具來檢視Kafka寫入進度。
6.結束語
這篇部落格就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或傳送郵件給我,我會盡我所能為您解答,與君共勉!
另外,博主出書了《Kafka並不難學》和《Hadoop大資料探勘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點選購買連結購買博主的書進行學習,在此感謝大家的支援。關注下面公眾號,根據提示,可免費獲取書籍的教學視