1. 程式人生 > 實用技巧 >Sqoop元件詳解

Sqoop元件詳解

1、概述

Sqoop是一款開源的工具,主要用於在Hadoop(Hive)與傳統的資料庫(mysql、postgresql、oracle...)間進行資料的傳遞。其原理是匯入和匯出動作翻譯成mapreduce程式來實現,在翻譯出的mapreduce中主要是對inputformat和outputformat進行定製,從而提供併發特性和容錯率。

2、sqoop工作流程圖

下圖分別是import和export的工作流程圖。可以看到,通過將sqoop命令轉換為一個個的Map任務,並行的對資料進行處理和轉移,從而達到資料傳輸的目的。

3、匯入資料

為方便演示,特建立三張表。一張hive未分割槽表cfg_siteinfo_tdlte1,一張hive分割槽表cfg_siteinfo_tdlte2,一張mysql表cfg_siteinfo_tdlte3(包含自增欄位和時間欄位)。

在Sqoop中,“匯入”概念指:從非大資料叢集(RDBMS)向大資料叢集(HDFS,HIVE,HBASE)中傳輸資料,即使用import關鍵字。

create table hub_test.cfg_siteinfo_tdlte1 (
  area string,
  enbid int,
  eci int) 
row format delimited fields terminated by ',';


create table hub_test.cfg_siteinfo_tdlte2 (
  area string,
  enbid int,
  eci int) 
partitioned by(city string)
row format delimited fields terminated by ',';


create table hub_test.cfg_siteinfo_tdlte_3 (
  area varchar(20),
  enbid int,
  eci int,
  PRIMARY KEY (eci),
  last_mod timestamp default current_timestamp on update current_timestamp)

3.1 Mysql匯入到HDFS路徑

sqoop import -D mapred.job.queue.name=toolbox \
-Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://134.58.98.26:3306/fast \
--username C##hub \
--password 123456 \
--query "select area,enbid,eci from cfg_siteinfo_tdlte where city='HEFEI' and \$CONDITIONS" \
--target-dir /apps/hive/warehouse/hub_test.db/cfg_siteinfo_tdlte1 \
--delete-target-dir \
--fields-terminated-by ','  \
--null-string '' \
--null-non-string '' \
--split-by eci \
--m 4

注意:query引數的引號問題,要麼外層使用單引號,內層使用雙引號,\$CONDITIONS的\$符號不用轉義。若是外層使用雙引號,那麼內層使用單引號,然後\$CONDITIONS的\$符號需要轉義: "\${query} AND \\$CONDITIONS"。

3.2 Mysql匯入到Hive表

hive普通表匯入:

sqoop import   \
-Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://134.58.98.26:3306/fast \
--username C##hub  \
--password 123456  \
--query "select area,enbid,eci from cfg_siteinfo_tdlte where city='HEFEI' and \$CONDITIONS" \
--target-dir /tmp/hive/warehouse/hub_test.db/cfg_siteinfo_tdlte1 \
--delete-target-dir \
--hive-import \
--hive-overwrite \
--hive-database hub_test \
--hive-table cfg_siteinfo_tdlte1 \
--fields-terminated-by ','  \
--lines-terminated-by '\n'  \
--null-string '' \
--null-non-string '' \
--split-by eci \
--m 4

提示:該過程分為兩步,第一步將資料匯入到HDFS臨時目錄,第二步將臨時目錄的資料載入到Hive表的資料倉庫位置。

hive分割槽表匯入:

sqoop import   \
-Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://134.58.98.26:3306/fast \
--username C##hub  \
--password 123456  \
--query "select area,enbid,eci from cfg_siteinfo_tdlte where city='HEFEI' and \$CONDITIONS" \
--target-dir /tmp/hive/warehouse/hub_test.db/cfg_siteinfo_tdlte2 \
--delete-target-dir \
--hive-import \
--hive-overwrite \
--hive-database hub_test \
--hive-table cfg_siteinfo_tdlte2 \
--hive-partition-key city \
--hive-partition-value HEFEI \
--fields-terminated-by ','  \
--lines-terminated-by '\n'  \
--null-string '' \
--null-non-string '' \
--split-by eci \
--m 4

提示:分割槽表和未分割槽表匯入的區別,添加了hive-partition-key和hive-partition-value引數。

3.3 增量匯入

增量匯入是僅匯入新新增的表中的行的技術,sqoop使用–incremental引數實現增量匯入,有兩種模式:append和lastmodified。

(1)append模式

sqoop import \
--connect jdbc:mysql://134.58.98.26:3306/fast \
--username username \
--password 123456 \
--table cfg_siteinfo_tdlte1 \
--num-mappers 2 \
--split-by eci \
--hive-import \
--hive-database hub_test \
--hive-table cfg_siteinfo_tdlte1 \
--check-column eci \
--incremental append \
--last-value 0 \

注意:APPEND 模式支援匯入到HDFS和Hive。

(2)Lastmodified模式

lastmodified需要指定一個更新的時間列,會將大於指定時間列的值增量匯入到目標資料來源,被更新的資料行也會被匯入,匯入方式又分為append和merge-key倆種模式,需要注意的是,此種增量匯入方式不支援直接匯入到Hive

merge-key模式:

sqoop import \
--connect jdbc:mysql://134.58.98.26:3306/fast \
--username username \
--password 123456 \
--table cfg_siteinfo_tdlte3 \
--num-mappers 1 \
--target-dir /app/hive/warehouse/hub_test.db/cfg_siteinfo_tdlte3 \
--fields-terminated-by ',' \
--check-column last_mod \
--incremental lastmodified \
--last-value "2020-02-20 00:00:00" \
--merge-key eci

append模式:

sqoop import \
--connect jdbc:mysql://134.58.98.26:3306/fast \
--username username \
--password 123456 \
--table cfg_siteinfo_tdlte3 \
--num-mappers 1 \
--target-dir /app/hive/warehouse/hub_test.db/cfg_siteinfo_tdlte3 \
--fields-terminated-by ',' \
--check-column last_mod \
--incremental lastmodified \
--last-value "2020-02-20 00:00:00" \
--append

4、匯出資料

4.1 HDFS檔案匯出到Mysql

在Sqoop中,“匯出”概念指:從大資料叢集(HDFS,HIVE,HBASE)向非大資料叢集(RDBMS)中傳輸資料,即使用export關鍵字。

sqoop export  \
-D mapred.job.queue.name=nokia \
--connect jdbc:mysql://134.58.98.26:3306/fast \
--username C##hub 
--password 123456 \
--table cfg_siteinfo_tdlte1 \
--columns AREA,ENBID,ECI \
--export-dir /apps/hive/warehouse/lte_hub_wk1.db/cfg_siteinfo_tdlte1/ \
--input-fields-terminated-by ","
--num-mappers 1 \
--input-null-string '\\N' 
--input-null-non-string '\\N'

提示:Mysql中如果表不存在,不會自動建立。

4.2 增量更新匯出

-- update-key,更新標識,即根據某個欄位進行更新,例如id,可以指定多個更新標識的欄位,多個欄位之間用逗號分隔。

-- updatemod,預設是updateonly,僅僅更新已存在的資料記錄,不會插入新紀錄。同時存在allowinsert模式,更新已存在的資料記錄,同時插入新紀錄。實質上是一個insert & update的操作。

sqoop export  \
-D mapred.job.queue.name=nokia \
--connect jdbc:mysql://134.58.98.26:3306/fast \
--username C##hub 
--password 123456 \
--table cfg_siteinfo_tdlte1 \
--columns AREA,ENBID,ECI \
--export-dir /apps/hive/warehouse/lte_hub_wk1.db/cfg_siteinfo_tdlte3/ \
--input-fields-terminated-by ","
--num-mappers 1 \
--input-null-string '\\N' 
--input-null-non-string '\\N' 
--update-key eci \
--update-mode allowinsert

5、sqopp job

當然,引數很多看著較為繁瑣,可以將其封裝在sqoop job中,同時也方便定時任務指令碼的呼叫。

(1)建立作業

sqoop job --create my_job \
-- import --connect jdbc:mysql://134.58.98.26:3306/fast \
--username C##hub \
--password-file /user/hive/pwd/mysql.pwd \
--query "select area,enbid,eci from cfg_siteinfo_tdlte where city='HEFEI' and \$CONDITIONS" \
--target-dir /apps/hive/warehouse/hub_test.db/cfg_siteinfo_tdlte1 \
--delete-target-dir \
--fields-terminated-by ','  \
--null-string '' \
--null-non-string '' \
--split-by eci \
--m 4

(2)顯示作業列表

sqoop job --list

(3)顯示作業內容

sqoop job --show myjob

(4)執行作業內容

sqoop job --exec myjob

免密設定步驟:

sqoop在建立job時,使用–password-file引數,可以避免輸入mysql密碼,如果使用–password將出現警告,並且每次都要手動輸入密碼才能執行job,sqoop規定密碼檔案必須存放在HDFS上,並且許可權必須是400。

(1)在mysql.pwd中追加MySQL密碼

echo -n "123456" > mysql.pwd

(2)建立儲存密碼檔案的目錄

hadoop fs -mkdir -p /user/hive/pwd/

(3)把密碼檔案放入該目錄中

hadoop fs -put mysql.pwd /user/hive/pwd/

(4)修改檔案許可權

hadoop fs -chmod 400 /user/hive/pwd/mysql.pwd

(5)檢查sqoop的sqoop-site.xml是否存在如下配置:

<property>
    <name>sqoop.metastore.client.record.password</name>
    <value>true</value>
    <description>If true, allow saved passwords in the metastore.
    </description>
</property>
In[]: