dremio create table 幾個特殊的能力
阿新 • • 發佈:2021-09-06
dremio 是支援create table的,官方文件比較簡單,只說明瞭可以直接建立,但是dremio 的create table 其實是更加強大的
如果使用了dremio的ui 的話,資料下載匯出就是利用了這個能力,但是預設是在master中執行的,而且資料是放在一個固定的space中
__datasetDownload
同時會基於 jobid 動態建立一張表,然後讀取該表的位元組流,最後交由包裝好的 http servlet 進行處理
幾種支援的格式
txt (csv),json, parquet 而且支援比較多的配置選項
參考測試用例
當然,要使用 create table 是需要先開啟 CTAS 的。同時有時看看原始碼,多瞭解下官方的機制還是很有用的,官方原始碼中的一個測試用例也是很不錯的學習資料
package com.dremio.exec.sql;
import org.junit.Test;
import com.dremio.PlanTestBase;
/**
 * Tests for {@code CREATE TABLE ... STORE AS (...) AS SELECT ...} (CTAS) with explicit
 * storage options.
 *
 * <p>Most tests follow the same three steps: create a table in {@code dfs_test} from the first
 * two rows of {@code cp."region.json"}, read the table back through a {@code TABLE(...)} call
 * with format options, and finally drop the table. The two negative tests only check the error
 * message produced by an invalid {@code STORE AS} clause.
 */
public class TestCTASWithOptions extends PlanTestBase {

  /** Source query shared by every CTAS statement that copies the two-column region sample. */
  private static final String REGION_SOURCE =
      "AS SELECT region_id, sales_city FROM cp.\"region.json\" ORDER BY region_id LIMIT 2";

  /** Format options used to read a comma-delimited text table back with its header row. */
  private static final String CSV_READ_OPTIONS =
      "type => 'text', fieldDelimiter => ',', extractHeader => true";

  /**
   * Builds the CTAS statement for {@code dfs_test.<table>} with the given storage options.
   *
   * @param table unqualified table name inside {@code dfs_test}
   * @param storeOptions text placed between the parentheses of {@code STORE AS (...)}
   * @return the complete CREATE TABLE statement
   */
  private static String ctasSql(String table, String storeOptions) {
    return "CREATE TABLE dfs_test." + table + " STORE AS (" + storeOptions + ") " + REGION_SOURCE;
  }

  /**
   * Builds the {@code SELECT *} statement that reads a table back with explicit format options.
   *
   * @param table unqualified table name inside {@code dfs_test}
   * @param formatOptions text placed between the parentheses of the {@code TABLE(...)} call
   * @return the complete SELECT statement
   */
  private static String readBackSql(String table, String formatOptions) {
    return "SELECT * FROM TABLE(\"dfs_test\".\"" + table + "\"(" + formatOptions + "))";
  }

  /**
   * Runs {@code readSql} and expects exactly the two region rows, in any order.
   *
   * @param readSql query to execute
   * @param firstRegionId expected {@code region_id} of the "None" row
   * @param secondRegionId expected {@code region_id} of the "San Francisco" row
   */
  private void expectRegionRows(String readSql, Object firstRegionId, Object secondRegionId)
      throws Exception {
    testBuilder()
        .sqlQuery(readSql)
        .unOrdered()
        .baselineColumns("region_id", "sales_city")
        .baselineValues(firstRegionId, "None")
        .baselineValues(secondRegionId, "San Francisco")
        .go();
  }

  /** Drops {@code dfs_test.<table>}. */
  private void dropCtasTable(String table) throws Exception {
    test("DROP TABLE dfs_test." + table);
  }

  /** Comma-delimited text output, read back with header extraction. */
  @Test
  public void csv() throws Exception {
    final String table = "testCsv";
    try {
      test(ctasSql(table, "type => 'text', fieldDelimiter => ','"));
      expectRegionRows(readBackSql(table, CSV_READ_OPTIONS), "0", "1");
    } finally {
      dropCtasTable(table);
    }
  }

  /** Text output written with a custom {@code outputExtension}. */
  @Test
  public void csvWithCustomExtension() throws Exception {
    final String table = "csvWithCustomExtension";
    test(ctasSql(table, "type => 'text', fieldDelimiter => ',', outputExtension => 'myparquet'"));
    expectRegionRows(readBackSql(table, CSV_READ_OPTIONS), "0", "1");
    // No cleanup on purpose: DROP TABLE doesn't support custom extensions.
  }

  /** Same as {@link #csv()} but with the storage options listed in a different order. */
  @Test
  public void csvUnordered() throws Exception {
    final String table = "testCsvUnordered";
    try {
      test(ctasSql(table, "fieldDelimiter => ',', type => 'text'"));
      expectRegionRows(readBackSql(table, CSV_READ_OPTIONS), "0", "1");
    } finally {
      dropCtasTable(table);
    }
  }

  /** Text output that uses a tab character as the line (record) delimiter. */
  @Test
  public void csvTabRecordDelimiter() throws Exception {
    final String table = "testCsvTabRecordDelimiter";
    try {
      test(ctasSql(table, "type => 'text', fieldDelimiter => ',', lineDelimiter => '\t'"));
      expectRegionRows(
          readBackSql(
              table,
              "type => 'text', fieldDelimiter => ',', lineDelimiter => '\t', extractHeader => true"),
          "0",
          "1");
    } finally {
      dropCtasTable(table);
    }
  }

  /** Tab-delimited text output; the type name is given in mixed case ({@code 'teXt'}). */
  @Test
  public void tsv() throws Exception {
    final String table = "testTsv";
    try {
      test(ctasSql(table, "type => 'teXt', fieldDelimiter => '\t'"));
      expectRegionRows(
          readBackSql(table, "type => 'text', fieldDelimiter => '\t', extractHeader => true"),
          "0",
          "1");
    } finally {
      dropCtasTable(table);
    }
  }

  /** JSON output; {@code region_id} is expected back as a long. */
  @Test
  public void json() throws Exception {
    final String table = "testJson";
    try {
      test(ctasSql(table, "type => 'json'"));
      expectRegionRows(readBackSql(table, "type => 'json'"), 0L, 1L);
    } finally {
      dropCtasTable(table);
    }
  }

  /** JSON output written with a custom {@code outputExtension}. */
  @Test
  public void jsonWithCustomExtension() throws Exception {
    final String table = "jsonWithCustomExtension";
    test(ctasSql(table, "type => 'json', outputExtension => 'myjson'"));
    expectRegionRows(readBackSql(table, "type => 'json'"), 0L, 1L);
    // No cleanup on purpose: DROP TABLE doesn't support custom extensions.
  }

  /** Parquet output; {@code region_id} is expected back as a long. */
  @Test
  public void parquet() throws Exception {
    final String table = "testParquet";
    try {
      test(ctasSql(table, "type => 'parquet'"));
      expectRegionRows(readBackSql(table, "type => 'parquet'"), 0L, 1L);
    } finally {
      dropCtasTable(table);
    }
  }

  /** Parquet output written with a custom {@code outputExtension}. */
  @Test
  public void parquetWithCustomExtension() throws Exception {
    final String table = "parquetWithCustomExtension";
    test(ctasSql(table, "type => 'parquet', outputExtension => 'myparquet'"));
    expectRegionRows(readBackSql(table, "type => 'parquet'"), 0L, 1L);
    // No cleanup on purpose: DROP TABLE doesn't support custom extensions.
  }

  /** Parquet output partitioned by {@code region_id}; also checks the {@code dir0} column. */
  @Test
  public void parquetWithPartition() throws Exception {
    final String table = "testParquetWithPartition";
    final String createSql =
        "CREATE TABLE dfs_test." + table + " "
            + "PARTITION BY (region_id) "
            + "STORE AS (type => 'parquet') "
            + REGION_SOURCE;
    final String readSql =
        "SELECT dir0, region_id, sales_city FROM TABLE(\"dfs_test\".\"" + table
            + "\"(type => 'parquet'))";
    try {
      test(createSql);
      testBuilder()
          .sqlQuery(readSql)
          .unOrdered()
          .baselineColumns("dir0", "region_id", "sales_city")
          .baselineValues("0_0", 0L, "None")
          .baselineValues("1_1", 1L, "San Francisco")
          .go();
    } finally {
      dropCtasTable(table);
    }
  }

  /** An unrecognised {@code type} value must be rejected with a descriptive message. */
  @Test
  public void negativeCaseUnsupportedType() throws Exception {
    errorMsgTestHelper(
        ctasSql("negativeCaseUnsupportedType", "type => 'unknownFormat'"),
        "unknown type unknownFormat, expected one of");
  }

  /** An unrecognised storage option must be rejected with a descriptive message. */
  @Test
  public void negativeCaseUnknownOption() throws Exception {
    errorMsgTestHelper(
        ctasSql("negativeCaseUnknownOptions", "type => 'json', unknownOption => 'sd'"),
        "Unknown storage option(s): {unknownOption=sd}");
  }

  /** Text output of an aggregate query created with the {@code WITH SINGLE WRITER} clause. */
  @Test
  public void csvWithSingleWriter() throws Exception {
    final String table = "csvWithSingleWriter";
    final String createSql =
        "CREATE TABLE dfs_test." + table + " "
            + "STORE AS (type => 'text', fieldDelimiter => ',') "
            + "WITH SINGLE WRITER "
            + "AS SELECT region_id, count(*) cnt FROM cp.\"region.json\" "
            + "GROUP BY region_id ORDER BY region_id LIMIT 2";
    try {
      test(createSql);
      testBuilder()
          .sqlQuery(readBackSql(table, CSV_READ_OPTIONS))
          .unOrdered()
          .baselineColumns("region_id", "cnt")
          .baselineValues("0", "1")
          .baselineValues("1", "1")
          .go();
    } finally {
      dropCtasTable(table);
    }
  }
}
說明
實際上我們也可以參考此玩法:使用 s3 並開啟 CTAS,將需要匯出的資料寫到 s3 中,之後基於 s3 進行統一的資料匯出處理。好處是可以規避
ui 匯出 100 萬筆資料的限制,同時也能針對不同場景的資料進行優化處理。當然 create table 還支援資料分片(partition)以及排序等能力。
參考資料
https://docs.dremio.com/sql-reference/sql-commands/tables/?parent=sql-commands