1. 程式人生 > >spark運算結果寫入hbase及優化

spark運算結果寫入hbase及優化

Spark中利用map-reduce或者spark sql分析了資料之後,我們需要將結果寫入外部檔案系統。

本文,以向Hbase中寫資料,為例,說一下,Spark怎麼向Hbase中寫資料。

首先,需要說一下,下面的這個方法。

foreach (func)

最通用的輸出操作,把func作用於從map-reduce生成的每一個RDD(spark  sql生成的DataFrame可轉成RDD)

注意:這個函式是在執行spark程式的driver程序中執行的。

下面跟著思路,看一下,怎麼優雅的向Hbase中寫入資料

向外部資料庫寫資料,通常會建立連線,使用連線傳送資料(也就是儲存資料)

<pre name="code" class="java"><span style="font-size:18px;color:#003300;">DataFrame dataFrame = Contexts.hiveContext.sql("select * from tableName");
dataFrame.javaRDD().foreach(new VoidFunction<Row> () {
    public void call(Row row) {
        HConnection conn = ...
        HTableInterface htable = conn.getTable("");
        //save to hbase
    }
});</span>




很遺憾!這種寫法是有極大風險的,這會導致,對於每條資料,都建立一個connection(建立connection是消耗資源的)

事實上,由於資料是分割槽的,基於這個特性,還可以有更高效的方式

下面的方法會好一些:

<span style="font-size:18px;color:#003300;">DataFrame dataFrame = Contexts.hiveContext.sql("select * from tableName");
dataFrame.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>> () {
    public void call(Iterator<Row> rows) {
        HConnection conn = ...
        HTableInterface htable = conn.getTable("");
        while(rows.hasNext()){
            //save to hbase
        }
    }
});</span>


上面的方法,使用 rdd.foreachPartition建立一個connection物件,一個RDD分割槽中的所有資料,都使用這一個connection

在多個RDD之間,connection物件是可以重用的,所以可以建立一個連線池。如下:

<span style="font-size:18px;color:#003300;">dataFrame.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>> () {
    public void call(Iterator<Row> rows) {
        HTableInterface htable = TablePool.getHTable("");
        while(rows.hasNext()){
            //save to hbase
        }
    }
});</span>

注意:連線池中的連線應該是,應需求而延遲建立,並且,如果一段時間沒用,就超時了(也就是關閉該連線)。