1. 程式人生 > >Spark SQL官方文件閱讀--待完善

Spark SQL官方文件閱讀--待完善

1,DataFrame是一個將資料格式化為列形式的分散式容器,類似於一個關係型資料庫表. 程式設計入口:SQLContext 2,SQLContext由SparkContext物件建立 也可建立一個功能更加全面的HiveContext物件,HiveContext是SQLContext的子類,從API中可以看出HiveContext extends SQLContext,所以能用SQLContext的地方也能用HiveContext 3,使用HiveContext可以使用更加複雜的HiveQL語句,可以使用Hive UDF,可以從Hive表中讀取資料. 4,建立DataFrames 可以從已有的RDD, Hive表或者資料來源建立DataFrame sqlContext.read方法返回一個DataFrameReader物件,該物件有jdbc,json,parquet,load等方法從資料來源載入資料,生成一個DataFrame物件的功能 DataFrame物件的show方法,顯示DataFrame中的前20條記錄     write方法返回一個DataFrameWriter物件,可以將資料寫入輸出中 sqlContext.sql方法,執行sql語句,並返回一個DataFrame物件 5,將已有RDD轉化為DataFrame,有兩種方法     (1)使用reflection將RDD轉化成DataFrame     Spark SQL的介面支援自動將一個包含case類的RDD轉化成DataFrame.case類中定義了table表結構.case類的引數被反射機制讀入,並作為表的列名.case類也可以使用陣列等複雜的資料結構.     (2)通過一個程式設計介面構建表結構,然後應用與已有RDD     在case表無法預先定義好的情況下,可以使用這種方法構建DataFrame.           a,從原始RDD建立一個行RDD(RDD of Rows)           b,建立一個與a步驟中相匹配的schema,使用StructType           c,使用SQLContext的createDataFrame方法將schema轉化成DataFrame 6,資料來源(Data Sources)     通過DataFrame,Spark
 SQL支援在多種資料上進行操作.一個DataFrame能夠被當做普通RDD進行操作,也可以註冊為一個臨時表.     sqlContext.read方法返回一個DataFrameReader物件,該物件有多種方法可以將資料來源載入到程式中. 7,資料儲存模式(Save Modes)     指定不同的儲存模式,將對資料的寫出進行不同的操作.注意這些儲存模式沒有實現鎖功能,也不能自動實現,所以,當有多個使用者向同一位置寫出時會不安全.     SaveMode.ErrorIfExists(預設)     如果資料已經存在,拋異常     SaveMode.Append                       如果資料以存在,追加到最後     SaveMode.Overwrite                    如果資料以存在,覆蓋(先刪除,)     SaveMode.Ignore                           如果資料已存在,則不儲存,類似於(create table if not exists) 8,持久化到表中(Saving to Persistent Tables)     使用HiveContext時,使用saveAsTable命令DataFrames也可以儲存為持久化的表. 9,Parquet檔案     Parquet是基於列儲存的資料結構.     (1)parquet檔案載入     sqlContext.read.parquet("people.parquet");(也可用load方法載入)     (2)parquet檔案輸出     val people:RDD[Person], people.write.parquet("people.parquet") 10,表分割槽(Partition discovery)        某個分割槽如下所示:      
    將path/to/table路徑傳遞給SQLContext.read.parquet或者SQLContext.read.load,SparkSQL會自動的從路徑中提取分割槽資訊. 最終的DataFrame會變成如下形式:          11,結構的合併(Schema merging)     ProtocolBuffer,Avro,Thrifs和Parquet都支援結構自動調整.使用者可以以一個簡單的表結構開始,慢慢的增加一些其他欄位.這樣,終端使用者會得到很大不同但是內在又有關聯的Parquet檔案.在SparkSQL中,這些Parquet資料來源可以被自動檢測並且將這些結構進行合併.     val df1 = ....     df1.write.parquet("data/test_table/key=1")     val df2 = ....     df2.write.parquet("data/test_table/key=2")     val df3 = sqlContext.read.parquet("data/test_table") 12,Parquet的相關配置(Configuration)     Parquet資料的配置可以使用SQLContext的setConf方法進行設定,或者使用SQL中的命令,SET key=value     
spark
.sql.parquet.binaryAsString
false 一些其他的parquet系統,當把該parquet schema寫出時,不區分二進位制資料和string字串.這個屬性告訴Spark SQL把二進位制資料解析成string字串
spark.sql.parquet.int96AsTimestamp true 其他parquet系統將timestamp儲存為INT96型.Spark同樣將timestamp儲存為INT96,不失精度.
spark.sql.parquet.cacheMetadata true 啟用快取一些parquet schema metadata的功能,提高訪問靜態資料的速度
spark.sql.parquet.compression.codec gzip 設定parquet檔案寫出的壓縮方式,包括:uncompressed,snappy,gzip, lzo
spark.sql.parquet.filterPushdown false Turn on Parquet filter pushdown optimization. This feature is turned off by default because of a known bug in Parquet 1.6.0rc3 (PARQUET-136). However, if your table doesn't contain any nullable string or binary columns, it's still safe to turn this feature on.
spark.sql.hive.convertMetastoreParquet true 設定為false時,Spark SQL會使用Hive SerDe來處理paquet表,而不是使用the built in support
13,JSON資料集(JSON Datasets)     使用SQLContext.read.json()方法載入一個String RDD或者JSON檔案     需要注意的是,這裡所謂的json檔案不是傳統的JSON檔案.每一行必須包含一個分離的,自包含的標準的JSON物件.所以,一個多行的JSON檔案無法讀入.     也就是說,這裡載入的json檔案如果有多行的話,每一行都必須是一個標準的JSON檔案.     如spark example自帶的people.json檔案:      {"name":"Michael"}
     {"name":"Andy", "age":30}
     {"name":"Justin", "age":19} 14,Hive表(Hive Tables)     Spark SQL同樣支援從Hive中讀取和寫入資料.然而,由於Hive需要依賴很多其他的系統,在預設的Spark assembly中並不包含對Hive的支援.可以在編譯Spark時新增引數-Phive和-Phive-thriftserver.     初始化一個HiveContext物件,該物件繼承自SQLContext,並且增加了在MetaStore中的表的查詢功能和使用HiveQL查詢的功能.及時沒有安裝Hive也可以使用HiveContext.如果沒有hive-site.xml檔案,則會自動的在當前資料夾中建立metastore_db和warehouse     val sqlContext = new HiveContext(sc)     與不同版本的Hive Metastore互動(Interacting with Different Versions of Hive Metastore) 15,使用JDBC連線其他的資料(JDBC To Other Databases)     實現該功能需要使用到JdbcRDD類.     程式執行時,需要包含JDBC驅動jar包.如果要在spark-shell中使用jdbc功能,需要按如下方式啟動spark-shell:     SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell     val jdbcDF = sqlContext.load("jdbc", Map("url"->"jdbc:postgresql:dbserver","dbtable"->"schema.tablename")) 各引數含義:     
url JDBC URL
dbtable 將要讀入的表
driver The class name of the JDBC driver needed to connect to this URL. This class will be loaded on the master and workers before running an JDBC commands to allow the driver to register itself with the JDBC subsystem.
partitionColumn, lowerBound, upperBound, numPartitions These options must all be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned.
(1)注意事項(Troubleshooting)     a)JDBC驅動class必須在所有executors上可訪問.這樣實現的一個方法是修改所有worker節點上的compute_classpath.sh檔案,使之包含這個driver JARs     b)有些資料庫,例如H2,會將所有names轉化成大寫,所以需要在Spark SQL中使用upper case來對應這些names 15,效能調優(Performance Tuning)     (1)資料快取到記憶體中(Caching Data In Memory)     呼叫sqlContext.cacheTable("tableName")或者dataFrame.cache()將Spark SQL資料快取到記憶體中.將記憶體中的表移除,sqlContext.uncacheTable("tableName")     使用SQLContext物件的setConf方法或者使用SQL SET key=value命令     
spark.sql.inMemoryColumnarStorage.compressed true Spark SQL會基於資料的資訊自動為每一個欄位選擇壓縮方式
spark.sql.inMemoryColumnarStorage.batchSize 1000 控制columnar 快取的大小.引數越大效率越高,但是可能會導致記憶體溢位
    (2)其他可選配置(Other Configuration Options)  16,分散式SQL引擎(Distributed SQL Engine)     Spark SQL也可以當做一個分散式的查詢引擎,使用JDBC/ODBC或者命令列(command-line)的方式     (1)執行Thrift JDBC/ODBC服務 待完善......     (2)執行Spark SQL CLI     可以使用Spark SQL命令列模式執行相關的HiveQL語句,在本地模式(local mode)中使用.所以,Spark SQL CLI不能與Thrift JDBC服務建立連線.     ./bin/spark-sql 17,升級指南(Migration Guide)     (1)從Spark SQL 1.3升級到1.4  待完善.......     (2)從Spark SQL 1.0-1.2 升級到1.3  待完善....... 18, Shark使用者升級  待完善....... 19,與Hive相容(Compatibility with Apache Hive)     Spark SQL可與Hive的Metastore,SerDes(序列化與反序列化)和UDF(使用者自定義函式)相相容,spark-1.4基於Hive 0.12.0和0.13.1     (1)在已有Hive Warehouses中安裝     Spark SQL Thrift JDBC server 是開箱即用(out of box)的,即不需要修改現有的Hive Metastore,也不需要更改表資料或分割槽的安放位置.     (2)支援的Spark特性(Supported Hive Features)     a),Hive查詢語句           包括,SELECT, GROUP BY, ORDER BY, CLUSTER BY, SORT BY     b),所有的Hive操作,包括           關係型操作:===<><>>=<=等      算術操作:+,-,*,/,%等      邏輯操作:AND, &&, OR, ||等      複雜的型別構造(type constructors)      數學方法:sign, ln, cos等      字串函式:instr, length, printf等     c),使用者自定義函式(UDF)     d),使用者自定義聚合函式(UDAF)     e),使用者自定義序列化與反序列化(SerDes)     f),Joins           JOIN, {LEFT|RIGHT|FULL} OUTER JOIN, LEFT SEMI JOIN, CROSS JOIN     g),Unions     h),子查詢           select col from (select a + b as col from t1) t2     i),抽樣(Sampling)     j),Explain     k),表分割槽     l),檢視     m),所有Hive DDL語句,包括           CREATE TABLE, CREATE TABLE AS SELECT, ALTER TABLE     n),大部分Hive資料型別           TINYINT, SMALLINT, INT, BIGINT, BOOLEAN, FLOAT, DOUBLE, STRING, BINARY, TIMESTAMP, DATE, ARRAY<>, MAP<>, STRUCT<>     (3)不支援的hive函式   待完善.......... 20,資料型別(Data Types)      Spark SQL和DataFrames支援以下資料型別:      a)數字型別           ByteType,1個位元組           ShortType,2個位元組           IntegerType,4個位元組           LongType,8個位元組           FloatType,4個位元組           DoubleType,8個位元組           DecimalType,      b)字串型別           StringType      c)二進位制位型別           BinaryType      d)布林型別           BooleanType      e) 時間型別           TimestampType           DateType     f)複雜型別           ArrayType(elementType, containsNull)           MapType(keyType, valueType, valueContainsNull)           StructType(fields)