spark運算結果寫入hbase及優化
阿新 • • 發佈:2019-01-29
在Spark中利用map-reduce或者spark sql分析了資料之後,我們需要將結果寫入外部檔案系統。
本文,以向Hbase中寫資料,為例,說一下,Spark怎麼向Hbase中寫資料。
首先,需要說一下,下面的這個方法。
foreach (func)
最通用的輸出操作,把func作用於從map-reduce生成的每一個RDD(spark sql生成的DataFrame可轉成RDD)。
注意:這個函式是在執行spark程式的driver程序中執行的。
下面跟著思路,看一下,怎麼優雅的向Hbase中寫入資料
向外部資料庫寫資料,通常會建立連線,使用連線傳送資料(也就是儲存資料)。
<pre name="code" class="java"><span style="font-size:18px;color:#003300;">DataFrame dataFrame = Contexts.hiveContext.sql("select * from tableName"); dataFrame.javaRDD().foreach(new VoidFunction<Row> () { public void call(Row row) { HConnection conn = ... HTableInterface htable = conn.getTable(""); //save to hbase } });</span>
很遺憾!這種寫法是有極大風險的,這會導致,對於每條資料,都建立一個connection(建立connection是消耗資源的)。
事實上,由於資料是分割槽的,基於這個特性,還可以有更高效的方式
下面的方法會好一些:
<span style="font-size:18px;color:#003300;">DataFrame dataFrame = Contexts.hiveContext.sql("select * from tableName"); dataFrame.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>> () { public void call(Iterator<Row> rows) { HConnection conn = ... HTableInterface htable = conn.getTable(""); while(rows.hasNext()){ //save to hbase } } });</span>
上面的方法,使用 rdd.foreachPartition建立一個connection物件,一個RDD分割槽中的所有資料,都使用這一個connection。
在多個RDD之間,connection物件是可以重用的,所以可以建立一個連線池。如下:
<span style="font-size:18px;color:#003300;">dataFrame.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>> () { public void call(Iterator<Row> rows) { HTableInterface htable = TablePool.getHTable(""); while(rows.hasNext()){ //save to hbase } } });</span>
注意:連線池中的連線應該是,應需求而延遲建立,並且,如果一段時間沒用,就超時了(也就是關閉該連線)。