1. 程式人生 > >spark批量匯入hbase

spark批量匯入hbase

方法1

使用hbase put方式,這種方式效率不高

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
 
val conf = HBaseConfiguration.create()
val tableName = "/t1"
conf.set(TableInputFormat.INPUT_TABLE, tableName)
 
val myTable = new HTable(conf, tableName);
var p = new Put();
p = new Put(new String("row999").getBytes());
p.add("cf".getBytes(), "column_name".getBytes(), new String("value999").getBytes());
myTable.put(p);
myTable.flushCommits();

方法2

先生成hfile檔案再將hfile檔案匯入hbase 效率較高

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
 
val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName) 
 
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)
 
// Generate 10 sample data:
val num = sc.parallelize(1 to 10)
val rdd = num.map(x=>{
    val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes() )
    (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})
 
// Save Hfiles on HDFS 
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)
 
//Bulk load Hfiles to Hbase
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/xxxx19"), table)

方法3

省去方法2的第三步

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
 
val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName) 
 
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)
 
// Generate 10 sample data:
val num = sc.parallelize(1 to 10)
val rdd = num.map(x=>{
    val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes() )
    (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})
 
// Directly bulk load to Hbase/MapRDB tables.
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())


相關推薦

spark批量匯入hbase

方法1 使用hbase put方式,這種方式效率不高 import org.apache.spark._ import org.apache.spark.rdd.NewHadoopRDD import org.apache.hadoop.hbase.{HBaseConfig

解決mapreduce無法將資料批量匯入hbase的問題

hadoop版本:hadoop-2.6.0-cdh5.5.2 hbase版本:hbase-1.0.0-cdh5.5.2 建立日誌表: import java.io.IOException; import org.apache.hadoop.conf.Configura

HBase利用bulk load批量匯入資料

OneCoder只是一個初學者,記錄的只是自己的一個過程。不足之處還望指導。 看網上說匯入大量資料,用bulk load的方式效率比較高。bulk load可以將固定格式的資料檔案轉換為HFile檔案匯入,當然也可以直接匯入HFile檔案。所以

Hbase批量匯入資料,支援多執行緒同時操作

/** * HBase操作工具類:快取模式多執行緒批量提交作業到hbase * * @Auther: ning.zhang * @Email: [email protected] * @CreateDate: 2018/7/30 */ public c

Spark Hive 匯入資料到 HBase

導讀:          如何使用MapReduce將textfile、rcfile格式的Hive表資料匯入到HBase,上一篇部落格中已經做了介紹 MapReduce Hive 匯入資料到 HBase,MR這種方式有兩個缺點,一是當資料量特別大的時候,執行較慢

使用Spark批量上傳圖片到HBase中並同時使用OpenCV提取SIFT特徵值

最近正在學習利用Spark做影象的分類和檢索實驗,首先需要上傳影象資料(儲存在本地檔案系統中)到HBase中,提取的影象特徵是SIFT,藉助OpenCV庫提取,剛開始是寫一個任務上傳圖片,然後再寫一個任務提取HBase中影象的特徵值,考慮到圖片的序列化和反序列化

Phoenix通過MapReduce對HBase批量匯入資料

在用該方法之前,請確保hadoop、hbase、phoenix已經啟動。 在phoenix的安裝目錄下使用以下程式碼,啟動MR HADOOP_CLASSPATH=/opt/hbase-0.98.

Hbase通過命令將資料批量匯入的方法

拋磚引玉: hbase建表: hbase(main):003:0> create 'people','0' 將提前準備好的資料上傳到hdfs: [hadoop@h71 ~]$ vi people.txt 1,jimmy,25,jiujinshan 2,tina,25

Spark訪問與HBase關聯的Hive表

刪除 sql 也會 影響 ron ble lec lang nbsp 知識點1:創建關聯Hbase的Hive表 知識點2:Spark訪問Hive 知識點3:Spark訪問與Hbase關聯的Hive表 知識點1:創建關聯Hbase的Hive表 兩種方式創建,內部表和外部表

大牛博客!Spark / Hadoop / Kafka / HBase / Storm

logs 梳理 ubuntu http log 9.png kafka 分布 努力     在這裏,非常感謝下面的著名大牛們,一路的幫助和學習,給予了我很大的動力!   有了Hadoop,再次有了Spark,一次又一次,一晚又一晚的努力相伴! HBa

批量匯入資料(Mysql)報MySQL server has gone away 問題的解決方法

問題分析 首先度娘:mysql出現ERROR : (2006, 'MySQL server has gone away') 的問題意思就是指client和MySQL server之間的連結斷開了。    造成這樣的原因一般是sql操作的時間過長,或者是傳送的資料太大(例如使用inser

C++批量匯入檔案

如果這樣做: for (int i = 1; i<= 100; i++) { img = imread(“D:\低解析度截圖重新命名\%d.jpg”, i); imgs.push_back(img); } 看似不報錯,但是如果imshow一下,就會發現根本沒有讀圖。 對於檔案流,和普通

jdbc百萬資料批量匯入,流式讀取

Connection connReader = null; Connection connWrite = null; String url = "jdbc:mysql://XX:3306/XX?" + "user=XX&password=XX&

CAS統一登入認證(13): ldap 批量匯入使用者

cas登入後臺驗證賬號來自ldap,需要批量匯入賬號到ldap,這個問題幾經研究,最後方法如下:  先編寫程式生成ldif格式檔案,然後再在ldapadmin.exe程式或 phpldapadmin網頁中匯入這個ldif檔案 1. 生成的ldif檔案格式: 【空行】 dn: u

使用kettle來根據時間戳或者批次號來批量匯入資料,達到增量的效果。

1、Kettle是一款國外開源的ETL工具,純java編寫,可以在Window、Linux、Unix上執行,資料抽取高效穩定。下載圖形化介面的zip包格式的,直接解壓縮使用即可。安裝部署模式這裡不說了,自己可以根據自己的需求安裝為單機模式或者叢集模式。    Kettle的社群官

C# 批量匯入 SqlBulkCopy

  快速批量匯入方法 SqlBulkCopy 1 public string InsertSqlBulkCopy(DataTable dt, string tableName, ref string errinfo) 2 {

spark examples 匯入idea並測試

記錄下自己使用idea匯入spark examples專案的過程。 spark examples 專案可以給我們提供很多有益的參考,經常看看這些程式碼有助於提高我們寫scala程式碼的水平。 只匯入spark-example專案,其他不管,這個專案使用maven管理依賴,匯入的時候選擇maven. 我並

【轉載】QP02檢驗計劃批量匯入

其他參考: https://archive.sap.com/discussions/thread/3430191 *&---------------------------------------------------------------------* *& PROGRAM

使用solr批量匯入mysql資料庫,以及Unable to read: dataimport.properties等坑

折騰了一下午終於成功了!先放一張成功圖: 成功把mysql的資料新增進去了,我這裡是整合了tomcat9,整合步驟挺麻煩的,百度一大堆! 這裡主要介紹批量匯入資料,這裡有些坑,所以記錄一下: 步驟: 第一: 引入需要的jar包 我這裡把home目錄放外面了,所以核心配置檔案引入jar包需要改一下

Solr批量匯入資料

1、從solr 4.10.3 的dist 下 所需Jar包複製一份到 solr_home的索引庫(db1 是我自己新建的索引庫)下lib下 如果沒有就新建 2、在索引庫(db1)的核心配置檔案(solrconfig.xml)中 新增請求處理器。 <requestHand