
Hadoop Study Notes - Day 2

Terminology

▪ Operations are eager when they are executed as soon as the statement is reached in the code
▪ Operations are lazy when execution occurs only when the result is referenced
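
The spark-submit command below runs the bundled SparkPi example application on YARN in cluster mode: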


$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
examples/jars/spark-examples*.jar



>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Features of Parquet
─ Optimized binary storage of structured data
─ Schema metadata is embedded in the file
─ Efficient performance and size for large amounts of data
─ Supported by many Hadoop ecosystem tools
  ─ Spark, Hadoop MapReduce, Hive, Impala, and others
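
Because the schema travels inside the file, a Parquet file written by Spark can be read back with no header options or type inference. A minimal PySpark sketch, assuming an active SparkSession named spark (the output path and sample ages are made up for illustration):

peopleDF = spark.createDataFrame([("Josiah","Bartlett",63),("Harry","Potter",17)],
                                 ["firstName","lastName","age"])
peopleDF.write.mode("overwrite").parquet("/loudacre/people_parquet")   # schema is stored with the data
spark.read.parquet("/loudacre/people_parquet").printSchema()           # types come back without inference
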
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Use parquet-tools to view Parquet file schema and data
─ Use head to display the first few records
$ parquet-tools head mydatafile.parquet
─ Use schema to view the schema
$ parquet-tools schema mydatafile.parquet

==== Installing parquet-tools ====
1. Install Maven
$ yum install maven
2. Build parquet-tools
Option 1: Build and install all modules of the parent directory:

$ git clone https://github.com/apache/parquet-mr
$ cd parquet-mr
$ mvn install -Plocal

This will put the snapshot artifacts in your local ~/.m2 directory. Afterwards, you can (re)build just parquet-tools on its own, because the snapshot artifacts will already be available from ~/.m2.

Option 2: Build the parquet-mr modules from the parent directory, while asking Maven to build needed modules as well along the way:

$ git clone https://github.com/apache/parquet-mr
$ cd parquet-mr
$ mvn package -pl parquet-tools -am -Plocal

======================
cd /opt/software; java -jar parquet-tools-1.10.0.jar cat /jacksun/data/base_stations.parquet


>>>>>>>>>>>>>>>>>>>>>>>>>>>


Chapter 6: Working with DataFrames and Schemas
# DataFrames (reading & writing) and schemas (manual definition & automatic inference)

▪ How to create DataFrames from a variety of sources
▪ Creating DataFrames from Data Sources
▪ Spark SQL supports a wide range of data source types and formats
─ Text files
  ─ CSV
  ─ JSON
  ─ Plain text
─ Binary format files
  ─ Apache Parquet
  ─ Apache ORC
─ Tables
  ─ Hive metastore
  ─ JDBC

▪ spark.read returns a DataFrameReader object

▪ Use DataFrameReader settings to specify how to load data from the data source
─ format indicates the data source type, such as csv, json, or parquet (the default is parquet)
─ option specifies a key/value setting for the underlying data source
─ schema specifies a schema to use instead of inferring one from the data source

▪ Create the DataFrame based on the data source
─ load loads data from a file or files
─ table loads data from a Hive table
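
As a sketch of how the reader settings above compose, the following chains format, option, schema, and load (the file /loudacre/accounts.csv and its two columns are made up for illustration):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical schema for the made-up accounts.csv file
acctSchema = StructType([StructField("acct_num", IntegerType()),
                         StructField("city", StringType())])

accountsDF = spark.read. \
    format("csv"). \
    option("header","true"). \
    schema(acctSchema). \
    load("/loudacre/accounts.csv")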

▪ Example: Read a CSV text file
─ Treat the first line in the file as a header instead of data;

myDF = spark.read.format("csv"). \
option("header","true"). \
load("/loudacre/myFile.csv")

▪ Example: Read a table defined in the Hive metastore

myDF = spark.read.table("my_table")


▪ You must specify a location when reading from a file data source
─ The location can be a single file, a list of files, a directory, or a wildcard
─ Examples
─ spark.read.json("myfile.json")
─ spark.read.json("mydata/")
─ spark.read.json("mydata/*.json")
─ spark.read.json("myfile1.json","myfile2.json")

▪ Files and directories are referenced by absolute or relative URI (absolute URIs are less ambiguous and usually preferred)
─ Relative URI (uses default file system)
  ─ myfile.json
─ Absolute URI
  ─ hdfs://master/loudacre/myfile.json
  ─ file:/home/training/myfile.json

▪ You can also create DataFrames from a collection of in-memory data
-------scala-----------------------------------------
val mydata = List(("Josiah","Bartlett"),
("Harry","Potter"))
val myDF = spark.createDataFrame(mydata)
myDF.show

------python---------------------------------------
mydata = [("Josiah","Bartlett"),("Harry","Potter")]
myDF = spark.createDataFrame(mydata)
myDF.show()
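
Created from plain tuples like this, the columns get the generic names _1 and _2; a list of column names can be passed as the second argument. A small sketch in Python:

mydata = [("Josiah","Bartlett"),("Harry","Potter")]
myDF = spark.createDataFrame(mydata, ["firstName","lastName"])   # explicit column names
myDF.printSchema()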

===============================================================
▪ Saving DataFrames to Data Sources
▪ The DataFrame write function returns a DataFrameWriter
─ Saves data to a data source such as a table or set of files
─ Works similarly to DataFrameReader

▪ DataFrameWriter methods
─ format specifies a data source type
─ mode determines the behavior if the directory or table already exists ─ error, overwrite, append, or ignore (default is error)
─ partitionBy stores data in partitioned directories in the form column=value (as with Hive/Impala partitioning)
─ option specifies properties for the target data source
─ save saves the data as files in the specified directory ─ Or use json, csv, parquet, and so on
─ saveAsTable saves the data to a Hive metastore table
─ Uses default table location (/user/hive/warehouse)
─ Set path option to override location
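
None of the examples below use partitionBy, so here is a minimal sketch, assuming myDF has a pcode column (the output path is made up for illustration):

myDF.write. \
    mode("overwrite"). \
    partitionBy("pcode"). \
    parquet("/loudacre/mydata_by_pcode")   # writes subdirectories such as pcode=94020/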

▪ Example: Write data to a Hive metastore table called my_table
─ Append the data if the table already exists
─ Use an alternate location

myDF.write. \
mode("append"). \
option("path","/loudacre/mydata"). \
saveAsTable("my_table")

▪ Example: Write data as Parquet files in the mydata directory
myDF.write.save("mydata")

▪ When you save data from a DataFrame, you must specify a directory
─ Spark saves the data to one or more part- files in the directory
myDF.write.mode("overwrite").json("mydata")

▪ Every DataFrame has an associated schema
─ Defines the names and types of columns
─ Immutable, defined when the DataFrame is created

myDF.printSchema()
root
|-- lastName: string (nullable = true)
|-- firstName: string (nullable = true)
|-- age: integer (nullable = true)


▪ When creating a new DataFrame from a data source, the schema can be
─ Automatically inferred from the data source
─ Specified programmatically

▪ When a DataFrame is created by a transformation, Spark calculates the new schema based on the query

▪ Spark can infer schemas from structured data, such as
─ Parquet files: schema is embedded in the file
─ Hive tables: schema is defined in the Hive metastore
─ Parent DataFrames

▪ Spark can also attempt to infer a schema from semi-structured data sources
─ For example, JSON and CSV
spark.read.option("inferSchema","true").parquet("/loudacre/mydata").printSchema()
spark.read.option("inferSchema","true").option("header","true").csv("people.csv").printSchema()


Inferred Schemas versus Manual Schemas
▪ Drawbacks to relying on Spark's automatic schema inference
─ Inference requires an initial file scan, which may take a long time
─ The inferred schema may not be correct for your use case
▪ You can define the schema manually instead
─ A schema is a StructType object containing a list of StructField objects
─ Each StructField represents a column in the schema, specifying
  ─ Column name
  ─ Column data type
  ─ Whether the data can be null (optional; the default is true)

------python---------------------------------------
from pyspark.sql.types import *
columnsList = [
    StructField("pcode", StringType()),
    StructField("lastName", StringType()),
    StructField("firstName", StringType()),
    StructField("age", IntegerType())]

peopleSchema = StructType(columnsList)
spark.read.option("header","true").schema(peopleSchema).csv("people.csv").printSchema()

-------scala-----------------------------------------
import org.apache.spark.sql.types._
val columnsList = List(StructField("pcode", StringType), StructField("lastName", StringType),
  StructField("firstName", StringType), StructField("age", IntegerType))
val peopleSchema = StructType(columnsList)
spark.read.option("header","true").schema(peopleSchema).csv("people.csv").printSchema()


Eager and Lazy Execution
▪ Operations are eager when they are executed as soon as the statement is reached in the code
▪ Operations are lazy when execution occurs only when the result is referenced
▪ Spark queries execute both lazily and eagerly
─ DataFrame schemas are determined eagerly
─ Data transformations are executed lazily
▪ Lazy execution is triggered when an action is called on a series of transformations
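
A short sketch of how this plays out in PySpark, reusing people.csv from the earlier examples: the read determines the schema eagerly, the transformations only build up a query, and the action triggers execution.

peopleDF = spark.read.option("header","true").option("inferSchema","true").csv("people.csv")   # eager: schema determined now
adultsDF = peopleDF.where("age >= 18")                    # lazy: no data is processed yet
namesDF = adultsDF.select("firstName","lastName")         # lazy: still only a query plan
namesDF.show()                                            # action: triggers the actual execution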

▪ Essential Points
▪ DataFrames can be loaded from and saved to several different types of data sources
─ Semi-structured text files like CSV and JSON
─ Structured binary formats like Parquet and ORC
─ Hive and JDBC tables
▪ DataFrames can infer a schema from a data source, or you can define one manually
▪ DataFrame schemas are determined eagerly (at creation) but queries are executed lazily (when an action is called)

 

▪ How to specify format and options to save DataFrames
▪ How to define a DataFrame schema through inference or programmatically
▪ The difference between lazy and eager query execution


export DEVDATA=/jacksun/data
export DEVSH=/jacksun

 

spark2-submit \
/jacksun/exercises/yarn/wordcount.py /user/spark/*

$SPARK_HOME/bin/spark-shell --master yarn --deploy-mode client