1. 程式人生 > >資料匯入HBase常用方法

資料匯入HBase常用方法

【編者按】要使用Hadoop,資料合併至關重要,HBase應用甚廣。一般而言,需要 針對不同情景模式將現有的各種型別的資料庫或資料檔案中的資料轉入至HBase 中。常見方式為:使用HBase的API中的Put方法; 使用HBase 的bulk load 工具;使用定製的MapReduce Job方式。《HBase Administration Cookbook》一書對這三種方式有著詳盡描述,由 ImportNew 的陳晨進行了編譯,很有收穫,推薦給大家。

HBase資料遷移(1)-使用HBase的API中的Put方法 

使用HBase的API中的Put是最直接的方法,用法也很容易學習。但針對大部分情況,它並非都是最高效的方式。當需要將海量資料在規定時間內載入HBase中時,效率問題體現得尤為明顯。待處理的資料量一般都是巨大的,這也許是為何我們選擇了HBase而不是其他資料庫的原因。在專案開始之前,你就該思考如何將所有能夠很好的將資料轉移進HBase,否則之後可能面臨嚴重的效能問題。

HBase有一個名為 bulk load的功能支援將海量資料高效地裝載入HBase中。Bulk load是通過一個MapReduce Job來實現的,通過Job直接生成一個HBase的內部HFile格式檔案來形成一個特殊的HBase資料表,然後直接將資料檔案載入到執行的叢集中。使用bulk load功能最簡單的方式就是使用importtsv 工具。importtsv 是從TSV檔案直接載入內容至HBase的一個內建工具。它通過執行一個MapReduce Job,將資料從TSV檔案中直接寫入HBase的表或者寫入一個HBase的自有格式資料檔案。

儘管importtsv 工具在需要將文字資料匯入HBase的時候十分有用,但是有一些情況,比如匯入其他格式的資料,你會希望使用程式設計來生成資料,而MapReduce是處理海量資料最有效的方式。這可能也是HBase中載入海量資料唯一最可行的方法了。當然我們可以使用MapReduce向HBase匯入資料,但海量的資料集會使得MapReduce Job也變得很繁重。若處理不當,則可能使得MapReduce的job執行時的吞吐量很小。

在HBase中資料合併是一項頻繁執行寫操作任務,除非我們能夠生成HBase的內部資料檔案,並且直接載入。這樣儘管HBase的寫入速度一直很快,但是若合併過程沒有合適的配置,也有可能造成寫操作時常被阻塞。寫操作很重的任務可能引起的另一個問題就是將資料寫入了相同的族群伺服器(region server),這種情況常出現在將海量資料匯入到一個新建的HBase中。一旦資料集中在相同的伺服器,整個叢集就變得不平衡,並且寫速度會顯著的降低。我們將會在本文中致力於解決這些問題。我們將從一個簡單的任務開始,使用API中的Put方法將MySQL中的資料匯入HBase。接著我們會描述如何使用 importtsv 和 bulk load將TSV資料檔案匯入HBase。我們也會有一個MapReduce樣例展示如何使用其他資料檔案格式來匯入資料。上述方式都包括將資料直接寫入HBase中,以及在HDFS中直接寫入HFile型別檔案。本文中最後一節解釋在向HBase匯入資料之前如何構建好叢集。本文程式碼均是以Java編寫,我們假設您具有基本Java知識,所以我們將略過如何編譯與打包文中的Java示例程式碼,但我們會在示例原始碼中進行註釋。

通過單個客戶端匯入MySQL資料

資料合併最常見的應用場景就是從已經存在的關係型資料庫將資料匯入到HBase中。對於此型別任務,最簡單直接的方式就是從一個單獨的客戶端獲取資料,然後通過HBase的API中Put方法將資料存入HBase中。這種方式適合處理資料不是太多的情況。

本節描述的是使用Put方法將MySQL資料匯入HBase中的方式。所有的操作均是在一個單獨的客戶端執行,並且不會使用到MapReduce。本節將會帶領你通過HBase Shell建立HBase表格,通過Java來連線叢集,並將資料匯入HBase。

準備

公共資料集合是個練習HBase資料合併的很好資料來源。網際網路上有很多公共資料集合。我們在本文中獎使用 “美國國家海洋和大氣管理局 1981-2010氣候平均值”的公共資料集合。訪問http://www1.ncdc.noaa.gov/pub/data/normals/1981-2010/下載。

這些氣候報表資料是由美國國家海洋和大氣管理局(NOAA)生成的。在本文中,我們使用在目錄 products | hourly 下的小時溫度資料(可以在上述連結頁面中找到)。下載hly-temp-normal.txt檔案。
需要一個MySQL例項,在MySQL資料庫中建立hly_temp_normal表格,使用如下的SQL命令:

  1. createtable hly_temp_normal (  
  2. id INTNOTNULL AUTO_INCREMENT PRIMARYKEY,  
  3. stnid CHAR(11),  
  4. month TINYINT,  
  5. day TINYINT,  
  6. value1 VARCHAR(5),  
  7. value2 VARCHAR(5),  
  8. value3 VARCHAR(5),  
  9. value4 VARCHAR(5),  
  10. value5 VARCHAR(5),  
  11. value6 VARCHAR(5),  
  12. value7 VARCHAR(5),  
  13. value8 VARCHAR(5),  
  14. value9 VARCHAR(5),  
  15. value10 VARCHAR(5),  
  16. value11 VARCHAR(5),  
  17. value12 VARCHAR(5),  
  18. value13 VARCHAR(5),  
  19. value14 VARCHAR(5),  
  20. value15 VARCHAR(5),  
  21. value16 VARCHAR(5),  
  22. value17 VARCHAR(5),  
  23. value18 VARCHAR(5),  
  24. value19 VARCHAR(5),  
  25. value20 VARCHAR(5),  
  26. value21 VARCHAR(5),  
  27. value22 VARCHAR(5),  
  28. value23 VARCHAR(5),  
  29. value24 VARCHAR(5)  
  30. );  

本文提供了一些指令碼將txt中的資料匯入到MySQL表中。你可以使用 insert_hly.py 來載入每小時的NOAA資料。只需要修改指令碼中的主機(host),使用者(user),密碼(password)以及資料名稱(database name)。完成修改後就能夠將下載的hly-temp-normal.txt資料匯入到mysql的hly_temp_normal 表中,使用命令如下: 
$ python insert_hly.py -f hly-temp-normal.txt -t hly_temp_normal

譯者注:此處給出python指令碼下載地址(https://github.com/uprush/hac-book/blob/master/2-data-migration/script/insert_hly.py)

譯者注:由於對於python的瞭解有限以及環境限制,所以單獨另寫了一段Java的程式碼,可以直接使用的:

  1. import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader;  
  2.     import java.io.Reader; import java.sql.Connection; import java.sql.DriverManager;  
  3.     import java.sql.PreparedStatement; import java.sql.SQLException; import
  4.     java.util.ArrayList; import java.util.List; publicclass InsertHly { static
  5.     String user="root"static String pwd="root123"static String driver="com.mysql.jdbc.Driver";  
  6.     static String url="jdbc:mysql://127.0.0.1:3306/htom?useUnicode=true&characterEncoding=UTF-8";  
  7.     publicstaticvoid main(String[] args) throws SQLException { Connection  
  8.     baseCon = null; String sqlStr="insert into hly_temp_normal (stnid,month,day,value1,value2,value3,value4,value5,value6,value7,value8,value9,value10,value11,value12,value13,value14,value15,value16,value17,value18,value19,value20,value21,value22,value23,value24)  
  9.     values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; List parasValues=new
  10.     ArrayList(); try { baseCon = DriverManager.getConnection(url, user, pwd);  
  11.     } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace();
  12.     } // 替換為檔案地址 String allRowsStr=readFileByChars("d:\\TestZone\\hly-temp-normal.txt",
  13.     "gbk"); String[] rows=allRowsStr.split("\n"); for(String row : rows){ parasValues.add(row.split("\\s+"));  
  14.     } PreparedStatement basePsm = nulltry { baseCon.setAutoCommit(false);  
  15.     basePsm = baseCon.prepareStatement(sqlStr); for (int i = 0; i < parasValues.size();  
  16.     i++) { Object[] parasValue = parasValues.get(i); for (int j = 0; j <  
  17.     parasValue.length; j++) { basePsm.setObject(j + 1, parasValue[j]); } basePsm.addBatch();  
  18.     } basePsm.executeBatch(); baseCon.commit(); } catch (SQLException e) {  
  19.     baseCon.rollback(); throw e; } finally { if (basePsm != null) { basePsm.close();  
  20.     basePsm = null; } if (baseCon != null) { baseCon.close(); } } } public
  21.     static String readFileByChars(String fileName, String enc) { StringBuffer  
  22.     content=new StringBuffer(); Reader reader = nulltry { // 一次讀多個字元 char[]
  23.     tempchars = newchar[30]; int charread = 0; reader = new InputStreamReader(new
  24.     FileInputStream(fileName),enc); // 讀入多個字元到字元陣列中,charread為一次讀取字元數 while
  25.     ((charread = reader.read(tempchars)) != -1) { // 同樣遮蔽掉\r不顯示 if ((charread
  26.     == tempchars.length) && (tempchars[tempchars.length - 1] != '\r'))  
  27.     { content.append(tempchars); } else { for (int i = 0; i < charread;  
  28.     i++) { if (tempchars[i] == '\r') { continue; } else { content.append(tempchars[i]);  
  29.     } } } } return content.toString(); } catch (Exception e1) { e1.printStackTrace();  
  30.     } finally { if (reader != null) { try { reader.close(); } catch (IOException  
  31.     e1) { } } } returnnull; } } )  

為使得下一節中的Java原始碼能夠編譯,你需要下列庫支援: 
hadoop-core-1.0.2.jar 
hbase-0.92.1.jar 
mysql-connector-java-5.1.18.jar

你可以將他們手動加入classpath中,或者使用本文中的可用的示例程式碼。

在匯入資料之前,確認HDFS, ZooKeeper,和HBase叢集均正常執行。在HBase的客戶端節點記錄日誌。

如何實施

通過單節點客戶端將資料從MySQL匯入HBase: 
1.從HBase的客戶端伺服器從過HBase的Shell命令列,連線到HBase的叢集。 
hadoop$ $HBASE_HOME/bin/hbase shell 
2.在HBase中建立 hly_temp 表 
hbase> create ‘hly_temp’, {NAME => ‘n’, VERSIONS => 1} 
3.寫一個Java程式將資料從MySQL中匯入HBase,並將其打包成jar。在Java中按照下列步驟匯入資料: 
i. 使用Java建立一個connectHBase() 方法來連線到指定的HBase表: 
$ vi Recipe1.java

  1. privatestatic HTable connectHBase(String tablename) \  
  2. throws IOException {  
  3. HTable table = null;  
  4. Configuration conf = HBaseConfiguration.create();  
  5. table = new HTable(conf, tablename);  
  6. return table;  
  7. }  

ii. 使用Java建立一個 connectDB() 方法來 MySQL : 
$ vi Recipe1.java

  1. privatestatic Connection connectDB() \  
  2. throws Exception {  
  3. String userName = "db_user";  
  4. String password = "db_password";  
  5. String url = "jdbc:mysql://db_host/database";  
  6. Class.forName("com.mysql.jdbc.Driver").newInstance();  
  7. Connection conn = DriverManager.getConnection(url,  
  8. userName, password);  
  9. return conn;  
  10. }  

此處是Java類中的main() 方法,在其中我們從MySQL獲取資料並存入HBase中: 
$ vi Recipe1.java

  1. publicclass Recipe1 {  
  2.  publicstaticvoid main(String[] args) {  
  3.    Connection dbConn = null;  
  4.    HTable htable = null;  
  5.    Statement stmt = null;  
  6.    String query = "select * from hly_temp_normal";  
  7.    try {  
  8.      dbConn = connectDB();  
  9.      htable = connectHBase("hly_temp");  
  10.      byte[] family = Bytes.toBytes("n");  
  11.      stmt = dbConn.createStatement();  
  12.      ResultSet rs = stmt.executeQuery(query);  
  13.      // time stamp for all inserted rows
  14.      // 所有插入資料的時間戳
  15.      long ts = System.currentTimeMillis();  
  16.      while (rs.next()) {  
  17.        String stationid = rs.getString("stnid");  
  18.        int month = rs.getInt("month");  
  19.        int day = rs.getInt("day");  
  20.        String rowkey = stationid + Common.lpad(String.   
  21.        valueOf(month), 2,    
  22.        '0') + Common.lpad(String.valueOf(day), 2'0');  
  23.        Put p = new Put(Bytes.toBytes(rowkey));  
  24. 相關推薦

    資料匯入HBase常用方法

    【編者按】要使用Hadoop,資料合併至關重要,HBase應用甚廣。一般而言,需要 針對不同情景模式將現有的各種型別的資料庫或資料檔案中的資料轉入至HBase 中。常見方式為:使用HBase的API中的Put方法; 使用HBase 的bulk load 工具;使用定製的MapReduce Job方式。《H

    JS有哪些資料型別和常用方法

    這裡是修真院前端小課堂,每篇分享文從 【背景介紹】【知識剖析】【常見問題】【解決方案】【編碼實戰】【擴充套件思考】【更多討論】【參考文獻】 八個方面深度解析前端知識/技能,本篇分享的是: 【JS有哪些資料型別和常用方法?    】 一.背景介紹  

    資料儲存---HBase常用介紹(中)

    我們這裡主要介紹HBase的API 基礎API 封裝工具類 基礎API 建立表 新增資料 查詢資料的三種方式 掃描查詢 get方式執行查詢 過濾查詢 PS:刪除表請通過shell命令進入客戶端刪除。 package com.hbase; imp

    Mysql 資料匯入 Hbase

    目錄 一、前言 一、前言 在大資料專案中需要做資料遷移時,我們第一時間總會想到sqoop。sqoop是apache 旗下一款“Hadoop 和關係資料庫伺服器之間傳送資料”的工具,

    將sqlserver的資料匯入hbase

    將sqlserver的資料匯入hbase中 1.解壓sqoop-sqlserver-1.0.tar.gz,並改名(可以不改)          tar  -zxvf  sqoop- sql

    flume將資料匯入hbase

    1 將hbase的lib目錄下jar拷貝到flume的lib目錄下;2 在hbase中建立儲存資料的表hbase(main):002:0> create 'test_idoall_org','uid','name'3 建立flume配置檔案 vi.confa1.sour

    通過sqoop將MySQL資料庫中的資料匯入Hbase

    從接觸到大資料到成功的實現一個功能期間走了不少彎路也踩了不少坑,這裡作為我的學習筆記也可以作為小白們的前車之鑑,少走彎路,有不正確之處,望指出 環境準備: hadoop、hbase、sqoop、mys

    用sqoop將oracle資料匯入Hbase 使用筆記

    網上已經有很多關於這方面的資料,但是我在使用過程中也遇見了不少問題 1. sqoop 的環境我沒有自己搭建  直接用的公司的 2. oracle 小白怕把公司環境弄壞了,自己用容器搭建了一個 docker pull docker.io/wnameless/oracle-xe

    kafka資料匯入hbase

    我們在使用kafka處理資料的過程中會使用kafka跟一下資料庫進行互動,Hbase就是其中的一種。下面給大家介紹一下kafka中的資料是如何匯入Hbase的。 本文的思路是通過consumers把資料消費到Hbase中。 首先在Hbase中建立表,建立表可以在H

    MapReduce將HDFS文字資料匯入HBase

    HBase本身提供了很多種資料匯入的方式,通常有兩種常用方式: 使用HBase提供的TableOutputFormat,原理是通過一個Mapreduce作業將資料匯入HBase 另一種方式就是使用HBase原生Client API 本文就是示範如何通過M

    Kettle 將Oracle資料匯入HBase的注意事項

          使用Kettle採集Oracle資料,匯入到HBase。 Kettle是一個比較好用的ETL工具,個人感覺Kettle比Sqoop還要好用,主要是因為Kettle通過視覺化,元件式拖拉配置

    Python sklearn資料分析中常用方法

    一、資料處理 隨機劃分訓練集和測試集: from sklearn.model_selection import train_test_split X_all = data_train.drop(['Survived', 'PassengerId'],

    hive over hbase方式將文字庫資料匯入hbase

    1,建立hbase表Corpus >> create 'Corpus','CF' 2,建立hive->hbase外表logic_Corpus,並對應hbase中的Corpus表 >> CREATE EXTERNAL TABLE logic_Co

    空間資料探勘常用方法

    問題1:空間資料探勘有哪些常用方法,舉例說明一種方法的原理及應用. 答:空間資料探勘的常用方法有:統計法,聚類方法,關聯規則發掘方法,Rough集方法,神經網路方法,雲理論,證據理論,模糊集理論,遺傳演算法等演算法(出自丁信宙,仇環,蘇曉慶. 基於雲理論的缺損資料推理和

    HBase Shell 操作命令&&使用Sqoop將資料匯入HBase

    一、HBase Shell 操作命令實驗 要求: HBase叢集正常啟動,且可以執行正常 進入客戶端 [[email protected] ~]$ cd /home/zkpk/hbase-0

    c# 程式實現ACCESS資料匯入SQL的方法

    //連線ACCESS資料庫程式碼OleDbConnection adoConn= new OleDbConnection();string StrConn= "Provider=Microsoft.Jet.OLEDB.4.0;Jet OLEDB:Database Password=密碼;Data source

    文字資料匯入HBASE

    在將有定界符文字檔案匯入HBASE庫中,需要將後面的定界符去掉,否則將匯入失敗。如下所示:[[email protected] bin]$ cat /tmp/emp.txt1,A,201304,2,B,201305,3,C,201306,4,D,201307,這個

    資料遷移常用方法

    SQL SERVER幾種資料遷移/匯出匯入的實踐 SQLServer提供了多種資料匯出匯入的工具和方法,在此,分享我實踐的經驗(只涉及資料庫與Excel、資料庫與文字檔案、資料庫與資料庫之間的匯出匯入)。 (一)資料庫與Excel 方法1: 使用資料庫客戶端(SSMS)的介面工具。右

    linux下匯入、匯出mysql資料庫命令的實現方法

    首先建空資料庫 mysql>create database abc; 匯入資料庫 mysql>use abc; 設定資料庫編碼 mysql>set names utf8; 匯入資料(注意sql檔案的路徑) mysql>source /home/abc/abc.sql;

    python資料型別之列表(list)和其常用方法

    列表是python常用資料型別之一,是可變的,可由n = []建立,也可由n = list()建立,第一種方法更常用。   常用方法總結:   # 建立方法 n = [] 或者 n = list() # index 查詢索引值 li = ['Edward', 'Mark'