《Spark Python API 官方文檔中文版》 之 pyspark.sql (一)
官網地址:http://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html
pyspark.sql module
Module Context
Spark SQL和DataFrames重要的類有:
pyspark.sql.SQLContext DataFrame和SQL方法的主入口
pyspark.sql.DataFrame 將分布式數據集分組到指定列名的數據框中
pyspark.sql.Column DataFrame中的列
pyspark.sql.Row DataFrame數據的行
pyspark.sql.HiveContext 訪問Hive數據的主入口
pyspark.sql.GroupedData 由DataFrame.groupBy()創建的聚合方法集
pyspark.sql.DataFrameNaFunctions 處理丟失數據(空數據)的方法
pyspark.sql.functions DataFrame可用的內置函數
pyspark.sql.types 可用的數據類型列表
pyspark.sql.Window 用於處理窗口函數
1.class pyspark.sql.SQLContext(sparkContext, sqlContext=None)
SQLContext可以用來創建DataFrame、註冊DataFrame為表、在表上執行SQL、緩存表、讀取parquet文件。
參數:● sparkContext - 支持sqlcontext的sparkcontext
1.1 applySchema(rdd, schema)
註:在1.3中已過時,使用createDataFrame()代替。
1.2 cacheTable(tableName)
緩存表到內存中
1.3 clearCache()
從內存緩存刪除所有緩存表。
1.4 createDataFrame(data, schema=None, samplingRatio=None)
從元組/列表RDD或列表或pandas.DataFrame創建DataFrame
當模式是列名的列表時,每個列的類型會從數據中推斷出來。
當模式沒有時,將嘗試從數據中推斷模式(列名和類型),數據應該是行或命名元組或字典的RDD。
如果模式推理是必要的,samplingRatio用來確定用於模式推理的行比率。如果沒有samplingratio,將使用第一行。
參數:● data - 行或元組或列表或字典的RDD、list、pandas.DataFrame.
● schema – 一個結構化類型或者列名列表,默認是空。
samplingRatio – 用於推斷的行的樣本比率。
返回: DataFrame
>>> l=[(‘Alice‘,1)] >>> sqlContext.createDataFrame(l).collect() [Row(_1=u‘Alice‘, _2=1)] >>> sqlContext.createDataFrame(l,[‘name‘,‘age‘]).collect() [Row(name=u‘Alice‘, age=1)]
>>> d=[{‘name‘:‘Alice‘,‘age‘:1}] >>> sqlContext.createDataFrame(d).collect() [Row(age=1, name=u‘Alice‘)]
>>> rdd=sc.parallelize(l) >>> sqlContext.createDataFrame(rdd).collect() [Row(_1=u‘Alice‘, _2=1)] >>> df=sqlContext.createDataFrame(rdd,[‘name‘,‘age‘]) >>> df.collect() [Row(name=u‘Alice‘, age=1)]
>>> sqlContext.createDataFrame(df.toPandas()).collect() [Row(name=u‘Alice‘, age=1)] >>> sqlContext.createDataFrame(pandas.DataFrame([[1, 2]])).collect() [Row(0=1, 1=2)]
1.5 createExternalTable(tableName, path=None, source=None, schema=None, **options)
創建基於數據源中的數據的外部表.
返回與外部表關聯的DataFrame
數據源由源和一組選項指定。如果未指定源,那麽將使用由spark.sql.sources.default 配置的默認的數據源配置。
通常,一個模式可以被提供作為返回的DataFrame的模式,然後創建外部表。
返回: DataFrame
1.6 dropTempTable(tableName)
從目錄中刪除臨時表
>>> sqlContext.registerDataFrameAsTable(df, "table1") >>> sqlContext.dropTempTable("table1")
1.7 getConf(key, defaultValue)
返回指定鍵的Spark SQL配置屬性值。
如果鍵沒有指定返回默認值。
1.8 inferSchema(rdd, samplingRatio=None)
註:在1.3中已過時,使用createDataFrame()代替。
1.9 jsonFile(path, schema=None, samplingRatio=1.0)
從一個文本文件中加載數據,這個文件的每一行均為JSON字符串。
註:在1.4中已過時,使用DataFrameReader.json()代替。
1.10 jsonRDD(rdd, schema=None, samplingRatio=1.0)
從一個已經存在的RDD中加載數據,這個RDD中的每一個元素均為一個JSON字符串。
如果提供了模式,將給定的模式應用到這個JSON數據集。否則,它根據數據集的采樣比例來確定模式。
>>> json=sc.parallelize(["""{"name":"jack","addr":{"city":"beijing","mail":"10001"}}""","""{"name":"john","addr":{"city":"shanghai","mail":"10002"}}"""]) >>> df1 = sqlContext.jsonRDD(json) >>> df1.collect() [Row(addr=Row(city=u‘beijing‘, mail=u‘10001‘), name=u‘jack‘), Row(addr=Row(city=u‘shanghai‘, mail=u‘10002‘), name=u‘john‘)]
>>> df2 = sqlContext.jsonRDD(json,df1.schema) >>> df2.collect() [Row(addr=Row(city=u‘beijing‘, mail=u‘10001‘), name=u‘jack‘), Row(addr=Row(city=u‘shanghai‘, mail=u‘10002‘), name=u‘john‘)]
1.11 load(path=None, source=None, schema=None, **options)
返回數據源中的數據集為DataFrame.
註:在1.4中已過時,使用DataFrameReader.load()代替。
1.12 newSession()
返回一個新的SQLContext做為一個新的會話,這個會話有單獨的SQLConf,註冊臨時表和UDFs,但共享sparkcontext和緩存表。
1.13 parquetFile(*paths)
加載Parquet文件,返回結果為DataFrame
註:在1.4中已過時,使用DataFrameReader.parquet()代替。
1.14 range(start, end=None, step=1, numPartitions=None)
創建只有一個名為id的長類型的列的DataFrame,包含從開始到結束的按照一定步長的獨立元素。
參數:● start - 開始值
● end - 結束值
● step - 增量值(默認:1)
● numPartitions – DataFrame分區數
返回: DataFrame
>>> sqlContext.range(1, 7, 2).collect() [Row(id=1), Row(id=3), Row(id=5)]
如果僅有一個參數,那麽這個參數被作為結束值。
>>> sqlContext.range(3).collect() [Row(id=0), Row(id=1), Row(id=2)]
1.15 read
返回一個DataFrameReader,可用於讀取數據為DataFrame。
1.16 registerDataFrameAsTable(df, tableName)
註冊給定的DataFrame作為目錄中的臨時表。
臨時表只在當前SQLContext實例有效期間存在。
>>> sqlContext.registerDataFrameAsTable(df, "table1")
1.17 registerFunction(name, f, returnType=StringType)
註冊python方法(包括lambda方法),作為UDF,這樣可以在 SQL statements中使用。
除了名稱和函數本身之外,還可以選擇性地指定返回類型。當返回類型沒有指定時,默認自動轉換為字符串。對於任何其他返回類型,所生成的對象必須與指定的類型匹配。
參數:● name - UDF名稱
● f – python方法
● 返回類型 數據類型對象
>>> sqlContext.registerFunction("stringLengthString", lambda x: len(x)) >>> sqlContext.sql("SELECT stringLengthString(‘test‘)").collect() [Row(_c0=u‘4‘)]
>>> from pyspark.sql.types import IntegerType >>> sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType()) >>> sqlContext.sql("SELECT stringLengthInt(‘test‘)").collect() [Row(_c0=4)]
>>> from pyspark.sql.types import IntegerType >>> sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType()) >>> sqlContext.sql("SELECT stringLengthInt(‘test‘)").collect() [Row(_c0=4)]
1.18 setConf(key, value)
設置給定的Spark SQL配置屬性
1.19 sql(sqlQuery)
返回DataFrame代表給定查詢的結果
參數:● sqlQuery - sql語句
返回: DataFrame
>>> l=[(1,‘row1‘),(2,‘row2‘),(3,‘row3‘)] >>> df = sqlContext.createDataFrame(l,[‘field1‘,‘field2‘]) >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> df2 = sqlContext.sql("SELECT field1 AS f1, field2 as f2 from table1") >>> df2.collect() [Row(f1=1, f2=u‘row1‘), Row(f1=2, f2=u‘row2‘), Row(f1=3, f2=u‘row3‘)]
1.20 table(tableName)
返回指定的表為DataFrame
返回: DataFrame
>>> l=[(1,‘row1‘),(2,‘row2‘),(3,‘row3‘)] >>> df = sqlContext.createDataFrame(l,[‘field1‘,‘field2‘]) >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> df2 = sqlContext.table("table1") >>> sorted(df.collect()) == sorted(df2.collect()) True
1.21 tableNames(dbName=None)
返回數據庫的表名稱列表
參數:dbName – 字符串類型的數據庫名稱.默認為當前的數據庫。
返回: 字符串類型的表名稱列表
>>> l=[(1,‘row1‘),(2,‘row2‘),(3,‘row3‘)] >>> df = sqlContext.createDataFrame(l,[‘field1‘,‘field2‘]) >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> "table1" in sqlContext.tableNames() True >>> "table1" in sqlContext.tableNames("db") True
1.22 tables(dbName=None)
返回一個包含表名稱的DataFrame從給定的數據庫。
如果數據庫名沒有指定,將使用當前的數據庫。
返回的DataFrame包含兩列: 表名稱和是否臨時表 (一個Bool類型的列,標識表是否為臨時表)。
參數:● dbName – 字符串類型的使用的數據庫名
返回: DataFrame
>>> l=[(1,‘row1‘),(2,‘row2‘),(3,‘row3‘)] >>> df = sqlContext.createDataFrame(l,[‘field1‘,‘field2‘]) >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> df2 = sqlContext.tables() >>> df2.filter("tableName = ‘table1‘").first() Row(tableName=u‘table1‘, isTemporary=True)
1.23 udf
返回一個註冊的UDF為UDFRegistration。
返回: UDFRegistration
1.24 uncacheTable(tableName)
從內存的緩存表中移除指定的表。
2.class pyspark.sql.HiveContext(sparkContext, hiveContext=None)
Hive此處暫略
《Spark Python API 官方文檔中文版》 之 pyspark.sql (一)