大資料Hadoop之——資料同步工具Sqoop
一、概述
Apache Sqoop(SQL-to-Hadoop)專案旨在協助RDBMS(Relational Database Management System:關係型資料庫管理系統)與Hadoop之間進行高效的大資料交流。使用者可以在 Sqoop 的幫助下,輕鬆地把關係型資料庫的資料匯入到 Hadoop 與其相關的系統 (如HBase和Hive)中;同時也可以把資料從 Hadoop 系統裡抽取並匯出到關係型資料庫裡。
Sqoop是一個在結構化資料和Hadoop之間進行批量資料遷移的工具,結構化資料可以是MySQL、Oracle等RDBMS。Sqoop底層用MapReduce程式實現抽取、轉換、載入,MapReduce天生的特性保證了並行化和高容錯率,而且相比Kettle等傳統ETL工具,任務跑在Hadoop叢集上,減少了ETL伺服器資源的使用情況。在特定場景下,抽取過程會有很大的效能提升。
官網:https://sqoop.apache.org/
官方文件:https://sqoop.apache.org/docs/1.99.7/index.html
GitHub:https://github.com/apache/sqoop
二、架構
sqoop的底層實現是mapreduce,所以sqoop依賴於hadoop,sqoop將匯入或匯出命令翻譯成MapReduce程式來實現,在翻譯出的MapReduce 中主要是對InputFormat和OutputFormat進行定製。
1)資料匯入(RDBMS->Haoop)
- sqoop會通過jdbc來獲取需要的資料庫的元資料資訊,例如:匯入的表的列名,資料型別。
- 這些資料庫的資料型別會被對映成為java的資料型別,根據這些資訊,sqoop會生成一個與表名相同的類用來完成序列化工作,儲存表中的每一行記錄。
- sqoop開啟MapReduce作業
- 啟動的作業在input的過程中,會通過jdbc讀取資料表中的內容,這時,會使用sqoop生成的類進行序列化。
- 最後將這些記錄寫到hdfs上,在寫入hdfs的過程中,同樣會使用sqoop生成的類進行反序列化。
2)資料匯出(Haoop->RDBMS)
- 首先sqoop通過jdbc訪問關係型資料庫獲取需要匯出的資訊的元資料資訊
- 根據獲取的元資料資訊,sqoop生成一個Java類,用來承載資料的傳輸,該類必須實現序列化
- 啟動MapReduce程式
- sqoop利用生成的這個類,並行從hdfs中獲取資料
- 每個map作業都會根據讀取到的匯出表的元資料資訊和讀取到的資料,生成一批insert 語句,然後多個map作業會並行的向MySQL中插入資料。
三、安裝
因為Sqoop依賴於hadoop服務,可以參考我之前的文章:大資料Hadoop原理介紹+安裝+實戰操作(HDFS+YARN+MapReduce)
Sqoop 作為一個二進位制包釋出,包含兩個獨立的部分——客戶端和服務端。
- 服務端——您需要在叢集中的單個節點上安裝服務端。該節點將作為所有 Sqoop 客戶端的入口點。
- 客戶端——客戶端可以安裝在任意數量的機器上。
將 Sqoop 安裝包複製到要執行 Sqoop 服務端的機器上。Sqoop 伺服器充當 Hadoop 客戶端,因此 Hadoop 庫(Yarn、Mapreduce 和 HDFS jar 檔案)和配置檔案(core-site.xml、mapreduce-site.xml,...)必須在此節點上可用。
1)下載
下載地址:http://archive.apache.org/dist/sqoop/
$ cd /opt/bigdata/hadoop/software/
$ wget http://archive.apache.org/dist/sqoop/1.99.7/sqoop-1.99.7-bin-hadoop200.tar.gz
$ tar -xf sqoop-1.99.7-bin-hadoop200.tar.gz -C /opt/bigdata/hadoop/server/
2)配置環境變數
# 建立第三方 jar包存放路徑
$ mkdir $SQOOP_HOME/lib
# 配置環境變數/etc/profile
export SQOOP_HOME=/opt/bigdata/hadoop/server/sqoop-1.99.7-bin-hadoop200
export PATH=$SQOOP_HOME/bin:$PATH
export SQOOP_SERVER_EXTRA_LIB=$SQOOP_HOME/lib
# 如果已經配置好了$HADOOP_HOME,就可以不用配置下面的環境變量了,sqoop會自動去找
# sqoop hadoop環境配置
export HADOOP_COMMON_HOME=$HADOOP_HOME/share/hadoop/common
export HADOOP_HDFS_HOME=$HADOOP_HOME/share/hadoop/hdfs
export HADOOP_MAPRED_HOME=$HADOOP_HOME/share/hadoop/mapreduce
export HADOOP_YARN_HOME=$HADOOP_HOME/share/hadoop/yarn
$ source /etc/profile
3)配置sqoop代理使用者
先配置hadoop sqoop的代理使用者
$ vi $HADOOP_HOME/etc/hadoop/core-site.xml
<property>
<name>hadoop.proxyuser.sqoop2.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.sqoop2.groups</name>
<value>*</value>
</property>
# 重新載入配置
$ hdfs dfsadmin -refreshSuperUserGroupsConfiguration
如果您在所謂的系統使用者下執行 Sqoop 2 伺服器(使用者 ID 小於min.user.id - 預設情況下為 1000),則預設情況下 YARN 將拒絕執行 Sqoop 2 作業。您需要將執行 Sqoop 2 伺服器的使用者名稱(很可能是使用者sqoop2)新增到container-executor.cfg的allowed.system.users屬性中。
當伺服器在sqoop2使用者下執行時,需要存在於container-executor.cfg
檔案中的示例片段:
# 建立sqoop2使用者
$ useradd sqoop2
# 新增配置
$ vi $HADOOP_HOME/etc/hadoop/container-executor.cfg
allowed.system.users=sqoop2
4)安裝JDBC
mysql驅動下載地址:https://repo1.maven.org/maven2/mysql/mysql-connector-java/,根據自己的mysql版本下載對應的驅動
$ cd $SQOOP_HOME/lib
$ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.26/mysql-connector-java-8.0.26.jar
$ wget https://jdbc.postgresql.org/download/postgresql-42.3.4.jar
5)修改conf/sqoop.properties
org.apache.sqoop.submission.engine.mapreduce.configuration.directory=/opt/bigdata/hadoop/server/hadoop-3.3.1/etc/hadoop/
6)儲存庫初始化
首次啟動 Sqoop 2 伺服器之前,需要初始化元資料儲存庫。使用升級工具初始化儲存庫:
$ sqoop2-tool upgrade
【問題】derby
的jar包版本過低報錯
Caused by: java.lang.SecurityException: sealing violation: package org.apache.derby.impl.jdbc.authentication is sealed
【解決】
# 刪掉sqoop2自帶的derby jar包
$ rm -f $SQOOP_HOME/server/lib/derby-*.jar
# 把hive的lib的jar copy到sqoop2 server lib目錄下
$ cp $HIVE_HOME/lib/derby-*.jar /$SQOOP_HOME/server/lib/
再初始化並驗證
$ sqoop2-tool upgrade
#驗證
$ sqoop2-tool verify
在當前目錄下會生產db目錄和log目錄
7)啟動sqoop服務端
$ sqoop2-server start
# 執行sqoop2-server stop會停止sqoop
# 檢視程序
$ jps
# 檢視埠,預設是12000,可以修改conf/sqoop.properties的org.apache.sqoop.jetty.port欄位來修改埠
$ netstat -tnlp|grep 12000
8)啟動sqoop客戶端
由於我現在是偽分散式,所以sqoop server和sqoop client都在一臺機器上,直接執行sqoop2-shell命令即可啟動sqoop客戶端
$ sqoop2-shell
發現啟動客戶端報錯了
【原因&解決】是因為jdk版本不匹配,重新下載jdk安裝
jdk下載地址:https://www.oracle.com/java/technologies/downloads/
$ cd /opt/bigdata/hadoop/software/
$ wget https://download.oracle.com/otn-pub/java/jdk/8u331-b09/165374ff4ea84ef0bbd821706e29b123/jdk-8u331-linux-x64.tar.gz
$ tar -xf jdk-8u331-linux-x64.tar.gz -C /opt/bigdata/hadoop/server/
# 在/etc/profile配置環境變數
export JAVA_HOME=/opt/bigdata/hadoop/server/jdk1.8.0_331
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
# 重新載入
$ source /etc/profile
重新啟動sqoop客戶端
$ sqoop2-shell
# 檢視版本
show version --all
# 檢視幫助
help
9)設定客戶端的各種屬性
Set 命令允許設定客戶端的各種屬性。與輔助命令類似,set 不需要連線到 Sqoop 伺服器。設定命令不用於重新配置 Sqoop 伺服器。
# 將sqoop包copy到其它機器,當作客戶端
$ scp -r /opt/bigdata/hadoop/server/sqoop-1.99.7-bin-hadoop200 hadoop-node2:/opt/bigdata/hadoop/server/
# 配置環境變數/etc/profile
export SQOOP_HOME=/opt/bigdata/hadoop/server/sqoop-1.99.7-bin-hadoop200
export PATH=$SQOOP_HOME/bin:$PATH
$ source /etc/profile
$ sqoop2-shell
# 設定埠,host,預設埠12000
set server --host hadoop-node1 --port 12000 --webapp sqoop
# 或者如下:
set server --url http://hadoop-node1:12000/sqoop
# 檢視設定
show server --all
【溫馨提示】注意:當給出--url選項時,--host、--port或--webapp選項將被忽略。
引數 | 預設值 | 描述 |
---|---|---|
-h, --host | localhost | 執行 Sqoop 伺服器的伺服器名稱 (FQDN) |
-p, --port | 12000 | 埠 |
-w, --webapp | sqoop | Jetty 的 Web 應用程式名稱 |
-u, --url | url 格式的 Sqoop 伺服器 |
四、簡單使用
1)常用命令
$ sqoop2-shell
# 檢視幫助
help
# 配置服務
set server --url http://hadoop-node1:12000/sqoop
show server --all
# 顯示持久的作業提交物件
show submission
show submission --j jobName
show submission --job jobName --detail
# 顯示所有連結
show link
# 顯示聯結器
show connector
2)資料從MYSQL匯入到HDFS(Import)
1、 建立JDBC連線
$ sqoop2-shell
sqoop:000> set server --url http://hadoop-node1:12000/sqoop
# 先檢視connector
sqoop:000> show connector
# 建立mysql連線
sqoop:000> create link -connector generic-jdbc-connector
Creating link for connector with name generic-jdbc-connector
Please fill following values to create new link object
Name: mysql-jdbc-link
Database connection
Driver class: com.mysql.cj.jdbc.Driver
Connection String: jdbc:mysql://hadoop-node1:3306/azkaban?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true
Username: root
Password: ******
Fetch Size:
Connection Properties:
There are currently 0 values in the map:
entry#
SQL Dialect
Identifier enclose:
New link was successfully created with validation status OK and name mysql-jdbc-link
sqoop:000>
sqoop:000> show link
# 刪除
sqoop:000> delete link --name mysql-jdbc-link
2、建立HDFS連線
sqoop:000> create link -connector hdfs-connector
Creating link for connector with name hdfs-connector
Please fill following values to create new link object
Name: hdfs-link
HDFS cluster
URI: hdfs://hadoop-node1:8082
Conf directory: /opt/bigdata/hadoop/server/hadoop-3.3.1/etc/hadoop/
Additional configs::
There are currently 0 values in the map:
entry#
New link was successfully created with validation status OK and name hdfs-link
sqoop:000> show link
3、建立Job任務
首先先建立HDFS儲存目錄
$ hadoop fs -mkdir -p /user/sqoop2/output/
$ hadoop fs -chown -R sqoop2:sqoop2 /user/sqoop2/output/
再執行資料轉換
$ sqoop2-shell
sqoop:000> set server --url http://hadoop-node1:12000/sqoop
sqoop:000> show link
sqoop:000> create job -f "mysql-jdbc-link" -t "hdfs-link"
Name: mysql2hdfs
sqoop:000> show job
4、執行Job
sqoop:000> show job
sqoop:000> start job -n mysql2hdfs
在yarn平臺上檢視
檢視執行狀態是失敗的,檢視有錯誤日誌,主要日誌如下:
【問題一】
java.lang.NoClassDefFoundError: org/apache/commons/lang/StringUtils
【解決】
下載地址:https://commons.apache.org/proper/commons-lang/download_lang.cgi
$ cd $SQOOP_HOME/lib/
$ wget https://mirrors.tuna.tsinghua.edu.cn/apache//commons/lang/binaries/commons-lang-2.6-bin.tar.gz
$ tar -xf commons-lang-2.6-bin.tar.gz
# 將jar包放在mapreduce lib目錄,所有節點都得放,因為排程到哪臺機器是隨機的
$ cp commons-lang-2.6/commons-lang-2.6.jar $HADOOP_HOME/share/hadoop/mapreduce/
# 網上說放在sqoop lib目錄下,應該也是可以的,但是也是所有節點需要放
# $ cp commons-lang-2.6/commons-lang-2.6.jar .
$ rm -fr commons-lang-2.6-bin.tar.gz commons-lang-2.6
# 重啟sqoop server
$ cd $SQOOP_HOME
$ sqoop2-server stop;sqoop2-server start
# 設定執行使用者
$ export HADOOP_USER_NAME=sqoop2
【問題二】hdfs賬號不允許假扮root使用者
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.authorize.AuthorizationException): User: hdfs is not allowed to impersonate root
【解決】在core-sit-xml配置hdfs代理使用者
<property>
<name>hadoop.proxyuser.hdfs.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hdfs.groups</name>
<value>*</value>
</property>
重新載入配置生效
$ hdfs dfsadmin -refreshSuperUserGroupsConfiguration
再執行job
start job -n mysql2hdfs
# 檢視狀態
status job -n mysql2hdfs
# 停止job
#stop job -n mysql2hdfs
去Yarn上檢視執行情況
去HDFS上檢視輸出
3)從HDFS匯出到MYSQL(Export)
本來是想通過hive去轉換,但是現在沒有了hive的聯結器了,所以這裡選擇通過hive去建立HDFS資料檔案,通過HDFS轉出到mysql,當然也可以一步到位。
1、建立叢集測試表和資料
# 準備資料檔案
$ vi /tmp/sqoop-test-data
test01,北京
test02,上海
test03,廣州
test04,深圳
$ hive
sql語句如下:
-- hive建立測試庫
create database hive_sqoop_test_db;
-- hive建立一張表,預設是textfile型別的,通過逗號分隔欄位
create table hive_sqoop_test_db.test_table01(name string,address string) row format delimited fields terminated by ',';
# 從local載入資料,這裡的local是指hs2服務所在機器的本地linux檔案系統
load data local inpath '/tmp/sqoop-test-data' into table hive_sqoop_test_db.test_table01;
select * from hive_sqoop_test_db.test_table01;
-- 當然也可以通過下面方式建立,但是不提倡,因為很慢很慢
-- hive建立一張表,預設是textfile型別的
create table if not exists hive_sqoop_test_db.test_table01
(
name string,
address string
);
# -- 建立測試資料
insert into hive_sqoop_test_db.test_table01 values('test01','北京');
insert into hive_sqoop_test_db.test_table01 values('test02','上海');
insert into hive_sqoop_test_db.test_table01 values('test02','廣州');
insert into hive_sqoop_test_db.test_table01 values('test02','深圳');
# 查詢驗證
select * from hive_sqoop_test_db.test_table01;
對應HDFS的檔案:/user/hive/warehouse/hive_sqoop_test_db.db/test_table01/sqoop-test-data
2、建立MYSQL接收表
-- 建立測試庫
create database sqoop_test_db
-- 建立接收表
create table sqoop_test_db.test_table01
(
name varchar(10),
address varchar(10)
);
3、建立MYSQL連線
$ sqoop2-shell
sqoop:000> set server --url http://hadoop-node1:12000/sqoop
sqoop:000> show connector
sqoop:000> create link -connector generic-jdbc-connector
Name: hive2mysql-mysql-link
Driver class: com.mysql.cj.jdbc.Driver
Connection String: jdbc:mysql://hadoop-node1:3306/sqoop_test_db?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true
Username: root
Password: 123456
4、建立HDFS連線
sqoop:000> create link -connector hdfs-connector
Name: hdfs2mysql-hdfs-link
URI: hdfs://hadoop-node1:8082
Conf directory: /opt/bigdata/hadoop/server/hadoop-3.3.1/etc/hadoop/
5、建立Job
sqoop:000> show link
sqoop:000> create job -f "hdfs2mysql-hdfs-link" -t "hive2mysql-mysql-link"
Name: hdfs2mysql-job
Input directory: hdfs://hadoop-node1:8082/user/hive/warehouse/hive_sqoop_test_db.db/test_table01/
Choose: 0
Schema name: sqoop_test_db
Table name: test_table01
Extractors: 1
Loaders: 1
6、開始執行Job
sqoop:000> start job -n hdfs2mysql-job
sqoop:000> status job -n hdfs2mysql-job
YARN上檢視任務
去mysql上檢視資料
4)通過JAVA實現資料從MYSQL匯入到HDFS(Import)
1、先準備好資料來源
CREATE DATABASE sqoop_test_db;
-- 必須設定一個主鍵,要不然會報錯
create table if not exists sqoop_test_db.test_table01
(
id INT Unsigned Primary Key AUTO_INCREMENT,
name VARCHAR(20),
address VARCHAR(20)
);
# -- 建立測試資料
insert into sqoop_test_db.test_table01 values(1,'test01','北京');
insert into sqoop_test_db.test_table01 values(2,'test02','上海');
insert into sqoop_test_db.test_table01 values(3,'test03','廣州');
insert into sqoop_test_db.test_table01 values(4,'test04','深圳');
2、新增專案依賴
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-client</artifactId>
<version>1.99.7</version>
</dependency>
3、編寫java程式碼
這裡實現的是HDFS=》MYSQL的資料轉換
import org.apache.commons.io.filefilter.FalseFileFilter;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.model.*;
import org.apache.sqoop.validation.Status;
import javax.sound.midi.Soundbank;
import java.util.List;
import java.util.ResourceBundle;
/**
* Import
* mysql資料 匯出 HDFS
*
*/
public class Mysql2HDFS {
public static void main(String[] args) {
String url = "http://hadoop-node1:12000/sqoop/";
SqoopClient client = new SqoopClient(url);
String mysql_link_name = "java-mysql-link";
String hdfs_link_name = "java-hdfs-link";
// 獲取所有link
List<MLink> links = client.getLinks();
boolean mysql_link_isexist = Boolean.FALSE;
boolean hdfs_link_isexist = Boolean.FALSE;
for (MLink link : links) {
if (!mysql_link_isexist && link.getName().equals(mysql_link_name)){
mysql_link_isexist = Boolean.TRUE;
}
if (!hdfs_link_isexist && link.getName().equals(hdfs_link_name)){
hdfs_link_isexist = Boolean.TRUE;
}
if (mysql_link_isexist && hdfs_link_isexist){
break;
}
}
/**
* 1、建立mysql link
*/
MLink mysql_link = client.createLink("generic-jdbc-connector");
mysql_link.setName("java-mysql-link");
mysql_link.setCreationUser("root");
// 如果不存在就建立link
if (!mysql_link_isexist){
MLinkConfig mysql_linkConfig = mysql_link.getConnectorLinkConfig();
System.out.println(mysql_linkConfig);
// fill in the link config values
mysql_linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.cj.jdbc.Driver");
mysql_linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://hadoop-node1:3306/sqoop_test_db?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true");
mysql_linkConfig.getStringInput("linkConfig.username").setValue("root");
mysql_linkConfig.getStringInput("linkConfig.password").setValue("123456");
mysql_linkConfig.getStringInput("dialect.identifierEnclose").setValue(" ");
// 設定 primary key
// mysql_linkConfig.getStringInput("linkConfig.partitionColumn").setValue("id");
// save the link object that was filled
Status mysql_status = client.saveLink(mysql_link);
// 檢視屬性
// describe(client.getConnector("generic-jdbc-connector").getLinkConfig().getConfigs(), client.getConnectorConfigBundle("generic-jdbc-connector"));
if(mysql_status.canProceed()) {
System.out.println("Created Link with Link Name : " + mysql_link.getName());
} else {
System.out.println("Something went wrong creating the link");
}
}else {
System.out.println("Link Name : " + mysql_link.getName() + " is exist");
}
/**
* 2、建立hdfs link
*/
MLink hdfs_link = client.createLink("hdfs-connector");
hdfs_link.setName("java-hdfs-link");
hdfs_link.setCreationUser("root");
// 如果不存在就建立link
if (!hdfs_link_isexist){
// 建立hdfs link
MLinkConfig hdfs_linkConfig = hdfs_link.getConnectorLinkConfig();
hdfs_linkConfig.getStringInput("linkConfig.uri").setValue("hdfs://hadoop-node1:8082");
hdfs_linkConfig.getStringInput("linkConfig.confDir").setValue("/opt/bigdata/hadoop/server/hadoop-3.3.1/etc/hadoop/");
Status hdfs_status = client.saveLink(hdfs_link);
if(hdfs_status.canProceed()) {
System.out.println("Created Link with Link Name : " + hdfs_link.getName());
} else {
System.out.println("Something went wrong creating the link");
}
}else {
System.out.println("Link Name : " + hdfs_link.getName() + " is exist");
}
/**
* 3、建立job
*/
String job_name = "java-mysql2hdfs";
List<MJob> jobs = client.getJobs();
boolean job_isexist = Boolean.FALSE;
for (MJob job : jobs) {
if (job.getName().equals(job_name)){
job_isexist = Boolean.TRUE;
break;
}
}
MJob job = client.createJob(mysql_link_name, hdfs_link_name);
job.setName("java-mysql2hdfs");
job.setCreationUser("root");
if (!job_isexist){
// set the "FROM" link job config values
MFromConfig fromJobConfig = job.getFromJobConfig();
System.out.println(fromJobConfig);
fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop_test_db");
fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("test_table01");
// set the "TO" link job config values
MToConfig toJobConfig = job.getToJobConfig();
// 匯出目錄是需要不存在的
toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("hdfs://hadoop-node1:8082/tmp/output/");
toJobConfig.getEnumInput("toJobConfig.outputFormat").setValue("TEXT_FILE");
toJobConfig.getEnumInput("toJobConfig.compression").setValue("NONE");
toJobConfig.getBooleanInput("toJobConfig.overrideNullValue").setValue(true);
// set the driver config values
MDriverConfig driverConfig = job.getDriverConfig();
// System.out.println(driverConfig);
driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);
// driverConfig.getIntegerInput("throttlingConfig.numLoaders").setValue(0);
Status status = client.saveJob(job);
if(status.canProceed()) {
System.out.println("Created Job with Job Name: "+ job.getName());
} else {
System.out.println("Something went wrong creating the job");
System.exit(0);
}
} else {
System.out.println("Job Name : " + job.getName() + " is exist");
}
/**
* 4、啟動job
*/
MSubmission submission = client.startJob(job.getName());
System.out.println("Job Submission Status : " + submission.getStatus());
if(submission.getStatus().isRunning() && submission.getProgress() != -1) {
System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
}
System.out.println("Hadoop job id :" + submission.getExternalJobId());
System.out.println("Job link : " + submission.getExternalLink());
}
/**
* 輸出屬性資訊
* @param configs
* @param resource
*/
public static void describe(List<MConfig> configs, ResourceBundle resource) {
for (MConfig config : configs) {
System.out.println(resource.getString(config.getLabelKey())+":");
List<MInput<?>> inputs = config.getInputs();
for (MInput input : inputs) {
System.out.println(resource.getString(input.getLabelKey()) + " : " + input.getValue());
}
System.out.println();
}
}
}
檢視AYRN任務
檢視HDFS
這裡只用java程式碼實現了MYSQL-》HDFS的轉換,HDFS-》MYSQL的轉換就留給小夥伴試試,其實也很簡單,稍微把我上面的程式碼改一下就ok了,也可以對照上面第三個示例。有疑問的小夥伴歡迎給我留言,後續會有更多大資料相關的文章,請小夥伴耐心等待~