spark批量匯入hbase
方法1
使用hbase put方式,這種方式效率不高
import org.apache.spark._ import org.apache.spark.rdd.NewHadoopRDD import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HColumnDescriptor import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.HTable; val conf = HBaseConfiguration.create() val tableName = "/t1" conf.set(TableInputFormat.INPUT_TABLE, tableName) val myTable = new HTable(conf, tableName); var p = new Put(); p = new Put(new String("row999").getBytes()); p.add("cf".getBytes(), "column_name".getBytes(), new String("value999").getBytes()); myTable.put(p); myTable.flushCommits();
方法2
先生成hfile檔案再將hfile檔案匯入hbase 效率較高
import org.apache.spark._ import org.apache.spark.rdd.NewHadoopRDD import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HColumnDescriptor import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.hbase.KeyValue import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles val conf = HBaseConfiguration.create() val tableName = "hao" val table = new HTable(conf, tableName) conf.set(TableOutputFormat.OUTPUT_TABLE, tableName) val job = Job.getInstance(conf) job.setMapOutputKeyClass (classOf[ImmutableBytesWritable]) job.setMapOutputValueClass (classOf[KeyValue]) HFileOutputFormat.configureIncrementalLoad (job, table) // Generate 10 sample data: val num = sc.parallelize(1 to 10) val rdd = num.map(x=>{ val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes() ) (new ImmutableBytesWritable(Bytes.toBytes(x)), kv) }) // Save Hfiles on HDFS rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf) //Bulk load Hfiles to Hbase val bulkLoader = new LoadIncrementalHFiles(conf) bulkLoader.doBulkLoad(new Path("/tmp/xxxx19"), table)
方法3
省去方法2的第三步
import org.apache.spark._ import org.apache.spark.rdd.NewHadoopRDD import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HColumnDescriptor import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.hbase.KeyValue import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles val conf = HBaseConfiguration.create() val tableName = "hao" val table = new HTable(conf, tableName) conf.set(TableOutputFormat.OUTPUT_TABLE, tableName) val job = Job.getInstance(conf) job.setMapOutputKeyClass (classOf[ImmutableBytesWritable]) job.setMapOutputValueClass (classOf[KeyValue]) HFileOutputFormat.configureIncrementalLoad (job, table) // Generate 10 sample data: val num = sc.parallelize(1 to 10) val rdd = num.map(x=>{ val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes() ) (new ImmutableBytesWritable(Bytes.toBytes(x)), kv) }) // Directly bulk load to Hbase/MapRDB tables. rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
相關推薦
spark批量匯入hbase
方法1 使用hbase put方式,這種方式效率不高 import org.apache.spark._ import org.apache.spark.rdd.NewHadoopRDD import org.apache.hadoop.hbase.{HBaseConfig
解決mapreduce無法將資料批量匯入hbase的問題
hadoop版本:hadoop-2.6.0-cdh5.5.2 hbase版本:hbase-1.0.0-cdh5.5.2 建立日誌表: import java.io.IOException; import org.apache.hadoop.conf.Configura
HBase利用bulk load批量匯入資料
OneCoder只是一個初學者,記錄的只是自己的一個過程。不足之處還望指導。 看網上說匯入大量資料,用bulk load的方式效率比較高。bulk load可以將固定格式的資料檔案轉換為HFile檔案匯入,當然也可以直接匯入HFile檔案。所以
Hbase批量匯入資料,支援多執行緒同時操作
/** * HBase操作工具類:快取模式多執行緒批量提交作業到hbase * * @Auther: ning.zhang * @Email: [email protected] * @CreateDate: 2018/7/30 */ public c
Spark Hive 匯入資料到 HBase
導讀: 如何使用MapReduce將textfile、rcfile格式的Hive表資料匯入到HBase,上一篇部落格中已經做了介紹 MapReduce Hive 匯入資料到 HBase,MR這種方式有兩個缺點,一是當資料量特別大的時候,執行較慢
使用Spark批量上傳圖片到HBase中並同時使用OpenCV提取SIFT特徵值
最近正在學習利用Spark做影象的分類和檢索實驗,首先需要上傳影象資料(儲存在本地檔案系統中)到HBase中,提取的影象特徵是SIFT,藉助OpenCV庫提取,剛開始是寫一個任務上傳圖片,然後再寫一個任務提取HBase中影象的特徵值,考慮到圖片的序列化和反序列化
Phoenix通過MapReduce對HBase批量匯入資料
在用該方法之前,請確保hadoop、hbase、phoenix已經啟動。 在phoenix的安裝目錄下使用以下程式碼,啟動MR HADOOP_CLASSPATH=/opt/hbase-0.98.
Hbase通過命令將資料批量匯入的方法
拋磚引玉: hbase建表: hbase(main):003:0> create 'people','0' 將提前準備好的資料上傳到hdfs: [hadoop@h71 ~]$ vi people.txt 1,jimmy,25,jiujinshan 2,tina,25
Spark訪問與HBase關聯的Hive表
刪除 sql 也會 影響 ron ble lec lang nbsp 知識點1:創建關聯Hbase的Hive表 知識點2:Spark訪問Hive 知識點3:Spark訪問與Hbase關聯的Hive表 知識點1:創建關聯Hbase的Hive表 兩種方式創建,內部表和外部表
大牛博客!Spark / Hadoop / Kafka / HBase / Storm
logs 梳理 ubuntu http log 9.png kafka 分布 努力 在這裏,非常感謝下面的著名大牛們,一路的幫助和學習,給予了我很大的動力! 有了Hadoop,再次有了Spark,一次又一次,一晚又一晚的努力相伴! HBa
批量匯入資料(Mysql)報MySQL server has gone away 問題的解決方法
問題分析 首先度娘:mysql出現ERROR : (2006, 'MySQL server has gone away') 的問題意思就是指client和MySQL server之間的連結斷開了。 造成這樣的原因一般是sql操作的時間過長,或者是傳送的資料太大(例如使用inser
C++批量匯入檔案
如果這樣做: for (int i = 1; i<= 100; i++) { img = imread(“D:\低解析度截圖重新命名\%d.jpg”, i); imgs.push_back(img); } 看似不報錯,但是如果imshow一下,就會發現根本沒有讀圖。 對於檔案流,和普通
jdbc百萬資料批量匯入,流式讀取
Connection connReader = null; Connection connWrite = null; String url = "jdbc:mysql://XX:3306/XX?" + "user=XX&password=XX&
CAS統一登入認證(13): ldap 批量匯入使用者
cas登入後臺驗證賬號來自ldap,需要批量匯入賬號到ldap,這個問題幾經研究,最後方法如下: 先編寫程式生成ldif格式檔案,然後再在ldapadmin.exe程式或 phpldapadmin網頁中匯入這個ldif檔案 1. 生成的ldif檔案格式: 【空行】 dn: u
使用kettle來根據時間戳或者批次號來批量匯入資料,達到增量的效果。
1、Kettle是一款國外開源的ETL工具,純java編寫,可以在Window、Linux、Unix上執行,資料抽取高效穩定。下載圖形化介面的zip包格式的,直接解壓縮使用即可。安裝部署模式這裡不說了,自己可以根據自己的需求安裝為單機模式或者叢集模式。 Kettle的社群官
C# 批量匯入 SqlBulkCopy
快速批量匯入方法 SqlBulkCopy 1 public string InsertSqlBulkCopy(DataTable dt, string tableName, ref string errinfo) 2 {
spark examples 匯入idea並測試
記錄下自己使用idea匯入spark examples專案的過程。 spark examples 專案可以給我們提供很多有益的參考,經常看看這些程式碼有助於提高我們寫scala程式碼的水平。 只匯入spark-example專案,其他不管,這個專案使用maven管理依賴,匯入的時候選擇maven. 我並
【轉載】QP02檢驗計劃批量匯入
其他參考: https://archive.sap.com/discussions/thread/3430191 *&---------------------------------------------------------------------* *& PROGRAM
使用solr批量匯入mysql資料庫,以及Unable to read: dataimport.properties等坑
折騰了一下午終於成功了!先放一張成功圖: 成功把mysql的資料新增進去了,我這裡是整合了tomcat9,整合步驟挺麻煩的,百度一大堆! 這裡主要介紹批量匯入資料,這裡有些坑,所以記錄一下: 步驟: 第一: 引入需要的jar包 我這裡把home目錄放外面了,所以核心配置檔案引入jar包需要改一下
Solr批量匯入資料
1、從solr 4.10.3 的dist 下 所需Jar包複製一份到 solr_home的索引庫(db1 是我自己新建的索引庫)下lib下 如果沒有就新建 2、在索引庫(db1)的核心配置檔案(solrconfig.xml)中 新增請求處理器。 <requestHand