1. 程式人生 > >建立 spark_session 讀取資料-加入快取-並使用SQL語句分析

建立 spark_session 讀取資料-加入快取-並使用SQL語句分析

c.spark_session_quick_start.py
1 建立 spark_session 讀取資料-加入快取
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import time
from pyspark.sql import SparkSession

if __name__ == "__main__":
	#配置環境變數
    os.environ['JAVA_HOME'] = 'C:/Java/jdk1.8.0_91'
    os.environ['HADOOP_HOME'] = 'C:/Java/hadoop-2.6.0-cdh5.7.6'
    os.environ['SPARK_HOME'
] = 'C:/Java/spark-2.2.0-bin-2.6.0-cdh5.7.6' # 例項化SparkSession物件,以本地模式是執行Spark程式 spark = SparkSession \ .builder \ .appName("Hello_World_Application") \ .master("local[2]")\ .getOrCreate() # print type(spark) 檢驗spark是否可用 # 讀取資料, 一行一行的讀取,每行資料的欄位名稱為value,型別為字串
log_data = spark.read.text("datas/README.md") print (type(log_data)) print (log_data.first()) print ("Count: " + str(log_data.count())) print ('\n') # 在Spark框架中可以將資料進行快取,以便再次使用時,直接從快取中讀取資料 # 預設快取級別:MEMORY_AND_DISK,先放記憶體,不足放磁碟 log_data.cache() # 對DataFrame中每條資料進行過濾,獲取每條資料中的value欄位的只值,進行篩選
nums_spark = log_data.filter(log_data.value.contains('Spark')).count() nums_python = log_data.filter(log_data.value.contains('Python')).count() print("Lines with Spark: %i, lines with Python: %i" % (nums_spark, nums_python))
2 SparkSQL資料分析(DSL,SQL)
    # 使用SparkSession讀取wc.data,資料封裝在DataFrame集合中
    wc_df = spark.read.text('datas/wc.data')
    print (type(wc_df))
    wc_df.show(n=5,  truncate=False)

    # DataFrame = RDD + schema, 如何將DataFrame轉換為RDD
    """
    SparkSQL中對資料分析兩種方式:
    -1. DSL分析    呼叫DataFrame中函式  
    -2. SQL分析    需要將DataFrame註冊為臨時檢視,編寫類似MySQL中SQL進行分析    
    """
    # 匯入SparkSQL中函式庫
    from pyspark.sql.functions import *

    word_df = wc_df\
        .select(explode(split(wc_df.value, '\\s+'))\
        .alias('word'))

    word_count_df = word_df.groupBy('word').count()  # 操作以後, 聚合count以後的欄位名稱為count
    word_count_df.show()


    # 註冊時臨時檢視
    word_df.createOrReplaceTempView('view_tmp_word')
    spark.sql('SELECT word, COUNT(1) AS count FROM view_tmp_word GROUP BY word').show()

    # 讀取CSV檔案
    csv_df = spark.read.csv('datas/flights.csv', header=True, inferSchema=True)
    csv_df.printSchema()
    csv_df.show(n=10, truncate=False)

    csv_df.write.csv('datas/flights.tsv', header=True, sep='\t')

    # 為了檢視Spark程式執行是的WEB UI介面,讓執行緒休眠一段時間
    time.sleep(100000)

    # SparkContext Stop
    spark.stop()