Importing and Exporting Data with the Sqoop Java API
阿新 · Published 2018-12-21
After an afternoon of fiddling I finally got this working, so here is a summary.
Project dependency:
<dependency>
    <groupId>org.apache.sqoop</groupId>
    <artifactId>sqoop-client</artifactId>
    <version>1.99.7</version>
</dependency>
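Note that sqoop-client is only a REST client: it needs a running Sqoop 2 server to talk to. Before building links and jobs it can be worth checking that the server is reachable and that the two built-in connectors used below are actually registered. A minimal sketch, assuming the server listens at the default http://localhost:12000/sqoop/ (the same URL used in main further down):

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MConnector;

public class SqoopConnectivityCheck {
    public static void main(String[] args) {
        // Assumed server URL; adjust it to your Sqoop 2 server
        SqoopClient client = new SqoopClient("http://localhost:12000/sqoop/");
        // Print every registered connector; "generic-jdbc-connector" and
        // "hdfs-connector" must both show up for the code below to work
        for (MConnector connector : client.getConnectors()) {
            System.out.println(connector.getUniqueName());
        }
    }
}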
The pitfalls I ran into are all noted as comments in the code. Below is the Java code for driving Sqoop:
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.*;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

import java.util.Arrays;
import java.util.UUID;

public class SqoopDataModel {

    // Shared static client instance
    static SqoopClient client;

    // Create the JDBC link
    public static MLink createMysqlLink() {
        // Use the built-in generic JDBC connector
        MLink link = client.createLink("generic-jdbc-connector");
        // Generate a random name (you can also pick your own); it is referenced later when creating jobs
        link.setName("jdbc-link" + UUID.randomUUID().toString().substring(0, 4));
        link.setCreationUser("wangwang");
        // Get the link config object
        MLinkConfig linkConfig = link.getConnectorLinkConfig();
        // Set the JDBC connection string, driver class, username, and password
        linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://localhost:3306/db1");
        linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        linkConfig.getStringInput("linkConfig.username").setValue("root");
        linkConfig.getStringInput("linkConfig.password").setValue("wuwenwang00oo");
        // identifierEnclose must be set explicitly: it is the delimiter used to quote identifiers
        // in the generated SQL, and its default, a double quote, makes MySQL error out.
        // Here I configure it as a space.
        linkConfig.getStringInput("dialect.identifierEnclose").setValue(" ");
        // Save the link
        Status status = client.saveLink(link);
        if (status.canProceed()) {
            System.out.println("Created Link with Link Name : " + link.getName());
            return link;
        } else {
            System.out.println("Something went wrong creating the link");
            return null;
        }
    }

    /**
     * Create the HDFS link
     */
    public static MLink createHdfsLink() {
        // Use the built-in HDFS connector
        MLink link = client.createLink("hdfs-connector");
        link.setName("hdfs-link" + UUID.randomUUID().toString().substring(0, 4));
        link.setCreationUser("wangwang");
        // Get the link config object and set the HDFS URI and the Hadoop configuration directory
        MLinkConfig linkConfig = link.getConnectorLinkConfig();
        linkConfig.getStringInput("linkConfig.uri").setValue("hdfs://localhost:9000/");
        linkConfig.getStringInput("linkConfig.confDir").setValue("/Users/wangwang/softdir/hadoop-2.8.5/etc/hadoop");
        // Save the link
        Status status = client.saveLink(link);
        if (status.canProceed()) {
            System.out.println("Created Link with Link Name : " + link.getName());
            return link;
        } else {
            System.out.println("Something went wrong creating the link");
            return null;
        }
    }

    /**
     * Job: MySQL to HDFS
     * @param fromLink the source (JDBC) link
     * @param toLink   the destination (HDFS) link
     * @return the job name, or null if creation failed
     */
    public static String createMysqlToHdfsJob(MLink fromLink, MLink toLink) {
        // Create the job; the first argument is the source link name, the second the destination link name
        MJob job = client.createJob(fromLink.getName(), toLink.getName());
        job.setName("wangwang-job" + UUID.randomUUID());
        job.setCreationUser("wangwang");
        // Get the source config and set the schema, table, and column names
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("db1");
        fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("t_user");
        fromJobConfig.getListInput("fromJobConfig.columnList").setValue(Arrays.asList("id", "user_name", "passwd"));
        // Get the destination config and set the output directory, output format, compression, and null handling
        MToConfig toJobConfig = job.getToJobConfig();
        // To avoid having to delete the output files before each run, I append a random suffix so every
        // run writes to a different path: Sqoop requires that the HDFS output directory not already exist.
        toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/sqooptest" + UUID.randomUUID());
        // If the output format is not set, the job fails with:
        // Caused by: org.apache.sqoop.common.SqoopException: MAPRED_EXEC_0013:Cannot write to the data writer
        toJobConfig.getEnumInput("toJobConfig.outputFormat").setValue("TEXT_FILE");
        toJobConfig.getEnumInput("toJobConfig.compression").setValue("NONE");
        toJobConfig.getBooleanInput("toJobConfig.overrideNullValue").setValue(true);
        // Get the driver config and set the number of mappers
        MDriverConfig driverConfig = job.getDriverConfig();
        driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);
        // Save the job
        Status status = client.saveJob(job);
        if (status.canProceed()) {
            System.out.println("Created Job with Job Name: " + job.getName());
            return job.getName();
        } else {
            System.out.println("Something went wrong creating the job");
            return null;
        }
    }

    /**
     * Job: HDFS to MySQL
     * @param fromLink the source (HDFS) link
     * @param toLink   the destination (JDBC) link
     * @return the job name, or null if creation failed
     */
    public static String createHdfsToMysqlJob(MLink fromLink, MLink toLink) {
        MJob job = client.createJob(fromLink.getName(), toLink.getName());
        job.setName("wangwang" + UUID.randomUUID());
        job.setCreationUser("wangwang");
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.inputDirectory").setValue("/sqoopDir");
        MToConfig toJobConfig = job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.tableName").setValue("t_user");
        // Do not specify the table columns here, or the generated SQL is invalid:
        // GENERIC_JDBC_CONNECTOR_0002:Unable to execute the SQL
        // toJobConfig.getListInput("toJobConfig.columnList")
        //         .setValue(Arrays.asList("id", "user_name", "passwd"));
        MDriverConfig driverConfig = job.getDriverConfig();
        // Set the number of mappers; inspecting the MapReduce job shows there is no reduce phase at all
        driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);
        // Do not set the number of loaders (reducers), or the job fails with: No data available in table
        // driverConfig.getIntegerInput("throttlingConfig.numLoaders").setValue(10);
        Status status = client.saveJob(job);
        if (status.canProceed()) {
            System.out.println("Created Job with Job Name: " + job.getName());
            return job.getName();
        } else {
            System.out.println("Something went wrong creating the job");
            return null;
        }
    }

    // Start a job and print its status and counters
    static void startJob(String jobName) {
        // Job start
        MSubmission submission = client.startJob(jobName);
        System.out.println("Job Submission Status : " + submission.getStatus());
        if (submission.getStatus().isRunning() && submission.getProgress() != -1) {
            System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
        }
        System.out.println("Hadoop job id :" + submission.getExternalJobId());
        System.out.println("Job link : " + submission.getExternalLink());
        Counters counters = submission.getCounters();
        if (counters != null) {
            System.out.println("Counters:");
            for (CounterGroup group : counters) {
                System.out.print("\t");
                System.out.println(group.getName());
                for (Counter counter : group) {
                    System.out.print("\t\t");
                    System.out.print(counter.getName());
                    System.out.print(": ");
                    System.out.println(counter.getValue());
                }
            }
        }
    }

    public static void main(String[] args) {
        String url = "http://localhost:12000/sqoop/";
        client = new SqoopClient(url);
        System.out.println(client);
        MLink mysqlLink = createMysqlLink();
        MLink hdfsLink = createHdfsLink();
        // Import the data into HDFS
        // startJob(createMysqlToHdfsJob(mysqlLink, hdfsLink));
        // Export the data back to MySQL
        startJob(createHdfsToMysqlJob(hdfsLink, mysqlLink));
    }
}
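One thing to note about startJob above: it is asynchronous, returning as soon as the submission is accepted, which is why the progress and counters are often still empty when they are printed. If you want to block until the transfer actually finishes, the 1.99.x client also exposes a polling overload of startJob that takes a SubmissionCallback; a minimal sketch assuming that overload (the 5000 ms poll interval is an arbitrary choice):

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.client.SubmissionCallback;
import org.apache.sqoop.model.MSubmission;

public class SqoopBlockingStart {
    // Start a job and poll the server every 5 seconds until it leaves the running state
    static void startJobAndWait(SqoopClient client, String jobName) throws InterruptedException {
        MSubmission submission = client.startJob(jobName, new SubmissionCallback() {
            @Override
            public void submitted(MSubmission s) {
                System.out.println("Submitted, status: " + s.getStatus());
            }

            @Override
            public void updated(MSubmission s) {
                System.out.println("Progress: " + s.getProgress());
            }

            @Override
            public void finished(MSubmission s) {
                System.out.println("Finished, status: " + s.getStatus());
            }
        }, 5000);
        // By the time startJob returns here, the job is no longer running
        System.out.println("Final status: " + submission.getStatus());
    }
}

Also, every run of main registers a fresh pair of randomly named links plus a new job on the server; to keep the metadata store tidy you can remove them afterwards with client.deleteJob(jobName) and client.deleteLink(linkName).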