1. 程式人生 > 其它 >資料遷移工具 Sqoop

資料遷移工具 Sqoop

介紹:

  Sqoop是一款開源的工具,主要用於在Hadoop(Hive)與傳統的資料庫(mysql、 postgresql等)間進行資料的傳遞。可以將關係型資料庫(MySQL ,Oracle ,Postgres等)中的資料匯入到HDFS中,也可以將HDFS的資料導進到關係型資料庫 中。

匯入全部資料:

sqoop import \
--connect jdbc:mysql://linux:3306/sqoop \
--username hive \
--password 12345678 \
--table goodtbl \
--target-dir /root \
--delete-target-dir
\ --num-mappers 1 \ --fields-terminated-by "\t"

備註:

  • target-dir:將資料匯入 HDFS 的路徑;
  • delete-target-dir:如果目標資料夾在 HDFS 上已經存在,那麼再次執行就會報 錯。可以使用--delete-target-dir來先刪除目錄。也可以使用 append 引數,表 示追加資料;
  • num-mappers:啟動多少個Map Task;預設啟動4個Map Task;也可以寫成 - m 1
  • fields-terminated-by:HDFS檔案中資料的分隔符;

匯入查詢資料:

sqoop import \
--connect jdbc:mysql://linux123:3306/sqoop \ --username hive \ --password 12345678 \ --target-dir /root\ --append \ -m 1 \ --fields-terminated-by "\t" \ --query 'select gname, serialNumber, price, stock_number, create_time from goodtbl where price>88 and $CONDITIONS;'

備註:

  • 查詢語句的where子句中必須包含 '$CONDITIONS'
  • 如果query後使用的是雙引號,則$CONDITIONS前必須加轉義符,防止shell識 別為自己的變數,單引號就不需要轉義

匯入指定的列: 

sqoop import \
--connect jdbc:mysql://linux123:3306/sqoop \
--username hive \
--password 12345678 \
--target-dir /root\
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t" \
--columns gname,serialNumber,price \
--table goodtbl

備註:columns中如果涉及到多列,用逗號分隔,不能新增空格

匯入查詢資料(使用關鍵字):

sqoop import \
--connect jdbc:mysql://linux123:3306/sqoop \
--username hive \
--password 12345678 \
--target-dir /root\
--delete-target-dir \
-m 1 \
--fields-terminated-by "\t" \
--table goodtbl \
--where "price>=68"

啟動多個Map Task匯入資料:

sqoop import \
--connect jdbc:mysql://linux123:3306/sqoop \
--username hive \
--password 12345678 \
--target-dir /root \
--delete-target-dir \
--fields-terminated-by "\t" \
--table goodtbl \
--split-by gname

備註:

  • 沒有指定maptask數量,預設啟動4個
  • 如果 MySQL 中的表有主鍵,指定 Map Task 的個數就行
  • 如果 MySQL 中的表有主鍵,要使用 split-by 指定分割槽欄位
  • 如果分割槽欄位是字元型別,使用 sqoop 命令的時候要新增:- Dorg.apache.sqoop.splitter.allow_text_splitter=true。即:
sqoop import -
Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://liunx:3306/sqoop \
... ...
  • 查詢語句的where子句中的 '$CONDITIONS' ,也是為了做資料分割槽使用的,即 使只有1個Map Task

MySQL 到 Hive:

sqoop import \
--connect jdbc:mysql://linux123:3306/sqoop \
--username hive \
--password 12345678 \
--table goodtbl \
--hive-import \
--create-hive-table \
--fields-terminated-by "\t" \
--hive-overwrite \
--hive-table mydb.goodtbl \
-m 1

引數說明:

  • hive-import。必須引數,指定匯入hive
  • hive-database。Hive庫名(預設值default)
  • hive-table。Hive表名
  • fields-terminated-by。Hive欄位分隔符
  • hive-overwrite。覆蓋中已經存在的資料
  • create-hive-table。建立好 hive 表,但是表可能存在錯誤。不建議使用這個參 數,建議提前建好表

匯出資料:Hive/HDFS到RDBMS:

# 執行匯出
sqoop export \
--connect jdbc:mysql://linux123:3306/sqoop \
--username hive \
--password 12345678 \
--table goodtbl2 \
--num-mappers 1 \
--export-dir /user/hive/warehouse/mydb.db/goodtbl \
--input-fields-terminated-by "\t"

增量資料匯入:

  sqoop 實現了基於時間戳的CDC。

  • 時間戳:最好有兩個列,一個插入時間戳,表示何時建立,一個更新時間 戳,表示最後一次更新的時間;
  • 序列:大多數資料庫都提供自增功能,表中的列定義成自增的,很容易地根 據該列識別新插入的資料;

時間戳的CDC是最簡單且常用的,但是有如下缺點:

  • 不能記錄刪除記錄的操作
  • 無法識別多次更新
  • 不具有實時能力
sqoop import \
--connect jdbc:mysql://linux123:3306/sqoop \
--username hive --password 12345678 \
--table goodtbl \
--incremental append \
--hive-import \
--fields-terminated-by "\t" \
--hive-table mydb.goodtbl \
--check-column serialNumber \
--last-value 50 \
-m 1

引數說明:

  • check-column 用來指定一些列(即可以指定多個列),這些列在增量匯入時用 來檢查這些資料是否作為增量資料進行匯入,和關係型資料庫中的自增欄位及時 間戳類似。這些被指定的列的型別不能使任意字元型別,如char、varchar等類 型都不可以
  • last-value 指定上一次匯入中檢查列指定欄位最大值

執行 Sqoop job:

  執行資料增量匯入有兩種實現方式:

  • 1. 每次手工配置last-value,手工排程
  • 2. 使用job,給定初始last-value,定時任務每天定時排程

  很明顯方式2更簡便。

建立口令檔案:

echo -n "12345678" > sqoopPWD.pwd
hdfs dfs -mkdir -p /sqoop/pwd
hdfs dfs -put sqoopPWD.pwd /sqoop/pwd
hdfs dfs -chmod 400 /sqoop/pwd/sqoopPWD.pwd
# 可以在 sqoop 的 job 中增加:
--password-file /sqoop/pwd/sqoopPWD.pwd

建立 sqoop job:

# 建立 sqoop job
sqoop job --create myjob1 -- import \
--connect jdbc:mysql://linux123:3306/sqoop?useSSL=false \
--username hive \
--password-file /sqoop/pwd/sqoopPWD.pwd \
--table goodtbl \
--incremental append \
--hive-import \
--hive-table mydb.goodtbl \
--check-column serialNumber \
--last-value 0 \
-m 1
# 檢視已建立的job
sqoop job --list

# 檢視job詳細執行是引數
sqoop job --show myjob1

# 執行job
sqoop job --exec myjob1

# 刪除job
sqoop job --delete myjob1

實現原理:

  因為job執行完成後,會把當前check-column的最大值記錄到meta中,下次再調起 時把此值賦給last-value。 預設情況下元資料儲存在 ~/.sqoop/

其中,metastore.db.script 檔案記錄了對last-value的更新操作:

cat metastore.db.script |grep incremental.last.value