1. 程式人生 > 其它 >dremio create table 幾個特殊的能力

dremio create table 幾個特殊的能力

Dremio 支援 CREATE TABLE(CTAS)。官方文件寫得比較簡單,只說明了可以直接建立表,但 Dremio 的 CREATE TABLE 其實更加強大。
如果使用 Dremio 的 UI,資料下載匯出就是利用了這個能力:預設在 master 節點中執行,資料放在一個固定的 space
`__datasetDownload` 中,並基於 job id 動態建立一個表,然後讀取其位元組流,再交由包裝好的 HTTP servlet 進行處理。

幾種支援的格式

text(CSV)、JSON、Parquet,並且支援較多的配置選項

參考測試用例

當然,要使用 CREATE TABLE 需要先在資料來源上開啟 CTAS。另外,有時看看原始碼、多瞭解一下官方的機制還是很有用的;官方原始碼中的以下測試用例就是很不錯的學習資料。

package com.dremio.exec.sql;
import org.junit.Test;
import com.dremio.PlanTestBase;
/**
 * Tests for {@code CREATE TABLE ... STORE AS (...)}, i.e. CTAS with explicit storage options.
 *
 * <p>Most tests write the first two rows of {@code cp."region.json"} into {@code dfs_test} using a
 * particular format/option combination, read the result back through a {@code TABLE(...)} table
 * function, and compare it with the expected rows.
 *
 * <p>Tables are created <em>before</em> entering the {@code try} block so that a failing
 * {@code CREATE TABLE} is reported as-is; previously the {@code finally} clause would then try to
 * drop a table that never existed, and that second exception replaced the original failure.
 */
public class TestCTASWithOptions extends PlanTestBase {

  /** Source query shared by the CTAS tests: region ids 0 and 1 with their sales city. */
  private static final String REGION_SELECT =
      "SELECT region_id, sales_city FROM cp.\"region.json\" ORDER BY region_id LIMIT 2";

  /** Table-function options for reading back comma-delimited text that carries a header row. */
  private static final String CSV_READ_OPTIONS =
      "type => 'text', fieldDelimiter => ',', extractHeader => true";

  @Test
  public void csv() throws Exception {
    test(ctas("testCsv", "type => 'text', fieldDelimiter => ','"));
    try {
      verifyTextRegions("testCsv", CSV_READ_OPTIONS);
    } finally {
      dropTable("testCsv");
    }
  }

  @Test
  public void csvWithCustomExtension() throws Exception {
    // DROP TABLE doesn't support custom extensions, so this table is intentionally not dropped.
    // NOTE(review): CSV output uses the extension 'myparquet', apparently copied from the parquet
    // variant; confirm whether something like 'mycsv' was intended.
    test(ctas("csvWithCustomExtension",
        "type => 'text', fieldDelimiter => ',', outputExtension => 'myparquet'"));
    verifyTextRegions("csvWithCustomExtension", CSV_READ_OPTIONS);
  }

  @Test
  public void csvUnordered() throws Exception {
    // Same as csv(), but the storage options are listed in a different order.
    test(ctas("testCsvUnordered", "fieldDelimiter => ',', type => 'text'"));
    try {
      verifyTextRegions("testCsvUnordered", CSV_READ_OPTIONS);
    } finally {
      dropTable("testCsvUnordered");
    }
  }

  @Test
  public void csvTabRecordDelimiter() throws Exception {
    test(ctas("testCsvTabRecordDelimiter",
        "type => 'text', fieldDelimiter => ',', lineDelimiter => '\t'"));
    try {
      verifyTextRegions("testCsvTabRecordDelimiter",
          "type => 'text', fieldDelimiter => ',', lineDelimiter => '\t', extractHeader => true");
    } finally {
      dropTable("testCsvTabRecordDelimiter");
    }
  }

  @Test
  public void tsv() throws Exception {
    // The mixed-case 'teXt' looks deliberate (presumably checks case-insensitive type names).
    test(ctas("testTsv", "type => 'teXt', fieldDelimiter => '\t'"));
    try {
      verifyTextRegions("testTsv", "type => 'text', fieldDelimiter => '\t', extractHeader => true");
    } finally {
      dropTable("testTsv");
    }
  }

  @Test
  public void json() throws Exception {
    test(ctas("testJson", "type => 'json'"));
    try {
      verifyTypedRegions("testJson", "type => 'json'");
    } finally {
      dropTable("testJson");
    }
  }

  @Test
  public void jsonWithCustomExtension() throws Exception {
    // DROP TABLE doesn't support custom extensions, so this table is intentionally not dropped.
    test(ctas("jsonWithCustomExtension", "type => 'json', outputExtension => 'myjson'"));
    verifyTypedRegions("jsonWithCustomExtension", "type => 'json'");
  }

  @Test
  public void parquet() throws Exception {
    test(ctas("testParquet", "type => 'parquet'"));
    try {
      verifyTypedRegions("testParquet", "type => 'parquet'");
    } finally {
      dropTable("testParquet");
    }
  }

  @Test
  public void parquetWithCustomExtension() throws Exception {
    // DROP TABLE doesn't support custom extensions, so this table is intentionally not dropped.
    test(ctas("parquetWithCustomExtension", "type => 'parquet', outputExtension => 'myparquet'"));
    verifyTypedRegions("parquetWithCustomExtension", "type => 'parquet'");
  }

  @Test
  public void parquetWithPartition() throws Exception {
    test("CREATE TABLE dfs_test.testParquetWithPartition "
        + "PARTITION BY (region_id) "
        + "STORE AS (type => 'parquet') "
        + "AS " + REGION_SELECT);
    try {
      // The expected dir0 values show one output directory per region_id value.
      testBuilder()
          .sqlQuery("SELECT dir0, region_id, sales_city FROM "
              + "TABLE(\"dfs_test\".\"testParquetWithPartition\"(type => 'parquet'))")
          .unOrdered()
          .baselineColumns("dir0", "region_id", "sales_city")
          .baselineValues("0_0", 0L, "None")
          .baselineValues("1_1", 1L, "San Francisco")
          .go();
    } finally {
      dropTable("testParquetWithPartition");
    }
  }

  @Test
  public void negativeCaseUnsupportedType() throws Exception {
    errorMsgTestHelper(ctas("negativeCaseUnsupportedType", "type => 'unknownFormat'"),
        "unknown type unknownFormat, expected one of");
  }

  @Test
  public void negativeCaseUnknownOption() throws Exception {
    errorMsgTestHelper(ctas("negativeCaseUnknownOptions", "type => 'json', unknownOption => 'sd'"),
        "Unknown storage option(s): {unknownOption=sd}");
  }

  @Test
  public void csvWithSingleWriter() throws Exception {
    test("CREATE TABLE dfs_test.csvWithSingleWriter "
        + "STORE AS (type => 'text', fieldDelimiter => ',') "
        + "WITH SINGLE WRITER "
        + "AS SELECT region_id, count(*) cnt FROM cp.\"region.json\" GROUP BY region_id ORDER BY region_id LIMIT 2");
    try {
      testBuilder()
          .sqlQuery(readTable("csvWithSingleWriter", CSV_READ_OPTIONS))
          .unOrdered()
          .baselineColumns("region_id", "cnt")
          .baselineValues("0", "1")
          .baselineValues("1", "1")
          .go();
    } finally {
      dropTable("csvWithSingleWriter");
    }
  }

  /**
   * Builds a CTAS statement that stores {@link #REGION_SELECT} into {@code dfs_test.<table>}.
   *
   * @param table unqualified table name inside {@code dfs_test}
   * @param storeOptions the comma-separated contents of the {@code STORE AS (...)} clause
   * @return the complete {@code CREATE TABLE} statement
   */
  private static String ctas(String table, String storeOptions) {
    return "CREATE TABLE dfs_test." + table + " STORE AS (" + storeOptions + ") AS " + REGION_SELECT;
  }

  /**
   * Builds a query that selects every column of {@code dfs_test.<table>} via a table function.
   *
   * @param table unqualified table name inside {@code dfs_test}
   * @param readOptions the comma-separated options passed to the table function
   * @return the {@code SELECT * FROM TABLE(...)} query
   */
  private static String readTable(String table, String readOptions) {
    return "SELECT * FROM TABLE(\"dfs_test\".\"" + table + "\"(" + readOptions + "))";
  }

  /** Asserts that a text-format table holds the two region rows, with all values as strings. */
  private void verifyTextRegions(String table, String readOptions) throws Exception {
    testBuilder()
        .sqlQuery(readTable(table, readOptions))
        .unOrdered()
        .baselineColumns("region_id", "sales_city")
        .baselineValues("0", "None")
        .baselineValues("1", "San Francisco")
        .go();
  }

  /** Asserts that a JSON/Parquet table holds the two region rows, with region_id as a long. */
  private void verifyTypedRegions(String table, String readOptions) throws Exception {
    testBuilder()
        .sqlQuery(readTable(table, readOptions))
        .unOrdered()
        .baselineColumns("region_id", "sales_city")
        .baselineValues(0L, "None")
        .baselineValues(1L, "San Francisco")
        .go();
  }

  /** Drops {@code dfs_test.<table>}. */
  private void dropTable(String table) throws Exception {
    test("DROP TABLE dfs_test." + table);
  }
}

說明

實際上我們也可以參考這種做法:在 S3 上開啟 CTAS,將需要匯出的資料寫入 S3,之後基於 S3 進行統一的資料匯出處理。好處是可以規避
UI 匯出最多 100 萬筆資料的限制,同時可以針對不同場景的資料進行優化處理;此外,CREATE TABLE 還支援資料分區(PARTITION BY)以及排序。

參考資料

https://docs.dremio.com/sql-reference/sql-commands/tables/?parent=sql-commands