A Simple Introduction to Spark Streaming and Spark SQL
1. What is Spark Streaming?
a. What is Spark Streaming?
Spark Streaming is similar to Apache Storm and is used for processing streaming data. According to the official documentation, Spark Streaming offers high throughput and strong fault tolerance. It supports many input sources, such as Kafka, Flume, Twitter, ZeroMQ, and plain TCP sockets. Once data has been ingested, it can be processed with Spark's high-level primitives such as map, reduce, join, and window, and the results can be written to many destinations, such as HDFS or databases. Spark Streaming also integrates seamlessly with MLlib (machine learning) and GraphX.
b. What are the features of Spark Streaming?
Easy to use, fault-tolerant, and easy to integrate into the Spark ecosystem.
2. Comparing Spark and Storm
a. Development language: Spark is written in Scala; Storm is written in Clojure.
b. Programming model: Spark Streaming uses the DStream; Storm uses Spouts and Bolts.
c. Side-by-side comparison of Spark and Storm:
Spark: (comparison diagram omitted)
Storm: (comparison diagram omitted)
3. What is a DStream?
3.1. Discretized Stream (DStream) is the basic abstraction in Spark Streaming. It represents a continuous stream of data, either the input stream itself or the stream produced by applying Spark primitives to it. Internally, a DStream is represented as a sequence of consecutive RDDs, where each RDD contains the data from one time interval.
Operations on the data are likewise applied RDD by RDD.
The actual computation is carried out by the Spark engine.
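As a minimal sketch (the 5-second batch interval, the host slaver1, and port 9999 are illustrative assumptions matching the socket exercise later in this post), this is how such a DStream is created; every batch interval produces one RDD in the underlying sequence:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// a StreamingContext with a 5-second batch interval; local[2] leaves one thread for the receiver
val conf = new SparkConf().setAppName("DStreamDemo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

// lines is a DStream[String]: internally, one RDD of received text lines per 5-second batch
val lines = ssc.socketTextStream("slaver1", 9999)
lines.print()

ssc.start()             // start receiving and processing
ssc.awaitTermination()  // block until the streaming job is stopped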
3.2. DStream operations:
The primitives available on DStreams are similar to those on RDDs and fall into two groups: Transformations and Output Operations. In addition, there are some special transformations, such as updateStateByKey(), transform(), and the various window-related primitives.
a. Transformations on DStreams:
Transformation | Meaning
map(func) | Return a new DStream by passing each element of the source DStream through a function func.
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items.
filter(func) | Return a new DStream by selecting only the records of the source DStream on which func returns true.
repartition(numPartitions) | Changes the level of parallelism in this DStream by creating more or fewer partitions.
union(otherStream) | Return a new DStream that contains the union of the elements in the source DStream and otherDStream.
count() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.
reduce(func) | Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel.
countByValue() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: by default, this uses Spark's default number of parallel tasks (2 for local mode; in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
cogroup(otherStream, [numTasks]) | When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.
transform(func) | Return a new DStream by applying an RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.
updateStateByKey(func) | Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.
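As a small, hedged illustration of several of the transformations above, reusing the hypothetical lines DStream from the earlier sketch to count words within each batch:

// split each line into words, drop empty tokens, and count occurrences per batch
val words = lines.flatMap(_.split(" ")).filter(_.nonEmpty)   // flatMap + filter
val pairs = words.map(word => (word, 1))                     // map to (K, V) pairs
val wordCounts = pairs.reduceByKey(_ + _)                    // reduceByKey
wordCounts.print()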
Special transformations:
1. UpdateStateByKey Operation: the updateStateByKey primitive is used to maintain state across batches; the word count example uses this feature to keep running totals. Without updateStateByKey, each batch is analyzed, its result is emitted, and nothing is retained for the next batch.
2. Transform Operation: the transform primitive allows an arbitrary RDD-to-RDD function to be applied to every RDD of a DStream. This is a convenient way to extend the Spark API, and it is also how MLlib (machine learning) and GraphX are combined with Spark Streaming.
3. Window Operations: window operations are somewhat similar to state in Storm. By setting a window length and a sliding interval, you can dynamically work with the data the stream has produced over the current window.
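A minimal sketch of updateStateByKey and one window primitive, again assuming the pairs DStream from the previous snippet; note that stateful operations require a checkpoint directory (the HDFS path below is a placeholder), and the window and slide durations must be multiples of the batch interval:

ssc.checkpoint("hdfs://slaver1:9000/spark/checkpoint")   // placeholder checkpoint path

// keep a running total per word across all batches
val updateFunc = (newValues: Seq[Int], state: Option[Int]) => Some(newValues.sum + state.getOrElse(0))
val runningCounts = pairs.updateStateByKey[Int](updateFunc)
runningCounts.print()

// count words over the last 30 seconds, recomputed every 10 seconds
val windowedCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowedCounts.print()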
b. Output Operations on DStreams:
Output operations write a DStream's data to an external system such as a database or a file system. Only when an output operation is invoked (analogous to an action on an RDD) does the streaming program actually start the computation.
Output Operation | Meaning
print() | Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging.
saveAsTextFiles(prefix, [suffix]) | Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix, [suffix]) | Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
saveAsHadoopFiles(prefix, [suffix]) | Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
foreachRDD(func) | The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
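A hedged sketch of foreachRDD, using the commonly recommended pattern of iterating per partition on the executors (the println simply stands in for a real write such as a JDBC insert):

// push each batch's word counts to an external system
wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // any connection object should be created here, per partition on the executor,
    // because connections generally cannot be serialized and shipped from the driver
    partition.foreach { case (word, count) =>
      println(s"$word -> $count")   // replace with the actual external write
    }
  }
}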
4. A hands-on Spark Streaming exercise:
Read data from a socket and process it in real time. First, check whether nc (netcat) is installed:
[root@slaver1 hadoop]# which nc
Then install nc: [root@slaver1 hadoop]# yum install -y nc (installing it this way produced errors here and is not recommended; install the RPM directly instead):
[root@slaver1 hadoop]# wget http://vault.centos.org/6.6/os/x86_64/Packages/nc-1.84-22.el6.x86_64.rpm
[root@slaver1 hadoop]# rpm -iUv nc-1.84-22.el6.x86_64.rpm
Then run the following in a terminal: [root@slaver1 hadoop]# nc -lk 9999 (type messages in this window).
Then open a second terminal and run: [root@slaver1 hadoop]# nc slaver1 9999 (this window receives the messages typed above).
5. Running the test:
[hadoop@slaver1 ~]$ nc -lk 9999
[hadoop@slaver1 spark-1.5.1-bin-hadoop2.4]$ ./bin/run-example streaming.NetworkWordCount 192.168.19.131 9999
Then type something into the first window, for example: hello world hello world hadoop world spark world flume world hello world
and watch the second window (the one running NetworkWordCount) to see whether the word counts are computed.
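The bundled NetworkWordCount example is essentially the word-count pipeline sketched earlier. A minimal equivalent (a sketch, not the exact bundled source) looks roughly like this; the per-batch counts are printed in the console of the second window, the one running the example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCountSketch {
  def main(args: Array[String]): Unit = {
    // args(0) = hostname of the nc server, args(1) = port, e.g. 192.168.19.131 9999
    val conf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()   // print the counts for each 1-second batch

    ssc.start()
    ssc.awaitTermination()
  }
}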
1. Spark SQL and DataFrames
a. What is Spark SQL?
Spark SQL is the Spark module for working with structured data. It provides a programming abstraction called DataFrame and also acts as a distributed SQL query engine.
b. Why learn Spark SQL?
We have already studied Hive, which translates Hive SQL into MapReduce jobs and submits them to the cluster, greatly simplifying the work of writing MapReduce programs; however, the MapReduce computation model is relatively slow. Spark SQL arose in response to this: it translates SQL queries into RDD operations and submits them to the cluster, so execution is much faster.
c. Features of Spark SQL:
Easy integration, a unified way of accessing data, Hive compatibility, and standard data connectivity.
d. What are DataFrames?
Like an RDD, a DataFrame is a distributed data container. However, a DataFrame is more like a two-dimensional table in a traditional database: in addition to the data itself, it also records the structure of the data, i.e. its schema. Like Hive, DataFrames support nested data types (struct, array, and map). From the standpoint of API usability, the DataFrame API offers a set of high-level relational operations that are friendlier and have a lower barrier to entry than the functional RDD API. Because it resembles the DataFrames of R and pandas, the Spark DataFrame carries over much of the development experience of traditional single-machine data analysis.
2. Creating DataFrames
In Spark SQL, SQLContext is the entry point for creating DataFrames and executing SQL. In spark-1.5.2 an sqlContext instance is already built into spark-shell:
1. Create a local file with three columns, id, name, and age, separated by spaces, and upload it to HDFS: hdfs dfs -put person.txt /
2. In spark-shell, run the following command to read the data and split each line on the column separator: val lineRDD = sc.textFile("hdfs://node1.itcast.cn:9000/person.txt").map(_.split(" "))
3. Define a case class (it plays the role of the table schema): case class Person(id:Int, name:String, age:Int)
4. Associate the RDD with the case class: val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
5. Convert the RDD into a DataFrame: val personDF = personRDD.toDF
6. Operate on the DataFrame: personDF.show
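Outside of spark-shell (where sc and sqlContext already exist and the implicits are imported automatically), roughly the same flow in a standalone application would look like the sketch below; the HDFS path is the one used above, while the object and app names are arbitrary:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Person(id: Int, name: String, age: Int)

object CreateDataFrameSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CreateDataFrame"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._   // required for the toDF conversion

    val lineRDD = sc.textFile("hdfs://node1.itcast.cn:9000/person.txt").map(_.split(" "))
    val personDF = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)).toDF()
    personDF.show()
    sc.stop()
  }
}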
3. Common DataFrame operations:
DSL-style syntax:
//view the contents of the DataFrame
personDF.show
//view selected columns of the DataFrame
personDF.select(personDF.col("name")).show
personDF.select(col("name"), col("age")).show
personDF.select("name").show
//print the schema of the DataFrame
personDF.printSchema
//select every name and age, with age incremented by 1
personDF.select(col("id"), col("name"), col("age") + 1).show
personDF.select(personDF("id"), personDF("name"), personDF("age") + 1).show
//filter rows where age is greater than or equal to 18
personDF.filter(col("age") >= 18).show
//group by age and count the number of people of each age
personDF.groupBy("age").count().show()
4. SQL-style syntax:
To use SQL-style syntax, the DataFrame must first be registered as a table:
personDF.registerTempTable("t_person")
//query the two oldest people
sqlContext.sql("select * from t_person order by age desc limit 2").show
//show the schema of the table
sqlContext.sql("desc t_person").show
To be continued...