Spark(一)—— 大資料處理入門
一、Spark介紹
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
Spark是一個快速且多功能的叢集計算系統。它為多種不同語言提供高階API,和支援一般執行圖的優化引擎。它也有豐富的高階工具集,Spark SQL進行結構化資料的處理,MLib處理機器學習,GraphX進行圖處理,以及Spark Streaming流計算。
組成
它的主要元件有:
- SparkCore
- 將分散式資料抽象為彈性分散式資料集(RDD),實現了應用任務排程、RPC、序列化和壓縮,併為執行在其上的上層元件提供API。
- SparkSQL
- Spark Sql 是Spark來操作結構化資料的程式包,可以讓我使用SQL語句的方式來查詢資料,Spark支援 多種資料來源,包含Hive表,parquest以及JSON等內容。
- SparkStreaming
- 是Spark提供的實時資料進行流式計算的元件。
- MLlib
- 提供常用機器學習演算法的實現庫。
- GraphX
- 提供一個分散式圖計算框架,能高效進行圖計算。
- BlinkDB
- 用於在海量資料上進行互動式SQL的近似查詢引擎。
- Tachyon
- 以記憶體為中心高容錯的的分散式檔案系統。
返回一個包含資料集前n個元素的陣列
二、WordCount程式講解
編寫程式碼
scala程式編寫
object WordCountDemo { def main(args: Array[String]): Unit = { //建立Spark配置物件 val conf = new SparkConf().setMaster("local").setAppName("MyApp") //通過conf建立sc val sc = new SparkContext(conf) //讀取檔案 val rdd1 = sc.textFile("/Users/README.md") //計算 val rdd2 = rdd1.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_) //列印 rdd2.take(10).foreach(println) } }
java程式編寫
public class WordCountJavaDemo {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("myapp").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> rdd1 = sc.textFile("/Users/README.md");
JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
List<String> list = new ArrayList<>();
String[] arr = s.split(" ");
for (String ss : arr) {
list.add(ss);
}
return list.iterator();
}
});
JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
List<Tuple2<String, Integer>> list = rdd4.collect();
for (Tuple2<String, Integer> t : list) {
System.out.println(t._1() + " " + t._2());
}
}
}
三、原理介紹
RDD
- 由一系列Partition組成
- RDD之間有一系列依賴關係
- RDD每個運算元實際上是作用在每個Partition上
- RDD會提供一系列最佳位置
- 分割槽器是作用在KV格式的RDD上
RDD會在多個節點上儲存,就和hdfs的分散式道理是一樣的。hdfs檔案被切分為多個block儲存在各個節點上,而RDD是被切分為多個partition。不同的partition可能在不同的節點上。
Spark執行流程
1、Driver
分發task,在分發之前,會呼叫RDD的方法,獲取partition的位置。
將task的計算結果,拉回到Driver端
Driver是一個JVM程序
2、Worker
寬依賴、窄依賴
圖中stage2的並行度是4,也就是有4個task。
寬依賴
父RDD與子RDD,partition的關係是一對多,就是寬依賴。寬依賴於shuffle對應。
窄依賴
父RDD與子RDD,partition的關係是一對一或多對一,就是窄依賴。
四、Spark常用運算元
Transformation運算元
特點:懶執行
(1)map
map的輸入變換函式應用於RDD中所有元素
(2)flatMap
flatMap與map區別在於map為“對映”,而flatMap“先對映,後扁平化”,map對每一次(func)都產生一個元素,返回一個物件,而flatMap多一步就是將所有物件合併為一個物件。
(3)flatMapValues
每個元素的Value被輸入函式對映為一系列的值,然後這些值再與原RDD中的Key組成一系列新的KV對。
程式碼
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
def f(x): return x
x.flatMapValues(f).collect()
列印結果
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
filter
過濾操作,滿足filter內function函式為true的RDD內所有元素組成一個新的資料集。
(4)groupByKey
主要作用是將相同的所有的鍵值對分組到一個集合序列當中,其順序是不確定的。
(5)reduceByKey
與groupByKey類似,卻有不同。如(a,1), (a,2), (b,1), (b,2)。groupByKey產生中間結果為( (a,1), (a,2) ), ( (b,1), (b,2) )。而reduceByKey為(a,3), (b,3)。
reduceByKey主要作用是聚合,groupByKey主要作用是分組。
(6)take
Action運算元
特點:立即觸發執行
五、SparkSQL
介紹
Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL and the Dataset API. When computing a result the same execution engine is used, independent of which API/language you are using to express the computation. This unification means that developers can easily switch back and forth between different APIs based on which provides the most natural way to express a given transformation.
SparkSQL是Spark的一個用來處理結構化資料的模組。使用類似SQL的方式訪問Hadoop,實現MR計算。
Datasets的概念
A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.
Dataset是分散式資料集合。
DataFrames概念
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset
to represent a DataFrame.
基本使用
(1)建立DataFrames
資料
{"id":"1","name":"zhangsan","age":"12"}
{"id":"2","name":"lisi","age":"12"}
{"id":"3","name":"wangwu","age":"12"}
程式碼
object SparkSqlDemo {
def main(args: Array[String]): Unit = {
//建立Spark配置物件
val conf = new SparkConf().setMaster("local[4]").setAppName("MyApp");
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
val df = spark.read.json("/Users/opensource/dev-problem/source/people_sample_json.json");
df.show()
}
}
(2)查詢
val df = spark.read.json("/Users/fangzhijie/opensource/dev-problem/source/people_sample_json.json");
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people WHERE name = 'zhangsan'")
sqlDF.show()
六、SparkStreaming
介紹
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
基本使用
(1)簡單使用
object SparkStreamingDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
//建立Spark流上下文
val ssc = new StreamingContext(conf, Seconds(1))
//建立Socket文字流
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
//啟動
ssc.start()
//等待結束
ssc.awaitTermination() // Wait for the computation to terminate
}
}
使用shell命令監聽埠,輸入待計算內容
$ nc -lk 9999
原理
SparkStreaming的程式設計抽象是離散化流(DStream),它是一個RDD序列,每個RDD代表資料流中一個時間片內的資料。
參考文件
Spark Quick Start
Spark32個常用運算元總結
SparkSQL Guide
SparkSQL官方文件
《Spark快速大資料分析》
SparkStream官方文