Spark Introduction, Part 2 (Operator Overview and a WordCount Walkthrough)
Start the Spark standalone cluster:
[root@master ~]# cd /usr/local/apps/spark-2.3.2-bin-hadoop2.7/
[root@master spark-2.3.2-bin-hadoop2.7]# ./sbin/start-all.sh
The startup log looks like this:
starting org.apache.spark.deploy.master.Master, logging to /usr/local/apps/spark-2.3.2-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.master.Master-1-master.out
192.168.153.131: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/apps/spark-2.3.2-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slvae3.out
192.168.153.130: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/apps/spark-2.3.2-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slave2.out
192.168.153.129: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/apps/spark-2.3.2-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slave1.out
192.168.153.131: failed to launch: nice -n 0 /usr/local/apps/spark-2.3.2-bin-hadoop2.7/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://master:7077
192.168.153.130: failed to launch: nice -n 0 /usr/local/apps/spark-2.3.2-bin-hadoop2.7/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://master:7077
192.168.153.130: full log in /usr/local/apps/spark-2.3.2-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slave2.out
192.168.153.131: full log in /usr/local/apps/spark-2.3.2-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slvae3.out
192.168.153.129: failed to launch: nice -n 0 /usr/local/apps/spark-2.3.2-bin-hadoop2.7/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://master:7077
192.168.153.129: full log in /usr/local/apps/spark-2.3.2-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slave1.out
Start spark-shell:
spark-shell --master spark://master:7077 --total-executor-cores 2 --executor-memory 513m
The startup log is:
18/10/03 10:30:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://master:4040
Spark context available as 'sc' (master = spark://master:7077, app id = app-20181003103051-0000).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
Get the SparkContext:
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@...
Read a local file:
scala> val file = sc.textFile("licenses")
scala> file.count
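Before running the full wordcount, a few simple actions can be used to peek at the data. This is just a sketch against the same licenses path; each call below triggers a separate job:

val file = sc.textFile("licenses")   // transformation: defines the RDD, reads nothing yet
file.count                           // number of lines across all files in the directory
file.first                           // the first line
file.take(3)                         // an Array containing the first three lines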
The wordcount code is as follows:
scala> sc.textFile("licenses").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
The result:
res2: Array[(String, Int)] = Array(("",1995), (the,366), (OR,303), (#,279), (OF,262), (of,214), (THE,204), (and,180), (to,160), (ANY,136), (in,130), (IN,121), (this,113), (AND,110), (or,104), (following,85), (FOR,82), (conditions,76), (without,74), (copyright,71), (*,69), (NOT,66), (above,61), (BUT,60), (LIMITED,57), (LIABILITY,,56), (is,54), (SOFTWARE,54), (provided,53), (with,53), (COPYRIGHT,50), (source,50), (THIS,49), (binary,49), (are,48), (IMPLIED,47), (Redistributions,46), (be,46), (list,46), (must,46), (notice,,46), (software,45), (TO,,45), (Copyright,44), (NO,44), (CONTRIBUTORS,43), ((c),43), (any,42), (that,42), (DAMAGES,41), (USE,40), (SHALL,40), (LIABLE,40), (BE,40), (rights,40), (WARRANTIES,39), (FITNESS,38), (PSF,38), (A,38), (and/or,38), (PARTICULAR,38), (documentation,38...
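The same pipeline can also be written step by step, which makes the role of each operator clearer. This is a sketch assuming the same licenses input; the intermediate names are just for illustration:

val lines  = sc.textFile("licenses")        // RDD[String], one element per line
val words  = lines.flatMap(_.split(" "))    // split every line into words
val pairs  = words.map((_, 1))              // pair each word with a count of 1
val counts = pairs.reduceByKey(_ + _)       // sum the counts per word
val sorted = counts.sortBy(_._2, false)     // sort by count, descending
sorted.take(10)                             // action: fetch the top 10 pairs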
Read a file from HDFS and save the result back to HDFS:
scala> sc.textFile("hdfs://master:9000/wc/wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://master:9000/wc/out1")
Check the output:
[root@master apps]# hadoop fs -ls /wc/out1
Found 3 items
-rw-r--r-- 3 root supergroup 0 2018-10-03 17:27 /wc/out1/_SUCCESS
-rw-r--r-- 3 root supergroup 76 2018-10-03 17:27 /wc/out1/part-00000
-rw-r--r-- 3 root supergroup 417 2018-10-03 17:27 /wc/out1/part-00001
View the file contents:
[root@master apps]# hadoop fs -cat /wc/out1/part-*
Output omitted...
Save the output to a single file:
scala> sc.textFile("hdfs://master:9000/wc/wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_,1).sortBy(_._2,false).saveAsTextFile("hdfs://master:9000/wc/out2")
Check the result:
[root@master apps]# hadoop fs -ls /wc/out2
Found 2 items
-rw-r--r-- 3 root supergroup 0 2018-10-03 17:31 /wc/out2/_SUCCESS
-rw-r--r-- 3 root supergroup 493 2018-10-03 17:31 /wc/out2/part-00000
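The single part file comes from passing numPartitions = 1 as the second argument to reduceByKey: saveAsTextFile writes one part file per partition of the final RDD. An alternative sketch (not from the original post, and with a hypothetical output path out3) is to merge partitions just before writing:

sc.textFile("hdfs://master:9000/wc/wc.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, false)
  .coalesce(1)                                   // merge partitions so only one part file is written
  .saveAsTextFile("hdfs://master:9000/wc/out3")  // hypothetical output path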
Operator analysis:
Operators fall into two categories:
- Transformations: lazily build a new RDD without triggering any computation
- Actions: trigger an actual job and return a result or write output
Example: calling sc.textFile("hdfs://master:9000/wc/wc.txt") produces an RDD, but the RDD holds no data yet; textFile is a transformation, not an action, so no real computation is performed.
scala> val file = sc.textFile("hdfs://master:9000/wc/wc.txt")
file: org.apache.spark.rdd.RDD[String] = hdfs://master:9000/wc/wc.txt MapPartitionsRDD[57] at textFile at <console>:24
Executing an action (collect) triggers the actual computation:
scala> sc.textFile("hdfs://master:9000/wc/wc.txt").flatMap(_.split(" ")).map((_,1)).collect
res8: Array[(String, Int)] = Array((Apache,1), (Spark,1), (is,1), (a,1), (fast,1), (and,1), (general-purpose,1), (cluster,1), (computing,1), (system.,1), (It,1), (provides,1), (high-level,1), (APIs,1), (in,1), (Java,,1), (Scala,,1), (Python,1), (and,1), (R,,1), (and,1), (an,1), (optimized,1), (engine,1), (that,1), (supports,1), (general,1), (execution,1), (graphs.,1), (It,1), (also,1), (supports,1), (a,1), (rich,1), (set,1), (of,1), (higher-level,1), (tools,1), (including,1), (Spark,1), (SQL,1), (for,1), (SQL,1), (and,1), (structured,1), (data,1), (processing,,1), (MLlib,1), (for,1), (machine,1), (learning,,1), (GraphX,1), (for,1), (graph,1), (processing,,1), (and,1), (Spark,1), (Streaming.,1))
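A minimal sketch of this lazy-evaluation behaviour: building a chain of transformations returns immediately without touching HDFS, and only the final action launches a job.

// Transformations only build the lineage; nothing is read or computed here
val wordPairs = sc.textFile("hdfs://master:9000/wc/wc.txt")
                  .flatMap(_.split(" "))
                  .map((_, 1))

println(wordPairs.toDebugString)   // prints the RDD lineage; still no job has run
wordPairs.count                    // action: now Spark actually reads the file and runs a job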
Introduction to common operators:
Create an RDD by parallelizing a collection:
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[62] at parallelize at <console>:24
Check the number of partitions of this RDD:
scala> rdd1.partitions.length
res10: Int = 2
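The default of 2 presumably reflects spark.default.parallelism, which here matches the --total-executor-cores 2 the shell was started with. The partition count can also be set explicitly; a small sketch (rdd3 is just an illustrative name):

// Request 4 partitions when the RDD is created
val rdd3 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), 4)
rdd3.partitions.length                  // 4

// repartition changes the partition count of an existing RDD
rdd1.repartition(4).partitions.length   // 4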
Multiply each element by 10:
scala> val rdd2 = rdd1.map(_*10).collect
rdd2: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80)
Take the elements smaller than 50 (the chain is rebuilt from rdd1 here, because rdd2 is already a collected Array):
scala> rdd1.map(_*10).filter(_ < 50).collect
res16: Array[Int] = Array(10, 20, 30, 40)
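Note that rdd2 above is a plain Array (the result of collect), not an RDD, so filtering it runs locally on the driver with the Scala collection API; the distributed version filters the RDD before collecting. A short sketch of both:

// On the driver: rdd2 is a local Array, so this is ordinary collection filtering
rdd2.filter(_ < 50)                        // Array(10, 20, 30, 40)

// Distributed: filter the RDD first, then collect only the matching elements
rdd1.map(_ * 10).filter(_ < 50).collect    // Array(10, 20, 30, 40)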