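ALINK (10): Loading Datasets (3): Reading from a Catalog (CatalogSourceBatchOp)

Java class name: com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp

Python class name: CatalogSourceBatchOp

Overview

A Catalog describes a database's properties and its location. MySQL, Derby, SQLite, and Hive are supported.

Before use, the corresponding plugin must be downloaded; see https://www.yuque.com/pinshu/alink_guide/czg4cx for details.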
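To make "properties and location" concrete, here is a minimal sketch that uses only Python's standard `sqlite3` module, not Alink. An in-memory database stands in for the location, and the table and column listing stands in for the properties a catalog exposes. The table name `iris` and its columns are invented for illustration.

```python
import sqlite3

# An in-memory SQLite database stands in for a real database location.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE iris (sepal_length REAL, species TEXT)")

# Catalog-style metadata: which tables exist ...
tables = [row[0] for row in conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")]

# ... and which columns (name, declared type) a given table has.
columns = [(row[1], row[2]) for row in conn.execute("PRAGMA table_info(iris)")]
conn.close()

print(tables)
print(columns)
```

An Alink Catalog plays a comparable role for MySQL, Derby, SQLite, or Hive: it tells the operator where the database lives and lets it resolve tables by name.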

The definition takes three steps:

Step 1: define the Catalog

Databases and their Java interfaces (constructor signatures):

Derby
    DerbyCatalog(String catalogName, String defaultDatabase, String derbyVersion, String derbyPath)

MySql
    MySqlCatalog(String catalogName, String defaultDatabase, String mysqlVersion, String mysqlUrl, String port, String userName, String password)

Sqlite
    SqliteCatalog(String catalogName, String defaultDatabase, String sqliteVersion, String dbUrl)

Hive
    HiveCatalog(String catalogName, String defaultDatabase, String hiveVersion, String hiveConfDir)
    HiveCatalog(String catalogName, String defaultDatabase, String hiveVersion, FilePath hiveConfDir)
    HiveCatalog(String catalogName, String defaultDatabase, String hiveVersion, String hiveConfDir, String kerberosPrincipal, String kerberosKeytab)

Example (Python):
    derby = DerbyCatalog("derby_test_catalog", DERBY_SCHEMA, "10.6.1.0", derbyFolder+'/'+DERBY_DB)
Versions provided by each plugin:
    Hive: 2.3.4
    MySQL: 5.1.27
    Derby: 10.6.1.0
    SQLite: 3.19.3
    odps: 0.36.4-public
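Because the version string passed to a Catalog constructor should match the plugin that was downloaded, it can help to keep the versions in one place. The sketch below copies the versions from the list above; the dictionary name `PLUGIN_VERSIONS` and the helper `plugin_version` are hypothetical and are not part of Alink.

```python
# Versions copied from the plugin list above; the helper itself is hypothetical.
PLUGIN_VERSIONS = {
    "hive": "2.3.4",
    "mysql": "5.1.27",
    "derby": "10.6.1.0",
    "sqlite": "3.19.3",
    "odps": "0.36.4-public",
}


def plugin_version(name: str) -> str:
    """Return the listed plugin version for a database name (case-insensitive)."""
    key = name.lower()
    if key not in PLUGIN_VERSIONS:
        raise ValueError(f"no plugin version listed for {name!r}")
    return PLUGIN_VERSIONS[key]


print(plugin_version("Derby"))
```

The returned string could then be passed as the version argument, for example as the third argument of `DerbyCatalog`.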

Step 2: define the CatalogObject

dbName = "sqlite_db"
tableName = "table"
# The first argument is the Catalog; the second is an ObjectPath naming the DB/Project and the table
catalogObject = CatalogObject(derby, ObjectPath(dbName, tableName))
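The nesting in this step (a catalog, plus a path made of a database and a table) can be pictured with a small data model. This is only a sketch: `ObjectPathSketch` and `CatalogObjectSketch` are hypothetical stand-ins written for illustration, not the real `ObjectPath` and `CatalogObject` classes, and the `catalog:db.table` string format is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ObjectPathSketch:
    """Hypothetical stand-in for ObjectPath: a database (or project) plus a table."""
    database: str
    table: str

    def full_name(self) -> str:
        return f"{self.database}.{self.table}"


@dataclass(frozen=True)
class CatalogObjectSketch:
    """Hypothetical stand-in for CatalogObject: a catalog plus a path inside it."""
    catalog_name: str
    path: ObjectPathSketch

    def describe(self) -> str:
        return f"{self.catalog_name}:{self.path.full_name()}"


obj = CatalogObjectSketch("derby_test_catalog", ObjectPathSketch("sqlite_db", "table"))
print(obj.describe())
```

The point of the pairing is that a single object identifies one table unambiguously, so the source and sink operators each need only one parameter.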

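Step 3: define the Source and Sink

Parameters

Columns: name, Chinese name, description, type, required?, default value.

    catalogObject: Chinese name "catalog object", description "catalog object", type String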

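Code examples

The following code is for illustration only. Parts of it, or the environment configuration, may need to be changed before it runs correctly!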

Derby

Python code

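from pyalink.alink import *

# `source` is assumed to be an existing BatchOperator holding the data to write.
derbyFolder = "*"  # placeholder for the Derby storage folder
DERBY_SCHEMA = "derby_schema"
DERBY_DB = "derby_db"
derby = DerbyCatalog("derby_test_catalog", DERBY_SCHEMA, "10.6.1.0", derbyFolder+'/'+DERBY_DB)
catalogObject = CatalogObject(derby, ObjectPath("test_catalog_source_sink", "test_catalog_source_sink3"))
# Sink: write the upstream data into the catalog table.
catalogSinkBatchOp = CatalogSinkBatchOp()\
    .setCatalogObject(catalogObject)

source.link(catalogSinkBatchOp)
BatchOperator.execute()
# Source: read the same table back and print it.
catalogSourceBatchOp = CatalogSourceBatchOp()\
    .setCatalogObject(catalogObject)
catalogSourceBatchOp.print()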

Java code

// Imports are omitted; `source` is assumed to be an existing BatchOperator.
String derbyFolder = "*"; // placeholder for the Derby storage folder
String DERBY_SCHEMA = "derby_schema";
String DERBY_DB = "derby_db";
DerbyCatalog derby = new DerbyCatalog("derby_test_catalog", DERBY_SCHEMA, "10.6.1.0",
  derbyFolder + "/" + DERBY_DB);
CatalogObject catalogObject = new CatalogObject(derby,
  new ObjectPath("test_catalog_source_sink", "test_catalog_source_sink3"));
// Sink: write the upstream data into the catalog table.
CatalogSinkBatchOp catalogSinkBatchOp = new CatalogSinkBatchOp()
  .setCatalogObject(catalogObject);
source.link(catalogSinkBatchOp);
BatchOperator.execute();
// Source: read the same table back; print() triggers execution for batch operators.
CatalogSourceBatchOp catalogSourceBatchOp = new CatalogSourceBatchOp()
  .setCatalogObject(catalogObject);
catalogSourceBatchOp.print();

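Sqlite

Python code

from pyalink.alink import *

# `source` is assumed to be an existing BatchOperator holding the data to write.
sqliteFolder = "*"  # placeholder for the SQLite storage folder
SQLITE_SCHEMA = "sqlite_schema"
SQLITE_DB = "sqlite_db"
sqlite = SqliteCatalog("sqlite_test_catalog", None, "3.19.3",  [sqliteFolder+'/'+SQLITE_DB])
catalogObject = CatalogObject(sqlite, ObjectPath(SQLITE_DB, "test_catalog_source_sink3"))
# Sink: write the upstream data into the catalog table.
catalogSinkBatchOp = CatalogSinkBatchOp()\
    .setCatalogObject(catalogObject)

source.link(catalogSinkBatchOp)
BatchOperator.execute()
# Source: read the same table back and print it.
catalogSourceBatchOp = CatalogSourceBatchOp()\
    .setCatalogObject(catalogObject)
catalogSourceBatchOp.print()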

Java code

// Imports are omitted; `source` is assumed to be an existing BatchOperator.
String sqliteFolder = "*"; // placeholder for the SQLite storage folder
String SQLITE_SCHEMA = "sqlite_schema";
String SQLITE_DB = "sqlite_db";
SqliteCatalog sqlite = new SqliteCatalog("sqlite_test_catalog", null, "3.19.3",
  sqliteFolder + "/" + SQLITE_DB);
CatalogObject catalogObject = new CatalogObject(sqlite, new ObjectPath(SQLITE_DB,
  "test_catalog_source_sink3"));
// Sink: write the upstream data into the catalog table.
CatalogSinkBatchOp catalogSinkBatchOp = new CatalogSinkBatchOp()
  .setCatalogObject(catalogObject);
source.link(catalogSinkBatchOp);
BatchOperator.execute();
// Source: read the same table back; print() triggers execution for batch operators.
CatalogSourceBatchOp catalogSourceBatchOp = new CatalogSourceBatchOp()
  .setCatalogObject(catalogObject);
catalogSourceBatchOp.print();
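All of the examples follow the same round trip: a sink writes a table through the catalog, then a source reads that table back. For readers without an Alink environment, the pattern can be tried with Python's standard `sqlite3` module alone. This sketch is an analogy rather than Alink code; the sample rows and the column names `id` and `label` are made up, and only the table name is borrowed from the examples.

```python
import sqlite3

rows = [(1, "a"), (2, "b"), (3, "c")]  # made-up sample data

conn = sqlite3.connect(":memory:")

# "Sink" side: create the target table and write the rows into it.
conn.execute("CREATE TABLE test_catalog_source_sink3 (id INTEGER, label TEXT)")
conn.executemany("INSERT INTO test_catalog_source_sink3 VALUES (?, ?)", rows)
conn.commit()

# "Source" side: read the same table back.
read_back = conn.execute(
    "SELECT id, label FROM test_catalog_source_sink3 ORDER BY id").fetchall()
conn.close()

print(read_back)
```

In the Alink versions, `CatalogSinkBatchOp` covers the write half and `CatalogSourceBatchOp` the read half, with the shared `catalogObject` identifying the table on both sides.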