
Spark Application Basics -- Fundamentals and Common Sample Code

# Create a Spark project skeleton with Maven
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.maven.archetypes \
  -DgroupId=spark.examples \
  -DartifactId=JavaWordCount \
  -Dfilter=org.apache.maven.archetypes:maven-archetype-quickstart

  1. Classic Spark SQL query code

import org.apache.spark.sql.functions.countDistinct
df.select(countDistinct("medallion")).show()

// Define a custom function, use it to create a new column, then take a sample and display it.
import java.sql.Timestamp
import org.apache.spark.sql.functions.udf

val code: (Timestamp => Long) = (arg: Timestamp) => { arg.getTime() / 1000 / 3600 % 24 }
val addCol = udf(code)

val df22 = df2.withColumn("byhour", addCol(df2("pickup_datetime")))
df22.show(10)
val newSample = df22.sample(true, 0.001)

// Map CSV rows directly to case class objects; the case class effectively serves as the schema.
case class Employee(EmployeeID: String,
  LastName: String, FirstName: String,
  Title: String,
  BirthDate: String, HireDate: String,
  City: String, State: String, Zip: String, Country: String,
  ReportsTo: String)

case class Order(OrderID: String, CustomerID: String, EmployeeID: String, OrderDate: String, ShipCountry: String)
case class OrderDetails(OrderID: String, ProductID: String, UnitPrice: Double, Qty: Int, Discount: Double)

import spark.implicits._

val filePath = "hdfs://10.20.2.1:8020/study/spark/DataProcessingWithSpark2/"
val employees = spark.read.option("header", "true").csv(filePath + "data/NW-Employees.csv").as[Employee]
println("Employees has " + employees.count() + " rows")
employees.show(5)
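Once the rows are mapped to a Dataset[Employee], typed (lambda-based) operations can be mixed with column-based ones; a minimal sketch (the Country value "UK" is only an illustrative assumption about the data):

// Typed filter on the Dataset[Employee], followed by an untyped column selection.
val ukEmployees = employees.filter(e => e.Country == "UK")
ukEmployees.select("FirstName", "LastName", "City").show()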

//———————————

It is recommended to give each executor at least 4 GB of memory:
--executor-memory 4g \

// Take the first few rows (note: take() is not random; use sample() for a random sample)
df.take(5)

// Store the training and test splits in an array.
val Array(train, test) = preparedDF.randomSplit(Array(0.7, 0.3))

// Load multiple files at once (glob pattern).
val df = spark.read.format("csv").option("header", "true").option("delimiter", ",").schema(tripFareSchema).load("hdfs://10.20.2.1:8020/data/trip/*.csv")

View the Spark instance's properties:
http://spark-node4.cityworks.cn:4040/environment/

// Define a custom schema
import org.apache.spark.sql.types.{StructField, StructType, StringType,IntegerType,FloatType}
import org.apache.spark.sql.types.Metadata

val poiSchema = StructType(Array(
  StructField("shopID", IntegerType, true),
  StructField("cityTelCode", StringType, true),
  StructField("cityName", StringType, true),
  StructField("name", StringType, true),
  StructField("address", StringType, true),
  StructField("tel", StringType, true),
  StructField("latitude", FloatType, true),
  StructField("longitude", FloatType, true),
  StructField("channel", StringType, true)
))

val df = spark.read.format("csv").option("header", "true").option("delimiter", "|").schema(poiSchema).load("hdfs://10.20.2.1:8020/data/poi/ChinaPOI.csv")

df.printSchema()
df.createOrReplaceTempView("ChinaPOI")

df.cache()
df.count()
df.show(10)
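With the temp view registered, the same data can also be queried through SQL; a minimal sketch using columns declared in poiSchema above (the aggregation itself is just illustrative):

// Count POIs per city via the registered temp view.
spark.sql("SELECT cityName, count(*) AS shops FROM ChinaPOI GROUP BY cityName ORDER BY shops DESC").show(10)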

// coalesce(1) merges the output into a single file; mode("overwrite") overwrites an existing path.
jasperDF.coalesce(1).write.format("csv").mode("overwrite").option("path", "hdfs://10.20.2.1:8020/tmp/jasper.csv").save()

# Read a text file with Spark and parse it line by line (PySpark).
csv_lines = sc.textFile("data/example.csv")
data = csv_lines.map(lambda line: line.split(","))
data.collect()

#!/bin/bash
spark-submit --class com.linyingjie.study.wordcount.SparkWordCount \
  --master spark://10.20.2.31:7077,10.20.2.32:7077,10.20.2.33:7077,10.20.2.34:7077,10.20.2.35:7077 \
  --deploy-mode cluster \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-spark.properties" \
  --supervise \
  --driver-memory 1g \
  --executor-memory 1g \
  --executor-cores 1 \
  --queue thequeue \
  --total-executor-cores 5 \
  --driver-java-options "-Dlog4j.configuration=file:/var/server/spark/conf/log4j.properties -Dvm.logging.level=DEBUG" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/var/server/spark/conf/log4j.properties -Dvm.logging.name=myapp -Dvm.logging.level=DEBUG" \
  http://10.20.2.31:80/WordCount-assembly-0.2.jar \
  hdfs://10.20.2.1:8020/tmp/article.txt \
  hdfs://10.20.2.1:8020/tmp/result3__spark2.4.0_2018_11_21

Spark 2.4.0 ships with Scala 2.11.12.
Spark 2.3.1 ships with Scala 2.11.8.
You can see this from spark-shell; a deployed Scala application must be built against the same Scala version as the target Spark cluster.
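A quick way to check from inside spark-shell (a minimal sketch; the values printed depend on your installation):

scala> util.Properties.versionString   // Scala version bundled with this Spark build, e.g. "version 2.11.12"
scala> sc.version                      // Spark version of the running shell/cluster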
Submitting in cluster mode:

# With passwordless SSH set up, copy files to many hosts in batch:
cat /var/server/spark/conf/slaves | grep '^spark-node' | xargs -i -t scp /etc/ntp.conf root@{}:/etc/ntp.conf
cat /var/server/spark/conf/slaves | grep '^spark-node' | xargs -i -t ssh root@{} "systemctl restart ntpd; ntpdate -u 10.20.1.1; ntpq -p"

echo -e "hadoop-namenode1\nhadoop-namenode2\nhadoop-namenode3\n" > /tmp/namenodes.txt; cat /tmp/namenodes.txt | xargs -i -t scp /var/server/hbase/conf/hbase-site.xml root@{}:/var/server/hbase/conf/

cat /var/server/spark/conf/slaves | grep '^spark-node' | xargs -i -t ssh root@{} "rm -rf /var/server/spark/logs/*"
cat /var/server/spark/conf/slaves | grep '^spark-node' | xargs -i -t ssh root@{} "chown -R spark:spark /var/server/spark/"


#echo -e "spark-node1\nspark-node2\nspark-node3\nspark-node4\nspark-node5\n" > /tmp/spark-slaves.txt; cat /tmp/spark-slaves.txt | xargs -i -t ssh root@{} "rm -rf /var/server/spark/logs/*"

Two ways to run a Spark job: 1. interactively from the console (spark-shell); 2. submitting directly from the shell with spark-submit.

# Restart the Spark standalone cluster (stop first, then start)
/var/server/spark/sbin/stop-all.sh
/var/server/spark/sbin/start-all.sh

Connecting to the Spark cluster
# Standalone mode uses port 7077
spark-shell --master spark://192.168.2.41:7077
# Cluster deploy mode (REST submission) uses port 6066
source java8;spark-submit --class com.cloudera.datascience.lsa.RunLSA --master spark://spark-node1.cityworks.cn:6066,spark-node2.cityworks.cn:6066,spark-node3.cityworks.cn:6066,spark-node4.cityworks.cn:6066,spark-node5.cityworks.cn:6066 --deploy-mode cluster --driver-memory 4g --executor-memory 4g --executor-cores 4 --queue thequeue ch06-lsa-2.0.0-jar-with-dependencies.jar

spark-shell --master spark://10.20.2.21:7077,10.20.2.22:7077,10.20.2.23:7077

source java8;spark-shell --master spark://10.20.2.31:6066,10.20.2.32:6066,10.20.2.33:6066,10.20.2.34:6066,10.20.2.35:6066


val df = spark.read.format("json").load("hdfs://10.20.2.3:8020/study/spark/Spark-The-Definitive-Guide/data/flight-data/json/2015-summary.json")
// 8020 is the standard HDFS NameNode port in my setup

val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("hdfs://10.20.2.3:8020/study/spark/Spark-The-Definitive-Guide/data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.createOrReplaceTempView("dfTable")

Submitting a Spark job
Once a Spark application has been bundled as a JAR file (written in Scala or Java) or as a Python file, it can be submitted with the spark-submit script located in the bin directory of the Spark distribution (i.e. $SPARK_HOME/bin). According to the documentation on the Spark website (http://spark.apache.org/docs/latest/submitting-applications.html), this script takes care of:
setting up the classpath with Spark and its dependencies (JAVA_HOME, SCALA_HOME);
setting up all the dependencies required to run the job;
managing the different cluster managers;
and, finally, the deploy modes that Spark supports.
In short, the Spark job submission syntax is:
spark-submit [options] <app-jar | python-file> [app arguments]

// Registering a custom function (UDF):
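The excerpt below assumes a power3 function has already been defined; its definition is not shown in these notes, so the following is an assumed minimal version consistent with how it is used:

// Assumed helper for the excerpt below: cubes its Double input.
def power3(number: Double): Double = number * number * number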

Now that you have an understanding of the process, let’s work through an example. First, we
need to register the function to make it available as a DataFrame function:
// in Scala
import org.apache.spark.sql.functions.{udf, col}
val power3udf = udf(power3(_: Double): Double)
We can use that just like any other DataFrame function:
// in Scala
udfExampleDF.select(power3udf(col("num"))).show()

// Alternative: register the function so it can be used in SQL expressions:
spark.udf.register("power3", power3(_: Double): Double)
udfExampleDF.selectExpr("power3(num)").show(2)

scala> sc.<TAB>    // pressing Tab after "sc." lists the available SparkContext members, e.g.:
accumulable             defaultParallelism    jars              sequenceFile
accumulableCollection   deployMode            killExecutor      setCallSite
accumulator             doubleAccumulator     killExecutors     setCheckpointDir
addFile                 emptyRDD              killTaskAttempt   setJobDescription
addJar                  files                 listFiles         setJobGroup
// sc and spark are two built-in variables: sc is the SparkContext, spark is the SparkSession
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@...

scala> spark
res1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@...
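A minimal sketch of exercising both entry points from the shell (the results shown depend on your session):

scala> sc.parallelize(1 to 5).sum()   // RDD API through the SparkContext
scala> spark.range(5).count()         // DataFrame/Dataset API through the SparkSession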

scala> :help      // show how to use the REPL commands

scala> :history   // show previously entered commands (:h? <string> searches the history)

# Starting spark-shell in a multi-JDK environment
source java8; spark-shell

# The web UI is available as soon as spark-shell starts
http://localhost:4040/jobs/

Extract, Transform, and Load (ETL)
Schema-on-Read (SOR)
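Schema-on-read means the schema is applied when the data is read rather than enforced when it is stored; in Spark this is what the inferSchema option (or passing a schema to the reader) does. A minimal sketch, with a purely illustrative file path:

// The schema is determined at read time from the file contents.
val sorDF = spark.read.option("header", "true").option("inferSchema", "true").csv("hdfs://10.20.2.1:8020/data/example.csv")
sorDF.printSchema()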

Pasting code into spark-shell:

scala> :paste
then paste your code...
Ctrl+D to exit paste mode

Resilient Distributed Datasets (RDDs)