1. 程式人生 > >spark獲取資料解讀(部分)

spark獲取資料解讀(部分)

本系列文章是下載的是spark2.2.1版本的原始碼進行相關分析和學習。

理解dbData=sc.read.schema(mySchema).format("com.xxxx.spark.sql").options(uri=dbUrl, database= myDatabase, collection=myCollection).load()

1.SparkSession類的概括(今天我們主要講解read,所以SparkSession就略過哈):
官方備註為"The entry point to programming Spark with the Dataset and DataFrame API"
即:使用資料集和DataFrame API編寫Spark程式的入口點
使用示例:
>>> spark = SparkSession.builder \\
    ...     .master("local") \\
    ...     .appName("Word Count") \\
    ...     .config("spark.some.config.option", "some-value") \\
    ...     .getOrCreate()

    .. autoattribute:: builder
       :annotation:
    """
2.而今天我們來理解一下sparkSession的read方法
官網給出如下內容:
read
Returns a DataFrameReader that can be used to read data in as a DataFrame.
Returns:DataFrameReader
大概意思就是read方法返回的是一個DataFrameReader ,它可以用來讀入資料並得到一個DataFrame.
原始碼如下:
 
def read(self):
        """
        Returns a :class:`DataFrameReader` that can be used to read data
        in as a :class:`DataFrame`.
		返回`DataFrameReader`類:可以用來讀取資料且格式為DataFrame
        :return: :class:`DataFrameReader`
        """
        return DataFrameReader(self._wrapped)
從上面的原始碼中我們可以看出,SparkSession的read()方法實質上是呼叫的DataFrameReader()方法。
那下面我們就找到這個方法一探究竟
從匯入語句from pyspark.sql.readwriter import DataFrameReader可以知道DataFrameReader的所在位置
DataFrameReader這個類包含了很多種方法,這裡我們只分析我們用到的就可以了:
schema指定輸入模式
format指定輸入資料來源格式。
options為基礎資料來源新增輸入選項
原始碼如下:
class DataFrameReader(OptionUtils):
    """
    Interface used to load a :class:`DataFrame` from external storage systems
    (e.g. file systems, key-value stores, etc). Use :func:`spark.read`
    to access this.
    該介面用於從外部儲存系統(例如檔案系統、鍵值儲存等等)載入一個`DataFrame`類。使用:函式:“spark.read” 來訪問。
.. versionadded:: 1.4 """ def __init__(self, spark): self._jreader = spark._ssql_ctx.read() self._spark = spark def _df(self, jdf): from pyspark.sql.dataframe import DataFrame return DataFrame(jdf, self._spark) @since(1.4) def format(self, source): """Specifies the input data source format. 指定輸入資料來源格式。 :param source: string, name of the data source, e.g. 'json', 'parquet'. 字串型別,資料來源名稱 >>> df = spark.read.format('json').load('python/test_support/sql/people.json') >>> df.dtypes [('age', 'bigint'), ('name', 'string')] """ self._jreader = self._jreader.format(source) return self @since(1.4) def schema(self, schema): """Specifies the input schema. 指定輸入模式 Some data sources (e.g. JSON) can infer the input schema automatically from data. By specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading. 一些資料來源(例如JSON)可以從資料中自動推斷出輸入模式。 在這裡通過指定模式,底層資料來源可以跳過模式推理步驟,從而加快資料載入速度。 :param schema: a :class:`pyspark.sql.types.StructType` object """ from pyspark.sql import SparkSession if not isinstance(schema, StructType): raise TypeError("schema should be StructType") spark = SparkSession.builder.getOrCreate() jschema = spark._jsparkSession.parseDataType(schema.json()) self._jreader = self._jreader.schema(jschema) return self @since(1.4) def options(self, **options): """Adds input options for the underlying data source. 為基礎資料來源新增輸入選項 You can set the following option(s) for reading files: * ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. """ for k in options: self._jreader = self._jreader.option(k, to_str(options[k])) return self @since(1.4) def load(self, path=None, format=None, schema=None, **options): """Loads data from a data source and returns it as a :class`DataFrame`. 從資料來源載入資料並將其作為一個DataFrame類返回。 :param path: optional string or a list of string for file-system backed data sources. 可選的,字串或用於檔案系統支援的資料來源的字串列表。 :param format: optional string for format of the data source. Default to 'parquet'. 可選的,用於資料來源格式化的字串。預設為“parquet”。 :param schema: optional :class:`pyspark.sql.types.StructType` for the input schema. 可選的, 輸入模式 :param options: all other string options 其他的所有字串選項 >>> df = spark.read.load('python/test_support/sql/parquet_partitioned', opt1=True, ... opt2=1, opt3='str') >>> df.dtypes [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')] >>> df = spark.read.format('json').load(['python/test_support/sql/people.json', ... 'python/test_support/sql/people1.json']) >>> df.dtypes [('age', 'bigint'), ('aka', 'string'), ('name', 'string')] """ if format is not None: self.format(format) if schema is not None: self.schema(schema) self.options(**options) if isinstance(path, basestring): return self._df(self._jreader.load(path)) elif path is not None: if type(path) != list: path = [path] return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path))) else: return self._df(self._jreader.load()) @since(1.4) def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None): """ Construct a :class:`DataFrame` representing the database table named ``table`` accessible via JDBC URL ``url`` and connection ``properties``. 構建一個`DataFrame`類,它表示一個數據庫表名為‘table’可以通過JDBC UR`url`連結和連線屬性進行。 Partitions of the table will be retrieved in parallel if either ``column`` or ``predicates`` is specified. ``lowerBound`, ``upperBound`` and ``numPartitions`` is needed when ``column`` is specified. 如果有`column`或者`predicates`任何一個被指定,表的分割槽將被並行檢索。那麼`lowerBound`、 `lowerBound`、`numPartitions`都必須被指定。 If both ``column`` and ``predicates`` are specified, ``column`` will be used. 如果`column`和`predicates`同時被指定,那麼column`也將被指定。 .. note:: Don't create too many partitions in parallel on a large cluster; \ otherwise Spark might crash your external database systems. 不要在一個大型叢集上建立太多的並行分割槽,否則,Spark可能會破壞您的外部資料庫系統 :param url: a JDBC URL of the form ``jdbc:subprotocol:subname`` 一個JDBC url :param table: the name of the table 表名 :param column: the name of an integer column that will be used for partitioning; if this parameter is specified, then ``numPartitions``, ``lowerBound`` (inclusive), and ``upperBound`` (exclusive) will form partition strides for generated WHERE clause expressions used to split the column ``column`` evenly :param lowerBound: the minimum value of ``column`` used to decide partition stride :param upperBound: the maximum value of ``column`` used to decide partition stride :param numPartitions: the number of partitions :param predicates: a list of expressions suitable for inclusion in WHERE clauses; each one defines one partition of the :class:`DataFrame` 一個適合包含在where字句中的表示式列表,每一個都定義了一個`DataFrame`分割槽 :param properties: a dictionary of JDBC database connection arguments. Normally at least properties "user" and "password" with their corresponding values. For example { 'user' : 'SYSTEM', 'password' : 'mypassword' } 一個JDBC資料庫連線引數的字典。通常情況下至少包含使用者和密碼兩個屬性 :return: a DataFrame """ if properties is None:#先檢查連線引數,如果為None則設定它為一個空字典 properties = dict() jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)() for k in properties:#通過引數設定相關配置 jprop.setProperty(k, properties[k]) if column is not None:#如果指定了列名稱,則檢查lowerBound、upperBound、numPartitions的值,如果他們中任意一個值為None則引發異常 assert lowerBound is not None, "lowerBound can not be None when ``column`` is specified" assert upperBound is not None, "upperBound can not be None when ``column`` is specified" assert numPartitions is not None, \ "numPartitions can not be None when ``column`` is specified" return self._df(self._jreader.jdbc(url, table, column, int(lowerBound), int(upperBound), int(numPartitions), jprop)) if predicates is not None: gateway = self._spark._sc._gateway jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates) return self._df(self._jreader.jdbc(url, table, jpredicates, jprop)) return self._df(self._jreader.jdbc(url, table, jprop))
本文是對SparkSession的部分理解,由於自身能力有限,可能存在偏差,歡迎指正分享。





相關推薦

spark獲取資料解讀部分

本系列文章是下載的是spark2.2.1版本的原始碼進行相關分析和學習。 理解dbData=sc.read.schema(mySchema).format("com.xxxx.spark.sql").options(uri=dbUrl, database= myDataba

資料之路-民宿市場資料解讀

筆者有個朋友,北上廣打拼多年,每天加班加點,披星戴月…終於不堪折磨,萌生退意,想要歸隱山林,開個農家院收租為生… 在無情的嘲笑了他後,筆者也很好奇,到底民宿市場如何?做民宿到底賺不賺錢?什麼樣的民宿比較賺錢?使用者都是哪些人? 帶著這些問題筆者開始了民宿市場的探

CVPR 2018 論文解讀部分

CVPR 2018還有3個月就開始了,目前已經公佈了所有收錄論文名單,為了能夠讓大家更深刻了解C

平均月薪超過1萬的北京人是這樣上班的|大資料解讀視訊

最新資料顯示,全國37個主要城市的平均招聘薪酬為7789元,北京的平均薪酬水平排名第一併且已經過

snmp4j獲取資料例項之SnmpTrap示例

import java.io.IOException; import java.util.Vector; import org.snmp4j.CommunityTarget; import org.snmp4j.PDU; import org.snmp4j.Snmp; i

Spark學習之路 十五SparkCore的源碼解讀啟動腳本

-o 啟動服務 binary dirname ppi std 參數 exp 情況 一、啟動腳本分析 獨立部署模式下,主要由master和slaves組成,master可以利用zk實現高可用性,其driver,work,app等信息可以持久化到zk上;slaves由一臺至多

Spark自學之路——資料分割槽

資料分割槽        對資料集在節點間的分割槽控制。在分散式程式中,網路的通訊代價是很大的,因此控制資料分佈以獲得最少的網路傳輸可以極大地提升整體效能,Spark可以控制RDD分割槽來減少網路通訊開銷。分割槽並不是對所有的應用都有好處,如果RDD只被掃

前端之路:sql語句,表中隨機獲取一條記錄資料或者獲取隨機獲取多條記錄資料

<!--表中獲取隨機一條title 耗時0.01s id==隨機欄位,最好為表id--> SELECT * FROM `tableName` AS t1 JOIN (SELECT ROUND(RAND() * ((SELECT MAX(id) FROM `ta

Spark資料本地化調優

資料本地化的級別: PROCESS_LOCAL task要計算的資料在本程序(Executor)的記憶體中。 NODE_LOCAL task所計算的資料在本節點所在的磁碟上。 task所計算的資料在本節點其他Executor程序的記憶體中。 NO_PREF

【Python3實戰Spark資料分析及排程】Spark Core 課程筆記1

目錄 架構 注意事項 Spark Core: Spark 核心進階 Spark 核心概念 Application User program built on Spark. Consists of a driver progr

Spark快速大資料分析

楔子 Spark快速大資料分析 前3章內容,僅作為學習,有斷章取義的嫌疑。如有問題參考原書 Spark快速大資料分析 以下為了打字方便,可能不是在注意大小寫 1 Spark資料分析導論 1.1 Spark是什麼 Spark是一個用來實現快速而通用的叢

Spark中元件Mllib的學習25之線性迴歸2-較大資料多元

對多組資料進行model的training,然後再利用model來predict具體的值 。過程中有輸出model的權重 公式:f(x)=a1X1+a2X2+a3X3+…… 2.程式碼:

資料求索4:配置Spark History Server

大資料求索(4):配置Spark History Server 修改配置檔案 修改conf/spark-default.conf cp spark-default.conf.template spark-default.conf vim spark-defau

Echarts折線圖動態獲取資料例項附java後臺程式碼

// 基於準備好的dom,初始化echarts例項 var myChart = echarts.init(document.getElementById('line')); // 指定圖表的配置項和資料 var option = {

資料探勘工具---Spark的使用方法

Spark使用過程中報錯彙總 報錯1: “ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(ap

[大資料]spark入門 in pythonHelloWorld

之前由於工作的需要玩了快一年的hadoop,做了一些資料分析的基礎工作。 自然,hadoop用做離線分析還是完全滿足需求的,無論是自己寫mapreduce程式碼開發 又或者使用hive來便利分析,但是面對實時計算,就需要跟高一層級的框架了,storm,spark 就是對應的

資料基礎從零開始安裝配置Hadoop 2.7.2+Spark 2.0.0到Ubuntu 16.04

raw to spark 0 install ubuntu 14.04.01 desktop x64 1 system基礎配置 《以下都是root模式》 1.3 root password sudo passwd root 1.5 root登入選項 a.在terminal下

Spark隨機森林演算法對資料分類——計算準確率和召回率

1.召回率和正確率計算 對於一個K元的分類結果,我們可以得到一個K∗K的混淆矩陣,得到的舉證結果如下圖所示。 從上圖所示的結果中不同的元素表示的含義如下: mij :表示實際分類屬於類i,在預測過程中被預測到分類j 對於所有的mij可以概括為四種方式

線性表資料結構解讀雜湊表結構-HashMap

    前面的部落格中,我給大家分析過陣列和連結串列兩種線性表資料結構。陣列儲存區間連續,查詢方便,但是插入和刪除效率低下;連結串列儲存區間離散,插入刪除方便,但是查詢困難。大家肯定會問,有沒有一種結構,既能做到查詢便捷,又能做到插入刪除方便呢?答案就是我們今天

Django小專案--待辦清單從表單中獲取資料並存入資料庫

首先進入主頁(要記得先進入虛擬環境並且通過python  mange.py runserver啟動本地伺服器),我們知道主頁匹配的網址是localhost:8000/todo/home,在瀏覽器上輸入並回車。 在頁面的右上角有一個新增待辦事項的按鈕,輸入內容並點選新