Spark Streaming筆記整理(三):DS的transformation與output操作
DStream的各種transformation
Transformation Meaning
map(func) 對DStream中的各個元素進行func函數操作,然後返回一個新的DStream.
flatMap(func) 與map方法類似,只不過各個輸入項可以被輸出為零個或多個輸出項
filter(func) 過濾出所有函數func返回值為true的DStream元素並返回一個新的DStream
repartition(numPartitions) 增加或減少DStream中的分區數,從而改變DStream的並行度
union(otherStream) 將源DStream和輸入參數為otherDStream的元素合並,並返回一個新的DStream.
count() 通過對DStreaim中的各個RDD中的元素進行計數,然後返回只有一個元素的RDD構成的DStream
reduce(func) 對源DStream中的各個RDD中的元素利用func進行聚合操作,然後返回只有一個元素的RDD構成的新的DStream.
countByValue() 對於元素類型為K的DStream,返回一個元素為(K,Long)鍵值對形式的新的DStream,Long對應的值為源DStream中各個RDD的key出現的次數
reduceByKey(func, [numTasks]) 利用func函數對源DStream中的key進行聚合操作,然後返回新的(K,V)對構成的DStream
join(otherStream, [numTasks]) 輸入為(K,V)、(K,W)類型的DStream,返回一個新的(K,(V,W)類型的DStream
cogroup(otherStream, [numTasks]) 輸入為(K,V)、(K,W)類型的DStream,返回一個新的 (K, Seq[V], Seq[W]) 元組類型的DStream
transform(func) 通過RDD-to-RDD函數作用於源碼DStream中的各個RDD,可以是任意的RDD操作,從而返回一個新的RDD
updateStateByKey(func) 根據於key的前置狀態和key的新值,對key進行更新,返回一個新狀態的Dstream
Window 函數:
可以看到很多都是在RDD中已經有的transformation算子操作,所以這裏只關註transform、updateStateByKey和window函數
transformation之transform操作
DStream transform
1、transform操作,應用在DStream上時,可以用於執行任意的RDD到RDD的轉換操作。它可以用於實現,DStream API中所沒有提供的操作。比如說,DStream API中,並沒有提供將一個DStream中的每個batch,與一個特定的RDD進行join的操作。但是我們自己就可以使用transform操作來實現該功能。
2、DStream.join(),只能join其他DStream。在DStream每個batch的RDD計算出來之後,會去跟其他DStream的RDD進行join。
案例
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.streaming.p1
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
/**
* 使用Transformation之transform來完成在線黑名單過濾
* 需求:
* 將日誌數據中來自於ip["27.19.74.143", "110.52.250.126"]實時過濾掉
* 數據格式
* 27.19.74.143##2016-05-30 17:38:20##GET /static/image/common/faq.gif HTTP/1.1##200##1127
*/
object _06SparkStreamingTransformOps {
def main(args: Array[String]): Unit = {
if (args == null || args.length < 2) {
System.err.println(
"""Parameter Errors! Usage: <hostname> <port>
|hostname: 監聽的網絡socket的主機名或ip地址
|port: 監聽的網絡socket的端口
""".stripMargin)
System.exit(-1)
}
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_01SparkStreamingNetWorkOps.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
val hostname = args(0).trim
val port = args(1).trim.toInt
//黑名單數據
val blacklist = List(("27.19.74.143", true), ("110.52.250.126", true))
// val blacklist = List("27.19.74.143", "110.52.250.126")
val blacklistRDD:RDD[(String, Boolean)] = ssc.sparkContext.parallelize(blacklist)
val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)
// 如果用到一個DStream和rdd進行操作,無法使用dstream直接操作,只能使用transform來進行操作
val filteredDStream:DStream[String] = linesDStream.transform(rdd => {
val ip2InfoRDD:RDD[(String, String)] = rdd.map{line => {
(line.split("##")(0), line)
}}
/** A(M) B(N)兩張表:
* across join
* 交叉連接,沒有on條件的連接,會產生笛卡爾積(M*N條記錄) 不能用
* inner join
* 等值連接,取A表和B表的交集,也就是獲取在A和B中都有的數據,沒有的剔除掉 不能用
* left outer join
* 外鏈接:最常用就是左外連接(將左表中所有的數據保留,右表中能夠對應上的數據正常顯示,在右表中對應不上,顯示為null)
* 可以通過非空判斷是左外連接達到inner join的結果
*/
val joinedInfoRDD:RDD[(String, (String, Option[Boolean]))] = ip2InfoRDD.leftOuterJoin(blacklistRDD)
joinedInfoRDD.filter{case (ip, (line, joined)) => {
joined == None
}}//執行過濾操作
.map{case (ip, (line, joined)) => line}
})
filteredDStream.print()
ssc.start()
ssc.awaitTermination()
ssc.stop() // stop中的boolean參數,設置為true,關閉該ssc對應的SparkContext,默認為false,只關閉自身
}
}
nc中產生數據:
[uplooking@uplooking01 ~]$ nc -lk 4893
27.19.74.143##2016-05-30 17:38:20##GET /data/attachment/common/c8/common_2_verify_icon.png HTTP/1.1##200##582
110.52.250.126##2016-05-30 17:38:20##GET /static/js/logging.js?y7a HTTP/1.1##200##603
8.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-
輸出結果如下:
-------------------------------------------
Time: 1526006084000 ms
-------------------------------------------
8.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-
transformation之updateStateByKey操作
概述
1、Spark Streaming的updateStateByKey可以DStream中的數據進行按key做reduce操作,然後對各個批次的數據進行累加。
2、 updateStateByKey 解釋
以DStream中的數據進行按key做reduce操作,然後對各個批次的數據進行累加在有新的數據信息進入或更新時,可以讓用戶保持想要的任何狀。使用這個功能需要完成兩步:
1) 定義狀態:可以是任意數據類型
2) 定義狀態更新函數:用一個函數指定如何使用先前的狀態,從輸入流中的新值更新狀態。對於有狀態操作,要不斷的把當前和歷史的時間切片的RDD累加計算,隨著時間的流失,計算的數據規模會變得越來越大
3、要思考的是如果數據量很大的時候,或者對性能的要求極為苛刻的情況下,可以考慮將數據放在Redis或者tachyon或者ignite上
4、註意,updateStateByKey操作,要求必須開啟Checkpoint機制。
案例
Scala版
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.streaming.p1
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 狀態函數updateStateByKey
* 更新key的狀態(就是key對應的value)
*
* 通常的作用,計算某個key截止到當前位置的狀態
* 統計截止到目前為止的word對應count
* 要想完成截止到目前為止的操作,必須將歷史的數據和當前最新的數據累計起來,所以需要一個地方來存放歷史數據
* 這個地方就是checkpoint目錄
*
*/
object _07SparkStreamingUpdateStateByKeyOps {
def main(args: Array[String]): Unit = {
if (args == null || args.length < 2) {
System.err.println(
"""Parameter Errors! Usage: <hostname> <port>
|hostname: 監聽的網絡socket的主機名或ip地址
|port: 監聽的網絡socket的端口
""".stripMargin)
System.exit(-1)
}
val hostname = args(0).trim
val port = args(1).trim.toInt
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_07SparkStreamingUpdateStateByKeyOps.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("hdfs://ns1/checkpoint/streaming/usb")
// 接收到的當前批次的數據
val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)
// 這是記錄下來的當前批次的數據
val rbkDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
val usbDStream:DStream[(String, Int)] = rbkDStream.updateStateByKey(updateFunc)
usbDStream.print()
ssc.start()
ssc.awaitTermination()
ssc.stop() // stop中的boolean參數,設置為true,關閉該ssc對應的SparkContext,默認為false,只關閉自身
}
/**
* @param seq 當前批次的key對應的數據
* @param history 歷史key對應的數據,可能有可能沒有
* @return
*/
def updateFunc(seq: Seq[Int], history: Option[Int]): Option[Int] = {
var sum = seq.sum
if(history.isDefined) {
sum += history.get
}
Option[Int](sum)
}
}
nc產生數據:
[uplooking@uplooking01 ~]$ nc -lk 4893
hello hello
hello you hello he hello me
輸出結果如下:
-------------------------------------------
Time: 1526009358000 ms
-------------------------------------------
(hello,2)
18/05/11 11:29:18 INFO WriteAheadLogManager for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009338000:
-------------------------------------------
Time: 1526009360000 ms
-------------------------------------------
(hello,5)
(me,1)
(you,1)
(he,1)
18/05/11 11:29:20 INFO WriteAheadLogManager for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009340000:
-------------------------------------------
Time: 1526009362000 ms
-------------------------------------------
(hello,5)
(me,1)
(you,1)
(he,1)
Java版
用法略有不同,主要是 狀態更新函數的寫法上有區別,如下:
package cn.xpleaf.bigdata.spark.java.streaming.p1;
import com.google.common.base.Optional;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
public class _02SparkStreamingUpdateStateByKeyOps {
public static void main(String[] args) {
if(args == null || args.length < 2) {
System.err.println("Parameter Errors! Usage: <hostname> <port>");
System.exit(-1);
}
Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
SparkConf conf = new SparkConf()
.setAppName(_02SparkStreamingUpdateStateByKeyOps.class.getSimpleName())
.setMaster("local[2]");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(2));
jsc.checkpoint("hdfs://ns1/checkpoint/streaming/usb");
String hostname = args[0].trim();
int port = Integer.valueOf(args[1].trim());
JavaReceiverInputDStream<String> lineDStream = jsc.socketTextStream(hostname, port);//默認的持久化級別:MEMORY_AND_DISK_SER_2
JavaDStream<String> wordsDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
JavaPairDStream<String, Integer> pairsDStream = wordsDStream.mapToPair(word -> {
return new Tuple2<String, Integer>(word, 1);
});
JavaPairDStream<String, Integer> rbkDStream = pairsDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// 做歷史的累計操作
JavaPairDStream<String, Integer> usbDStream = rbkDStream.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> current, Optional<Integer> history) throws Exception {
int sum = 0;
for (int i : current) {
sum += i;
}
if (history.isPresent()) {
sum += history.get();
}
return Optional.of(sum);
}
});
usbDStream.print();
jsc.start();//啟動流式計算
jsc.awaitTermination();//等待執行結束
jsc.close();
}
}
transformation之window操作
DStream window 滑動窗口
Spark Streaming提供了滑動窗口操作的支持,從而讓我們可以對一個滑動窗口內的數據執行計算操作。每次掉落在窗口內的RDD的數據,會被聚合起來執行計算操作,然後生成的RDD,會作為window DStream的一個RDD。比如下圖中,就是對每三秒鐘的數據執行一次滑動窗口計算,這3秒內的3個RDD會被聚合起來進行處理,然後過了兩秒鐘,又會對最近三秒內的數據執行滑動窗口計算。所以每個滑動窗口操作,都必須指定兩個參數,窗口長度以及滑動間隔,而且這兩個參數值都必須是batch間隔的整數倍。
1.紅色的矩形就是一個窗口,窗口hold的是一段時間內的數據流。
2.這裏面每一個time都是時間單元,在官方的例子中,每隔window size是3 time unit, 而且每隔2個單位時間,窗口會slide一次。
所以基於窗口的操作,需要指定2個參數:
window length - The duration of the window (3 in the figure)
slide interval - The interval at which the window-based operation is performed (2 in the figure).
1.窗口大小,個人感覺是一段時間內數據的容器。
2.滑動間隔,就是我們可以理解的cron表達式吧。
舉個例子吧:
還是以最著名的wordcount舉例,每隔10秒,統計一下過去30秒過來的數據。
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
DSstream window滑動容器功能
window 對每個滑動窗口的數據執行自定義的計算
countByWindow 對每個滑動窗口的數據執行count操作
reduceByWindow 對每個滑動窗口的數據執行reduce操作
reduceByKeyAndWindow 對每個滑動窗口的數據執行reduceByKey操作
countByValueAndWindow 對每個滑動窗口的數據執行countByValue操作
案例
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.streaming.p1
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
*窗口函數window
* 每隔多長時間(滑動頻率slideDuration)統計過去多長時間(窗口長度windowDuration)中的數據
* 需要註意的就是窗口長度和滑動頻率
* windowDuration = M*batchInterval,
slideDuration = N*batchInterval
*/
object _08SparkStreamingWindowOps {
def main(args: Array[String]): Unit = {
if (args == null || args.length < 2) {
System.err.println(
"""Parameter Errors! Usage: <hostname> <port>
|hostname: 監聽的網絡socket的主機名或ip地址
|port: 監聽的網絡socket的端口
""".stripMargin)
System.exit(-1)
}
val hostname = args(0).trim
val port = args(1).trim.toInt
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_08SparkStreamingWindowOps.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
// 接收到的當前批次的數據
val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)
val pairsDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1))
// 每隔4s,統計過去6s中產生的數據
val retDStream:DStream[(String, Int)] = pairsDStream.reduceByKeyAndWindow(_+_, windowDuration = Seconds(6), slideDuration = Seconds(4))
retDStream.print()
ssc.start()
ssc.awaitTermination()
ssc.stop() // stop中的boolean參數,設置為true,關閉該ssc對應的SparkContext,默認為false,只關閉自身
}
}
nc產生數據:
[uplooking@uplooking01 ~]$ nc -lk 4893
hello you
hello he
hello me
hello you
hello he
輸出結果如下:
-------------------------------------------
Time: 1526016316000 ms
-------------------------------------------
(hello,4)
(me,1)
(you,2)
(he,1)
-------------------------------------------
Time: 1526016320000 ms
-------------------------------------------
(hello,5)
(me,1)
(you,2)
(he,2)
-------------------------------------------
Time: 1526016324000 ms
-------------------------------------------
DStream的output操作以及foreachRDD
DStream output操作
1、print
打印每個batch中的前10個元素,主要用於測試,或者是不需要執行什麽output操作時,用於簡單觸發一下job。
2、saveAsTextFile(prefix, [suffix])
將每個batch的數據保存到文件中。每個batch的文件的命名格式為:prefix-TIME_IN_MS[.suffix]
3、saveAsObjectFile
同上,但是將每個batch的數據以序列化對象的方式,保存到SequenceFile中。
4、saveAsHadoopFile
同上,將數據保存到Hadoop文件中
5、foreachRDD
最常用的output操作,遍歷DStream中的每個產生的RDD,進行處理。可以將每個RDD中的數據寫入外部存儲,比如文件、數據庫、緩存等。通常在其中,是針對RDD執行action操作的,比如foreach。
DStream foreachRDD詳解
相關內容其實在Spark開發調優中已經有相關的說明。
通常在foreachRDD中,都會創建一個Connection,比如JDBC Connection,然後通過Connection將數據寫入外部存儲。
誤區一:在RDD的foreach操作外部,創建Connection
這種方式是錯誤的,因為它會導致Connection對象被序列化後傳輸到每個Task中。而這種Connection對象,實際上一般是不支持序列化的,也就無法被傳輸。
dstream.foreachRDD { rdd =>
val connection = createNewConnection()
rdd.foreach { record => connection.send(record)
}
}
誤區二:在RDD的foreach操作內部,創建Connection
這種方式是可以的,但是效率低下。因為它會導致對於RDD中的每一條數據,都創建一個Connection對象。而通常來說,Connection的創建,是很消耗性能的。
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
DStream foreachRDD合理使用
合理方式一:使用RDD的foreachPartition操作,並且在該操作內部,創建Connection對象,這樣就相當於是,為RDD的每個partition創建一個Connection對象,節省資源的多了。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
合理方式二:自己手動封裝一個靜態連接池,使用RDD的foreachPartition操作,並且在該操作內部,從靜態連接池中,通過靜態方法,獲取到一個連接,使用之後再還回去。這樣的話,甚至在多個RDD的partition之間,也可以復用連接了。而且可以讓連接池采取懶創建的策略,並且空閑一段時間後,將其釋放掉。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection)
}
}
foreachRDD 與foreachPartition實現實戰
需要註意的是:
(1)、你最好使用forEachPartition函數來遍歷RDD,並且在每臺Work上面創建數據庫的connection。
(2)、如果你的數據庫並發受限,可以通過控制數據的分區來減少並發。
(3)、在插入MySQL的時候最好使用批量插入。
(4),確保你寫入的數據庫過程能夠處理失敗,因為你插入數據庫的過程可能會經過網絡,這可能導致數據插入數據庫失敗。
(5)、不建議將你的RDD數據寫入到MySQL等關系型數據庫中。
這部分內容其實可以參考開發調優部分的案例,只是那裏並沒有foreachRDD,因為其並沒有使用DStream,但是原理是一樣的,因為最終都是針對RDD來進行操作的。
原文鏈接:http://blog.51cto.com/xpleaf/2115343
Spark Streaming筆記整理(三):DS的transformation與output操作