Hadoop Study Notes - Day 2
Terminology
▪ Operations are eager when they are executed as soon as the statement is reached in the code
▪ Operations are lazy when the execution occurs only when the result is referenced
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
examples/jars/spark-examples*.jar
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Features of Parquet
─ Optimized binary storage of structured data
─ Schema metadata is embedded in the file (see the sketch below)
─ Efficient performance and size for large amounts of data
─ Supported by many Hadoop ecosystem tools
  ─ Spark, Hadoop MapReduce, Hive, Impala, and others
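A minimal PySpark sketch of the embedded-schema point, assuming a pyspark shell where spark is already defined; the sample row and the /tmp/people_parquet path are only illustrative:
------python---------------------------------------
# Write a small DataFrame to Parquet, then read it back: the schema travels inside the file
peopleDF = spark.createDataFrame([("Bartlett", "Josiah", 52)], ["lastName", "firstName", "age"])
peopleDF.write.mode("overwrite").parquet("/tmp/people_parquet")
spark.read.parquet("/tmp/people_parquet").printSchema()  # same column names and types come back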
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Use parquet-tools to view Parquet file schema and data
─ Use head to display the first few records
$ parquet-tools head mydatafile.parquet
─ Use schema to view the schema
$ parquet-tools schema mydatafile.parquet
==== Installing parquet-tools ==================
1. Install Maven
$ yum install maven
2. Build parquet-tools
Option 1: Build and install all modules of the parent directory:
$ git clone https://github.com/apache/parquet-mr
$ cd parquet-mr
$ mvn install -Plocal
This puts the snapshot artifacts in your local ~/.m2 directory, so you can subsequently (re)build just parquet-tools on its own, because the snapshot artifacts will already be available from ~/.m2.
Option 2: Build the parquet-mr modules from the parent directory, while asking Maven to build needed modules as well along the way:
$ git clone https://github.com/apache/parquet-mr
$ cd parquet-mr
$ mvn package -pl parquet-tools -am -Plocal
======================
$ cd /opt/software; java -jar parquet-tools-1.10.0.jar cat /jacksun/data/base_stations.parquet
>>>>>>>>>>>>>>>>>>>>>>>>>>>
Chapter 6: Working with DataFrames and Schemas
# DataFrames (reading & writing) and schemas (manually defined & automatically inferred)
▪ How to create DataFrames from a variety of sources
▪ Creating DataFrames from Data Sources
▪ Spark SQL supports a wide range of data source types and formats
─ Text files
  ─ CSV
  ─ JSON
  ─ Plain text
─ Binary format files
  ─ Apache Parquet
  ─ Apache ORC
─ Tables
  ─ Hive metastore
  ─ JDBC (see the sketch below)
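Most of these formats appear in later examples, but JDBC does not, so here is a hedged sketch; the URL, database, table name, and credentials are placeholders, and the JDBC driver jar must be on the Spark classpath:
------python---------------------------------------
# Sketch: read a database table over JDBC into a DataFrame (connection details are placeholders)
accountsDF = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://dbhost:3306/loudacre") \
    .option("dbtable", "accounts") \
    .option("user", "training") \
    .option("password", "training") \
    .load()
accountsDF.printSchema()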
▪ spark.read returns a DataFrameReader object
▪ Use DataFrameReader settings to specify how to load data from the data source
─ format indicates the data source type, such as csv, json, or parquet (the default is parquet)
─ option specifies a key/value setting for the underlying data source
─ schema specifies a schema to use instead of inferring one from the data source
▪ Create the DataFrame based on the data source
─ load loads data from a file or files
─ table loads data from a Hive table
Read a CSV text file
─ Treat the first line in the file as a header instead of data;
myDF = spark.read.format("csv"). \
option("header","true"). \
load("/loudacre/myFile.csv")
Read a table defined in the Hive metastore;
myDF = spark.read.table("my_table")
▪ You must specify a location when reading from a file data source
─ The location can be a single file, a list of files, a directory, or a wildcard
─ Examples
─ spark.read.json("myfile.json")
─ spark.read.json("mydata/")
─ spark.read.json("mydata/*.json")
─ spark.read.json("myfile1.json","myfile2.json")
▪ Files and directories are referenced by absolute or relative URI # absolute URIs are generally preferable
─ Relative URI (uses default file system)
─ myfile.json
─ Absolute URI
─ hdfs://master/loudacre/myfile.json
─ file:/home/training/myfile.json
▪ You can also create DataFrames from a collection of in-memory data
-------scala-----------------------------------------
val mydata = List(("Josiah","Bartlett"),
("Harry","Potter"))
val myDF = spark.createDataFrame(mydata)
myDF.show
------python---------------------------------------
mydata = [("Josiah","Bartlett"),("Harry","Potter")]
myDF = spark.createDataFrame(mydata)
myDF.show()
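Without explicit column names, both versions name the columns _1 and _2; a small Python sketch of supplying names (the names chosen here are only illustrative):
------python---------------------------------------
# Pass a list of column names as the second argument to createDataFrame
mydata = [("Josiah", "Bartlett"), ("Harry", "Potter")]
namedDF = spark.createDataFrame(mydata, ["firstName", "lastName"])
namedDF.printSchema()  # columns are firstName and lastName instead of _1 and _2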
===============================================================
▪ Saving DataFrames to Data Sources
The DataFrame write function returns a DataFrameWriter
─ Saves data to a data source such as a table or set of files
─ Works similarly to DataFrameReader
▪ DataFrameWriter methods
─ format specifies a data source type
─ mode determines the behavior if the directory or table already exists
  ─ error, overwrite, append, or ignore (default is error)
─ partitionBy stores data in partitioned directories in the form column=value (as with Hive/Impala partitioning; see the sketch after the write examples below)
─ option specifies properties for the target data source
─ save saves the data as files in the specified directory
  ─ Or use json, csv, parquet, and so on
─ saveAsTable saves the data to a Hive metastore table
  ─ Uses default table location (/user/hive/warehouse)
  ─ Set path option to override location
▪ Example: Write data to a Hive metastore table called my_table
─ Append the data if the table already exists
─ Use an alternate location
myDF.write. \
mode("append"). \
option("path","/loudacre/mydata"). \
saveAsTable("my_table")
▪ Example: Write data as Parquet files in the mydata directory
myDF.write.save("mydata")
When you save data from a DataFrame, you must specify a directory
─ Spark saves the data to one or more part- files in the directory
myDF.write.mode("overwrite").json("mydata")
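partitionBy from the method list above is not shown in these examples; a sketch of partitioned output, assuming a hypothetical deviceDF with a device_type column:
------python---------------------------------------
# deviceDF is a hypothetical DataFrame containing a device_type column
# Spark writes one device_type=<value>/ subdirectory per distinct value
deviceDF.write \
    .mode("overwrite") \
    .partitionBy("device_type") \
    .parquet("/loudacre/devices_by_type")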
▪ Every DataFrame has an associated schema
─ Defines the names and types of columns
─ Immutable and defined when the DataFrame is created
myDF.printSchema()
root
|-- lastName: string (nullable = true)
|-- firstName: string (nullable = true)
|-- age: integer (nullable = true)
▪ When creating a new DataFrame from a data source, the schema can be
─ Automatically inferred from the data source
─ Specified programmatically
▪ When a DataFrame is created by a transformation, Spark calculates the new schema based on the query
▪ Spark can infer schemas from structured data, such as
─ Parquet files: schema is embedded in the file
─ Hive tables: schema is defined in the Hive metastore
─ Parent DataFrames
▪ Spark can also attempt to infer a schema from semi-structured data sources
─ For example, JSON and CSV
spark.read.option("inferSchema","true").parquet("/loudacre/mydata").printSchema()
spark.read.option("inferSchema","true").option("header","true").csv("people.csv").printSchema()
Inferred Schemas versus Manual Schemas
▪ Drawbacks to relying on Spark's automatic schema inference
─ Inference requires an initial file scan, which may take a long time
─ The schema may not be correct for your use case
▪ You can define the schema manually instead
─ A schema is a StructType object containing a list of StructField objects
─ Each StructField represents a column in the schema, specifying
  ─ Column name
  ─ Column data type
  ─ Whether the data can be null (optional; the default is true)
------python---------------------------------------
from pyspark.sql.types import *
columnsList = [
    StructField("pcode", StringType()),
    StructField("lastName", StringType()),
    StructField("firstName", StringType()),
    StructField("age", IntegerType())]
peopleSchema = StructType(columnsList)
spark.read.option("header","true").schema(peopleSchema).csv("people.csv").printSchema()
-------scala-----------------------------------------
import org.apache.spark.sql.types._
val columnsList = List(
    StructField("pcode", StringType),
    StructField("lastName", StringType),
    StructField("firstName", StringType),
    StructField("age", IntegerType))
val peopleSchema = StructType(columnsList)
spark.read.option("header","true").schema(peopleSchema).csv("people.csv").printSchema()
Eager and Lazy Execution
▪ Operations are eager when they are executed as soon as the statement is reached in the code
▪ Operations are lazy when the execution occurs only when the result is referenced
▪ Spark queries execute both lazily and eagerly
─ DataFrame schemas are determined eagerly
─ Data transformations are executed lazily
▪ Lazy execution is triggered when an action is called on a series of transformations
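A short sketch of the eager/lazy split, reusing people.csv from the schema examples above (the filter itself is only illustrative):
------python---------------------------------------
# Eager: the schema is determined as soon as the DataFrame is created (the file is scanned now)
peopleDF = spark.read.option("inferSchema", "true").option("header", "true").csv("people.csv")
# Lazy: transformations only build up the query; no data is read or processed yet
adultsDF = peopleDF.where("age > 18").select("firstName", "lastName")
# Calling an action triggers execution of the whole chain
adultsDF.show()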
▪ Essential Points
▪ DataFrames can be loaded from and saved to several different types of data sources
─ Semi-structured text files like CSV and JSON
─ Structured binary formats like Parquet and ORC
─ Hive and JDBC tables
▪ DataFrames can infer a schema from a data source, or you can define one manually
▪ DataFrame schemas are determined eagerly (at creation) but queries are executed lazily (when an action is called)
▪ How to specify format and options to save DataFrames
▪ How to define a DataFrame schema through inference or programmatically
▪ The difference between lazy and eager query execution
export DEVDATA=/jacksun/data
export DEVSH=/jacksun
spark2-submit \
/jacksun/exercises/yarn/wordcount.py /user/spark/*
$SPARK_HOME/bin/spark-shell --master yarn --deploy-mode client