1. 程式人生 > 其它 >大資料Hadoop之——資料同步工具Sqoop

大資料Hadoop之——資料同步工具Sqoop

目錄

一、概述

Apache Sqoop(SQL-to-Hadoop)專案旨在協助RDBMS(Relational Database Management System:關係型資料庫管理系統)與Hadoop之間進行高效的大資料交流。使用者可以在 Sqoop 的幫助下,輕鬆地把關係型資料庫的資料匯入到 Hadoop 與其相關的系統 (如HBase和Hive)中;同時也可以把資料從 Hadoop 系統裡抽取並匯出到關係型資料庫裡。

Sqoop是一個在結構化資料和Hadoop之間進行批量資料遷移的工具,結構化資料可以是MySQL、Oracle等RDBMS。Sqoop底層用MapReduce程式實現抽取、轉換、載入,MapReduce天生的特性保證了並行化和高容錯率,而且相比Kettle等傳統ETL工具,任務跑在Hadoop叢集上,減少了ETL伺服器資源的使用情況。在特定場景下,抽取過程會有很大的效能提升。

官網:https://sqoop.apache.org/
官方文件:https://sqoop.apache.org/docs/1.99.7/index.html
GitHub:https://github.com/apache/sqoop

二、架構

sqoop的底層實現是mapreduce,所以sqoop依賴於hadoop,sqoop將匯入或匯出命令翻譯成MapReduce程式來實現,在翻譯出的MapReduce 中主要是對InputFormat和OutputFormat進行定製。

1)資料匯入(RDBMS->Haoop)

  • sqoop會通過jdbc來獲取需要的資料庫的元資料資訊,例如:匯入的表的列名,資料型別。
  • 這些資料庫的資料型別會被對映成為java的資料型別,根據這些資訊,sqoop會生成一個與表名相同的類用來完成序列化工作,儲存表中的每一行記錄。
  • sqoop開啟MapReduce作業
  • 啟動的作業在input的過程中,會通過jdbc讀取資料表中的內容,這時,會使用sqoop生成的類進行序列化。
  • 最後將這些記錄寫到hdfs上,在寫入hdfs的過程中,同樣會使用sqoop生成的類進行反序列化。

2)資料匯出(Haoop->RDBMS)

  • 首先sqoop通過jdbc訪問關係型資料庫獲取需要匯出的資訊的元資料資訊
  • 根據獲取的元資料資訊,sqoop生成一個Java類,用來承載資料的傳輸,該類必須實現序列化
  • 啟動MapReduce程式
  • sqoop利用生成的這個類,並行從hdfs中獲取資料
  • 每個map作業都會根據讀取到的匯出表的元資料資訊和讀取到的資料,生成一批insert 語句,然後多個map作業會並行的向MySQL中插入資料。

三、安裝

因為Sqoop依賴於hadoop服務,可以參考我之前的文章:大資料Hadoop原理介紹+安裝+實戰操作(HDFS+YARN+MapReduce)

Sqoop 作為一個二進位制包釋出,包含兩個獨立的部分——客戶端和服務端。

  • 服務端——您需要在叢集中的單個節點上安裝服務端。該節點將作為所有 Sqoop 客戶端的入口點。
  • 客戶端——客戶端可以安裝在任意數量的機器上。

將 Sqoop 安裝包複製到要執行 Sqoop 服務端的機器上。Sqoop 伺服器充當 Hadoop 客戶端,因此 Hadoop 庫(Yarn、Mapreduce 和 HDFS jar 檔案)和配置檔案(core-site.xml、mapreduce-site.xml,...)必須在此節點上可用。

1)下載

下載地址:http://archive.apache.org/dist/sqoop/

$ cd /opt/bigdata/hadoop/software/
$ wget http://archive.apache.org/dist/sqoop/1.99.7/sqoop-1.99.7-bin-hadoop200.tar.gz
$ tar -xf sqoop-1.99.7-bin-hadoop200.tar.gz -C /opt/bigdata/hadoop/server/

2)配置環境變數

# 建立第三方 jar包存放路徑
$ mkdir $SQOOP_HOME/lib
# 配置環境變數/etc/profile
export SQOOP_HOME=/opt/bigdata/hadoop/server/sqoop-1.99.7-bin-hadoop200
export PATH=$SQOOP_HOME/bin:$PATH
export SQOOP_SERVER_EXTRA_LIB=$SQOOP_HOME/lib
# 如果已經配置好了$HADOOP_HOME,就可以不用配置下面的環境變量了,sqoop會自動去找
# sqoop hadoop環境配置
export HADOOP_COMMON_HOME=$HADOOP_HOME/share/hadoop/common
export HADOOP_HDFS_HOME=$HADOOP_HOME/share/hadoop/hdfs
export HADOOP_MAPRED_HOME=$HADOOP_HOME/share/hadoop/mapreduce
export HADOOP_YARN_HOME=$HADOOP_HOME/share/hadoop/yarn

$ source /etc/profile

3)配置sqoop代理使用者

先配置hadoop sqoop的代理使用者

$ vi $HADOOP_HOME/etc/hadoop/core-site.xml

<property>
  <name>hadoop.proxyuser.sqoop2.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.sqoop2.groups</name>
  <value>*</value>
</property>

# 重新載入配置
$ hdfs dfsadmin -refreshSuperUserGroupsConfiguration

如果您在所謂的系統使用者下執行 Sqoop 2 伺服器(使用者 ID 小於min.user.id - 預設情況下為 1000),則預設情況下 YARN 將拒絕執行 Sqoop 2 作業。您需要將執行 Sqoop 2 伺服器的使用者名稱(很可能是使用者sqoop2)新增到container-executor.cfg的allowed.system.users屬性中。

當伺服器在sqoop2使用者下執行時,需要存在於container-executor.cfg檔案中的示例片段:

# 建立sqoop2使用者
$ useradd sqoop2
# 新增配置
$ vi $HADOOP_HOME/etc/hadoop/container-executor.cfg
allowed.system.users=sqoop2

4)安裝JDBC

mysql驅動下載地址:https://repo1.maven.org/maven2/mysql/mysql-connector-java/,根據自己的mysql版本下載對應的驅動

$ cd $SQOOP_HOME/lib
$ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.26/mysql-connector-java-8.0.26.jar
$ wget https://jdbc.postgresql.org/download/postgresql-42.3.4.jar

5)修改conf/sqoop.properties

org.apache.sqoop.submission.engine.mapreduce.configuration.directory=/opt/bigdata/hadoop/server/hadoop-3.3.1/etc/hadoop/

6)儲存庫初始化

首次啟動 Sqoop 2 伺服器之前,需要初始化元資料儲存庫。使用升級工具初始化儲存庫:

$ sqoop2-tool upgrade

【問題】derby的jar包版本過低報錯

Caused by: java.lang.SecurityException: sealing violation: package org.apache.derby.impl.jdbc.authentication is sealed

【解決】

# 刪掉sqoop2自帶的derby jar包
$ rm -f $SQOOP_HOME/server/lib/derby-*.jar
# 把hive的lib的jar copy到sqoop2 server lib目錄下
$ cp $HIVE_HOME/lib/derby-*.jar /$SQOOP_HOME/server/lib/

再初始化並驗證

$ sqoop2-tool upgrade
#驗證
$ sqoop2-tool verify

在當前目錄下會生產db目錄和log目錄

7)啟動sqoop服務端

$ sqoop2-server start
# 執行sqoop2-server stop會停止sqoop
# 檢視程序
$ jps
# 檢視埠,預設是12000,可以修改conf/sqoop.properties的org.apache.sqoop.jetty.port欄位來修改埠
$ netstat -tnlp|grep 12000

8)啟動sqoop客戶端

由於我現在是偽分散式,所以sqoop server和sqoop client都在一臺機器上,直接執行sqoop2-shell命令即可啟動sqoop客戶端

$ sqoop2-shell

發現啟動客戶端報錯了

【原因&解決】是因為jdk版本不匹配,重新下載jdk安裝
jdk下載地址:https://www.oracle.com/java/technologies/downloads/

$ cd /opt/bigdata/hadoop/software/
$ wget https://download.oracle.com/otn-pub/java/jdk/8u331-b09/165374ff4ea84ef0bbd821706e29b123/jdk-8u331-linux-x64.tar.gz
$ tar -xf jdk-8u331-linux-x64.tar.gz -C /opt/bigdata/hadoop/server/
# 在/etc/profile配置環境變數
export JAVA_HOME=/opt/bigdata/hadoop/server/jdk1.8.0_331
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

# 重新載入
$ source /etc/profile

重新啟動sqoop客戶端

$ sqoop2-shell
# 檢視版本
show version --all
# 檢視幫助
help

9)設定客戶端的各種屬性

Set 命令允許設定客戶端的各種屬性。與輔助命令類似,set 不需要連線到 Sqoop 伺服器。設定命令不用於重新配置 Sqoop 伺服器。

# 將sqoop包copy到其它機器,當作客戶端
$ scp -r /opt/bigdata/hadoop/server/sqoop-1.99.7-bin-hadoop200 hadoop-node2:/opt/bigdata/hadoop/server/
# 配置環境變數/etc/profile
export SQOOP_HOME=/opt/bigdata/hadoop/server/sqoop-1.99.7-bin-hadoop200
export PATH=$SQOOP_HOME/bin:$PATH

$ source /etc/profile

$ sqoop2-shell
# 設定埠,host,預設埠12000
set server --host hadoop-node1 --port 12000 --webapp sqoop
# 或者如下:
set server --url http://hadoop-node1:12000/sqoop
# 檢視設定
show server --all

【溫馨提示】注意:當給出--url選項時,--host、--port或--webapp選項將被忽略。

引數 預設值 描述
-h, --host localhost 執行 Sqoop 伺服器的伺服器名稱 (FQDN)
-p, --port 12000
-w, --webapp sqoop Jetty 的 Web 應用程式名稱
-u, --url url 格式的 Sqoop 伺服器

四、簡單使用

1)常用命令

$ sqoop2-shell
# 檢視幫助
help
# 配置服務
set server --url http://hadoop-node1:12000/sqoop
show server --all
# 顯示持久的作業提交物件
show submission
show submission --j jobName
show submission --job jobName --detail
# 顯示所有連結
show link
# 顯示聯結器
show connector

2)資料從MYSQL匯入到HDFS(Import)

1、 建立JDBC連線

$ sqoop2-shell
sqoop:000> set server --url http://hadoop-node1:12000/sqoop
# 先檢視connector
sqoop:000> show connector
# 建立mysql連線
sqoop:000> create link -connector generic-jdbc-connector
Creating link for connector with name generic-jdbc-connector
Please fill following values to create new link object
Name: mysql-jdbc-link

Database connection

Driver class: com.mysql.cj.jdbc.Driver
Connection String: jdbc:mysql://hadoop-node1:3306/azkaban?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true
Username: root
Password: ******
Fetch Size:
Connection Properties:
There are currently 0 values in the map:
entry#

SQL Dialect

Identifier enclose:
New link was successfully created with validation status OK and name mysql-jdbc-link
sqoop:000>

sqoop:000> show link
# 刪除
sqoop:000> delete link --name mysql-jdbc-link

2、建立HDFS連線

sqoop:000> create link -connector hdfs-connector
Creating link for connector with name hdfs-connector
Please fill following values to create new link object
Name: hdfs-link

HDFS cluster

URI: hdfs://hadoop-node1:8082
Conf directory: /opt/bigdata/hadoop/server/hadoop-3.3.1/etc/hadoop/
Additional configs::
There are currently 0 values in the map:
entry#
New link was successfully created with validation status OK and name hdfs-link

sqoop:000> show link

3、建立Job任務

首先先建立HDFS儲存目錄

$ hadoop fs -mkdir -p /user/sqoop2/output/
$ hadoop fs -chown -R sqoop2:sqoop2 /user/sqoop2/output/

再執行資料轉換

$ sqoop2-shell
sqoop:000> set server --url http://hadoop-node1:12000/sqoop
sqoop:000> show link
sqoop:000> create job -f "mysql-jdbc-link" -t "hdfs-link"
Name: mysql2hdfs
sqoop:000> show job


4、執行Job

sqoop:000> show job
sqoop:000> start job -n mysql2hdfs

在yarn平臺上檢視

檢視執行狀態是失敗的,檢視有錯誤日誌,主要日誌如下:
【問題一】

java.lang.NoClassDefFoundError: org/apache/commons/lang/StringUtils

【解決】
下載地址:https://commons.apache.org/proper/commons-lang/download_lang.cgi

$ cd $SQOOP_HOME/lib/
$ wget https://mirrors.tuna.tsinghua.edu.cn/apache//commons/lang/binaries/commons-lang-2.6-bin.tar.gz
$ tar -xf commons-lang-2.6-bin.tar.gz
# 將jar包放在mapreduce lib目錄,所有節點都得放,因為排程到哪臺機器是隨機的
$ cp commons-lang-2.6/commons-lang-2.6.jar $HADOOP_HOME/share/hadoop/mapreduce/
# 網上說放在sqoop lib目錄下,應該也是可以的,但是也是所有節點需要放
# $ cp commons-lang-2.6/commons-lang-2.6.jar .
$ rm -fr commons-lang-2.6-bin.tar.gz commons-lang-2.6

# 重啟sqoop server
$ cd $SQOOP_HOME
$ sqoop2-server stop;sqoop2-server start
# 設定執行使用者
$ export HADOOP_USER_NAME=sqoop2

【問題二】hdfs賬號不允許假扮root使用者

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.authorize.AuthorizationException): User: hdfs is not allowed to impersonate root

【解決】在core-sit-xml配置hdfs代理使用者

<property>
  <name>hadoop.proxyuser.hdfs.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.hdfs.groups</name>
  <value>*</value>
</property>

重新載入配置生效

$ hdfs dfsadmin -refreshSuperUserGroupsConfiguration

再執行job

start job -n mysql2hdfs
# 檢視狀態
status job -n mysql2hdfs
# 停止job
#stop job -n mysql2hdfs

去Yarn上檢視執行情況

去HDFS上檢視輸出

3)從HDFS匯出到MYSQL(Export)

本來是想通過hive去轉換,但是現在沒有了hive的聯結器了,所以這裡選擇通過hive去建立HDFS資料檔案,通過HDFS轉出到mysql,當然也可以一步到位。

1、建立叢集測試表和資料

# 準備資料檔案
$ vi /tmp/sqoop-test-data
test01,北京
test02,上海
test03,廣州
test04,深圳
$ hive

sql語句如下:

-- hive建立測試庫
create database hive_sqoop_test_db;
-- hive建立一張表,預設是textfile型別的,通過逗號分隔欄位
create table hive_sqoop_test_db.test_table01(name string,address string) row format delimited fields terminated by ',';
# 從local載入資料,這裡的local是指hs2服務所在機器的本地linux檔案系統
load data local inpath '/tmp/sqoop-test-data' into table hive_sqoop_test_db.test_table01;
select * from hive_sqoop_test_db.test_table01;

-- 當然也可以通過下面方式建立,但是不提倡,因為很慢很慢
-- hive建立一張表,預設是textfile型別的
create table if not exists hive_sqoop_test_db.test_table01
(
name    string,
address string
);
# -- 建立測試資料
insert into hive_sqoop_test_db.test_table01 values('test01','北京');
insert into hive_sqoop_test_db.test_table01 values('test02','上海');
insert into hive_sqoop_test_db.test_table01 values('test02','廣州');
insert into hive_sqoop_test_db.test_table01 values('test02','深圳');
# 查詢驗證
select * from hive_sqoop_test_db.test_table01;

對應HDFS的檔案:/user/hive/warehouse/hive_sqoop_test_db.db/test_table01/sqoop-test-data

2、建立MYSQL接收表

-- 建立測試庫
create database sqoop_test_db
-- 建立接收表
create table sqoop_test_db.test_table01
(
name varchar(10),
address varchar(10)
);

3、建立MYSQL連線

$ sqoop2-shell
sqoop:000> set server --url http://hadoop-node1:12000/sqoop
sqoop:000> show connector
sqoop:000> create link -connector generic-jdbc-connector
Name: hive2mysql-mysql-link
Driver class: com.mysql.cj.jdbc.Driver
Connection String: jdbc:mysql://hadoop-node1:3306/sqoop_test_db?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true
Username: root
Password: 123456

4、建立HDFS連線

sqoop:000> create link -connector hdfs-connector
Name: hdfs2mysql-hdfs-link
URI: hdfs://hadoop-node1:8082
Conf directory: /opt/bigdata/hadoop/server/hadoop-3.3.1/etc/hadoop/

5、建立Job

sqoop:000> show link
sqoop:000> create job -f "hdfs2mysql-hdfs-link" -t "hive2mysql-mysql-link"
Name: hdfs2mysql-job
Input directory: hdfs://hadoop-node1:8082/user/hive/warehouse/hive_sqoop_test_db.db/test_table01/
Choose: 0
Schema name: sqoop_test_db
Table name: test_table01
Extractors: 1
Loaders: 1

6、開始執行Job

sqoop:000> start job -n hdfs2mysql-job
sqoop:000> status job -n hdfs2mysql-job

YARN上檢視任務

去mysql上檢視資料

4)通過JAVA實現資料從MYSQL匯入到HDFS(Import)

1、先準備好資料來源

CREATE DATABASE sqoop_test_db;
-- 必須設定一個主鍵,要不然會報錯
create table if not exists sqoop_test_db.test_table01
(
id INT Unsigned Primary Key AUTO_INCREMENT,
name    VARCHAR(20),
address VARCHAR(20)
);
# -- 建立測試資料
insert into sqoop_test_db.test_table01 values(1,'test01','北京');
insert into sqoop_test_db.test_table01 values(2,'test02','上海');
insert into sqoop_test_db.test_table01 values(3,'test03','廣州');
insert into sqoop_test_db.test_table01 values(4,'test04','深圳');

2、新增專案依賴

<dependency>
  <groupId>org.apache.sqoop</groupId>
  <artifactId>sqoop-client</artifactId>
  <version>1.99.7</version>
</dependency>

3、編寫java程式碼

這裡實現的是HDFS=》MYSQL的資料轉換

import org.apache.commons.io.filefilter.FalseFileFilter;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.model.*;
import org.apache.sqoop.validation.Status;

import javax.sound.midi.Soundbank;
import java.util.List;
import java.util.ResourceBundle;

/**
 * Import
 * mysql資料 匯出 HDFS
 *
 */

public class Mysql2HDFS {
    public static void main(String[] args) {
        String url = "http://hadoop-node1:12000/sqoop/";
        SqoopClient client = new SqoopClient(url);

        String mysql_link_name = "java-mysql-link";
        String hdfs_link_name = "java-hdfs-link";

        // 獲取所有link
        List<MLink> links = client.getLinks();
        boolean mysql_link_isexist = Boolean.FALSE;
        boolean hdfs_link_isexist = Boolean.FALSE;
        for (MLink link : links) {
            if (!mysql_link_isexist && link.getName().equals(mysql_link_name)){
                mysql_link_isexist = Boolean.TRUE;
            }
            if (!hdfs_link_isexist && link.getName().equals(hdfs_link_name)){
                hdfs_link_isexist = Boolean.TRUE;
            }
            if (mysql_link_isexist && hdfs_link_isexist){
                break;
            }
        }

        /**
         * 1、建立mysql link
         */
        MLink mysql_link = client.createLink("generic-jdbc-connector");
        mysql_link.setName("java-mysql-link");
        mysql_link.setCreationUser("root");
        // 如果不存在就建立link
        if (!mysql_link_isexist){
            MLinkConfig mysql_linkConfig = mysql_link.getConnectorLinkConfig();
            System.out.println(mysql_linkConfig);
            // fill in the link config values
            mysql_linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.cj.jdbc.Driver");
            mysql_linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://hadoop-node1:3306/sqoop_test_db?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true");
            mysql_linkConfig.getStringInput("linkConfig.username").setValue("root");
            mysql_linkConfig.getStringInput("linkConfig.password").setValue("123456");
            mysql_linkConfig.getStringInput("dialect.identifierEnclose").setValue(" ");

            // 設定 primary key
            // mysql_linkConfig.getStringInput("linkConfig.partitionColumn").setValue("id");
            // save the link object that was filled
            Status mysql_status = client.saveLink(mysql_link);

            // 檢視屬性
            // describe(client.getConnector("generic-jdbc-connector").getLinkConfig().getConfigs(), client.getConnectorConfigBundle("generic-jdbc-connector"));
            if(mysql_status.canProceed()) {
                System.out.println("Created Link with Link Name : " + mysql_link.getName());
            } else {
                System.out.println("Something went wrong creating the link");
            }
        }else {
            System.out.println("Link Name : " + mysql_link.getName() + " is exist");
        }

        /**
         * 2、建立hdfs link
         */
        MLink hdfs_link = client.createLink("hdfs-connector");
        hdfs_link.setName("java-hdfs-link");
        hdfs_link.setCreationUser("root");
        // 如果不存在就建立link
        if (!hdfs_link_isexist){
            // 建立hdfs link
            MLinkConfig hdfs_linkConfig = hdfs_link.getConnectorLinkConfig();
            hdfs_linkConfig.getStringInput("linkConfig.uri").setValue("hdfs://hadoop-node1:8082");
            hdfs_linkConfig.getStringInput("linkConfig.confDir").setValue("/opt/bigdata/hadoop/server/hadoop-3.3.1/etc/hadoop/");
            Status hdfs_status = client.saveLink(hdfs_link);
            if(hdfs_status.canProceed()) {
                System.out.println("Created Link with Link Name : " + hdfs_link.getName());
            } else {
                System.out.println("Something went wrong creating the link");
            }
        }else {
            System.out.println("Link Name : " + hdfs_link.getName() + " is exist");
        }

        /**
         * 3、建立job
         */
        String job_name = "java-mysql2hdfs";
        List<MJob> jobs = client.getJobs();

        boolean job_isexist = Boolean.FALSE;
        for (MJob job : jobs) {
            if (job.getName().equals(job_name)){
                job_isexist = Boolean.TRUE;
                break;
            }
        }
        MJob job = client.createJob(mysql_link_name, hdfs_link_name);
        job.setName("java-mysql2hdfs");

        job.setCreationUser("root");
        if (!job_isexist){
            // set the "FROM" link job config values
            MFromConfig fromJobConfig = job.getFromJobConfig();
            System.out.println(fromJobConfig);
            fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop_test_db");
            fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("test_table01");

            // set the "TO" link job config values
            MToConfig toJobConfig = job.getToJobConfig();
            // 匯出目錄是需要不存在的
            toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("hdfs://hadoop-node1:8082/tmp/output/");
            toJobConfig.getEnumInput("toJobConfig.outputFormat").setValue("TEXT_FILE");
            toJobConfig.getEnumInput("toJobConfig.compression").setValue("NONE");
            toJobConfig.getBooleanInput("toJobConfig.overrideNullValue").setValue(true);

            // set the driver config values
            MDriverConfig driverConfig = job.getDriverConfig();
            // System.out.println(driverConfig);
            driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);
            // driverConfig.getIntegerInput("throttlingConfig.numLoaders").setValue(0);


            Status status = client.saveJob(job);
            if(status.canProceed()) {
                System.out.println("Created Job with Job Name: "+ job.getName());
            } else {
                System.out.println("Something went wrong creating the job");
                System.exit(0);
            }
        } else {
            System.out.println("Job Name : " + job.getName() + " is exist");
        }

        /**
         * 4、啟動job
         */
        MSubmission submission = client.startJob(job.getName());
        System.out.println("Job Submission Status : " + submission.getStatus());
        if(submission.getStatus().isRunning() && submission.getProgress() != -1) {
            System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
        }
        System.out.println("Hadoop job id :" + submission.getExternalJobId());
        System.out.println("Job link : " + submission.getExternalLink());

    }

    /**
     * 輸出屬性資訊
     * @param configs
     * @param resource
     */
    public static void  describe(List<MConfig> configs, ResourceBundle resource) {
        for (MConfig config : configs) {
            System.out.println(resource.getString(config.getLabelKey())+":");
            List<MInput<?>> inputs = config.getInputs();
            for (MInput input : inputs) {
                System.out.println(resource.getString(input.getLabelKey()) + " : " + input.getValue());
            }
            System.out.println();
        }
    }

}

檢視AYRN任務

檢視HDFS

這裡只用java程式碼實現了MYSQL-》HDFS的轉換,HDFS-》MYSQL的轉換就留給小夥伴試試,其實也很簡單,稍微把我上面的程式碼改一下就ok了,也可以對照上面第三個示例。有疑問的小夥伴歡迎給我留言,後續會有更多大資料相關的文章,請小夥伴耐心等待~