1. 程式人生 > >spark獲取資料解讀(部分)



理解dbData=sc.read.schema(mySchema).format("com.xxxx.spark.sql").options(uri=dbUrl, database= myDatabase, collection=myCollection).load()

這裡的sc是一個SparkSession物件。SparkSession的官方備註為"The entry point to programming Spark with the Dataset and DataFrame API"
即:使用資料集和DataFrame API編寫Spark程式的入口點
>>> spark = SparkSession.builder \\
    ...     .master("local") \\
    ...     .appName("Word Count") \\
    ...     .config("spark.some.config.option", "some-value") \\
    ...     .getOrCreate()

    .. autoattribute:: builder
Returns a DataFrameReader that can be used to read data in as a DataFrame.
大概意思就是read屬性(使用時不加括號,如sc.read)返回的是一個DataFrameReader,它可以用來讀入資料並得到一個DataFrame。
def read(self):
        Returns a :class:`DataFrameReader` that can be used to read data
        in as a :class:`DataFrame`.
        :return: :class:`DataFrameReader`
        return DataFrameReader(self._wrapped)
從匯入語句from pyspark.sql.readwriter import DataFrameReader可以知道DataFrameReader的所在位置
class DataFrameReader(OptionUtils):
    Interface used to load a :class:`DataFrame` from external storage systems
    (e.g. file systems, key-value stores, etc). Use :func:`spark.read`
    to access this.
    該介面用於從外部儲存系統(例如檔案系統、鍵值儲存等)載入`DataFrame`。請通過`spark.read`來訪問該介面。
.. versionadded:: 1.4 """ def __init__(self, spark): self._jreader = spark._ssql_ctx.read() self._spark = spark def _df(self, jdf): from pyspark.sql.dataframe import DataFrame return DataFrame(jdf, self._spark) @since(1.4) def format(self, source): """Specifies the input data source format. 指定輸入資料來源格式。 :param source: string, name of the data source, e.g. 'json', 'parquet'. 字串型別,資料來源名稱 >>> df = spark.read.format('json').load('python/test_support/sql/people.json') >>> df.dtypes [('age', 'bigint'), ('name', 'string')] """ self._jreader = self._jreader.format(source) return self @since(1.4) def schema(self, schema): """Specifies the input schema. 指定輸入模式 Some data sources (e.g. JSON) can infer the input schema automatically from data. By specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading. 一些資料來源(例如JSON)可以從資料中自動推斷出輸入模式。 在這裡通過指定模式,底層資料來源可以跳過模式推理步驟,從而加快資料載入速度。 :param schema: a :class:`pyspark.sql.types.StructType` object """ from pyspark.sql import SparkSession if not isinstance(schema, StructType): raise TypeError("schema should be StructType") spark = SparkSession.builder.getOrCreate() jschema = spark._jsparkSession.parseDataType(schema.json()) self._jreader = self._jreader.schema(jschema) return self @since(1.4) def options(self, **options): """Adds input options for the underlying data source. 為基礎資料來源新增輸入選項 You can set the following option(s) for reading files: * ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. """ for k in options: self._jreader = self._jreader.option(k, to_str(options[k])) return self @since(1.4) def load(self, path=None, format=None, schema=None, **options): """Loads data from a data source and returns it as a :class`DataFrame`. 
從資料來源載入資料並將其作為一個DataFrame返回。 :param path: optional string or a list of string for file-system backed data sources. 可選的,字串或用於檔案系統支援的資料來源的字串列表。 :param format: optional string for format of the data source. Default to 'parquet'. 可選的,表示資料來源格式的字串。預設為“parquet”。 :param schema: optional :class:`pyspark.sql.types.StructType` for the input schema. 可選的, 輸入模式 :param options: all other string options 其他的所有字串選項 >>> df = spark.read.load('python/test_support/sql/parquet_partitioned', opt1=True, ... opt2=1, opt3='str') >>> df.dtypes [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')] >>> df = spark.read.format('json').load(['python/test_support/sql/people.json', ... 'python/test_support/sql/people1.json']) >>> df.dtypes [('age', 'bigint'), ('aka', 'string'), ('name', 'string')] """ if format is not None: self.format(format) if schema is not None: self.schema(schema) self.options(**options) if isinstance(path, basestring): return self._df(self._jreader.load(path)) elif path is not None: if type(path) != list: path = [path] return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path))) else: return self._df(self._jreader.load()) @since(1.4) def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None): """ Construct a :class:`DataFrame` representing the database table named ``table`` accessible via JDBC URL ``url`` and connection ``properties``. 構建一個`DataFrame`,它表示名為`table`的資料庫表,該表可以通過JDBC URL `url`和連線屬性`properties`進行訪問。 Partitions of the table will be retrieved in parallel if either ``column`` or ``predicates`` is specified. ``lowerBound`, ``upperBound`` and ``numPartitions`` is needed when ``column`` is specified. 如果`column`或者`predicates`任何一個被指定,表的分割槽將被並行檢索。當指定了`column`時,`lowerBound`、`upperBound`、`numPartitions`都必須被指定。 If both ``column`` and ``predicates`` are specified, ``column`` will be used. 如果`column`和`predicates`同時被指定,那麼將使用`column`。 .. 
note:: Don't create too many partitions in parallel on a large cluster; \ otherwise Spark might crash your external database systems. 不要在一個大型叢集上建立太多的並行分割槽,否則,Spark可能會破壞您的外部資料庫系統 :param url: a JDBC URL of the form ``jdbc:subprotocol:subname`` 一個JDBC url :param table: the name of the table 表名 :param column: the name of an integer column that will be used for partitioning; if this parameter is specified, then ``numPartitions``, ``lowerBound`` (inclusive), and ``upperBound`` (exclusive) will form partition strides for generated WHERE clause expressions used to split the column ``column`` evenly :param lowerBound: the minimum value of ``column`` used to decide partition stride :param upperBound: the maximum value of ``column`` used to decide partition stride :param numPartitions: the number of partitions :param predicates: a list of expressions suitable for inclusion in WHERE clauses; each one defines one partition of the :class:`DataFrame` 一個適合包含在where字句中的表示式列表,每一個都定義了一個`DataFrame`分割槽 :param properties: a dictionary of JDBC database connection arguments. Normally at least properties "user" and "password" with their corresponding values. 
For example { 'user' : 'SYSTEM', 'password' : 'mypassword' } 一個JDBC資料庫連線引數的字典。通常情況下至少包含使用者和密碼兩個屬性 :return: a DataFrame """ if properties is None:#先檢查連線引數,如果為None則設定它為一個空字典 properties = dict() jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)() for k in properties:#通過引數設定相關配置 jprop.setProperty(k, properties[k]) if column is not None:#如果指定了列名稱,則檢查lowerBound、upperBound、numPartitions的值,如果他們中任意一個值為None則引發異常 assert lowerBound is not None, "lowerBound can not be None when ``column`` is specified" assert upperBound is not None, "upperBound can not be None when ``column`` is specified" assert numPartitions is not None, \ "numPartitions can not be None when ``column`` is specified" return self._df(self._jreader.jdbc(url, table, column, int(lowerBound), int(upperBound), int(numPartitions), jprop)) if predicates is not None: gateway = self._spark._sc._gateway jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates) return self._df(self._jreader.jdbc(url, table, jpredicates, jprop)) return self._df(self._jreader.jdbc(url, table, jprop))



