1. 程式人生 > >Spark Streaming筆記整理(三):DS的transformation與output操作

Spark Streaming筆記整理(三):DS的transformation與output操作

job watermark number 這樣的 格式 current fix work eat

DStream的各種transformation

Transformation  Meaning
map(func)   對DStream中的各個元素進行func函數操作,然後返回一個新的DStream.
flatMap(func)   與map方法類似,只不過各個輸入項可以被輸出為零個或多個輸出項
filter(func)    過濾出所有函數func返回值為true的DStream元素並返回一個新的DStream
repartition(numPartitions)  增加或減少DStream中的分區數,從而改變DStream的並行度
union(otherStream)  將源DStream和輸入參數為otherDStream的元素合並,並返回一個新的DStream.
count()     通過對DStreaim中的各個RDD中的元素進行計數,然後返回只有一個元素的RDD構成的DStream
reduce(func)    對源DStream中的各個RDD中的元素利用func進行聚合操作,然後返回只有一個元素的RDD構成的新的DStream.
countByValue()  對於元素類型為K的DStream,返回一個元素為(K,Long)鍵值對形式的新的DStream,Long對應的值為源DStream中各個RDD的key出現的次數
reduceByKey(func, [numTasks])   利用func函數對源DStream中的key進行聚合操作,然後返回新的(K,V)對構成的DStream
join(otherStream, [numTasks])   輸入為(K,V)、(K,W)類型的DStream,返回一個新的(K,(V,W)類型的DStream
cogroup(otherStream, [numTasks])    輸入為(K,V)、(K,W)類型的DStream,返回一個新的 (K, Seq[V], Seq[W]) 元組類型的DStream
transform(func)     通過RDD-to-RDD函數作用於源碼DStream中的各個RDD,可以是任意的RDD操作,從而返回一個新的RDD
updateStateByKey(func)  根據於key的前置狀態和key的新值,對key進行更新,返回一個新狀態的Dstream
Window 函數: 

可以看到很多都是在RDD中已經有的transformation算子操作,所以這裏只關註transform、updateStateByKey和window函數

transformation之transform操作

DStream transform

1、transform操作,應用在DStream上時,可以用於執行任意的RDD到RDD的轉換操作。它可以用於實現,DStream API中所沒有提供的操作。比如說,DStream API中,並沒有提供將一個DStream中的每個batch,與一個特定的RDD進行join的操作。但是我們自己就可以使用transform操作來實現該功能。

2、DStream.join(),只能join其他DStream。在DStream每個batch的RDD計算出來之後,會去跟其他DStream的RDD進行join。

案例

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

/**
  * 使用Transformation之transform來完成在線黑名單過濾
  * 需求:
  *     將日誌數據中來自於ip["27.19.74.143", "110.52.250.126"]實時過濾掉
  * 數據格式
  *     27.19.74.143##2016-05-30 17:38:20##GET /static/image/common/faq.gif HTTP/1.1##200##1127
  */
object _06SparkStreamingTransformOps {
    def main(args: Array[String]): Unit = {
        if (args == null || args.length < 2) {
            System.err.println(
                """Parameter Errors! Usage: <hostname> <port>
                  |hostname: 監聽的網絡socket的主機名或ip地址
                  |port:    監聽的網絡socket的端口
                """.stripMargin)
            System.exit(-1)
        }
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

        val conf = new SparkConf()
            .setAppName(_01SparkStreamingNetWorkOps.getClass.getSimpleName)
            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(2))

        val hostname = args(0).trim
        val port = args(1).trim.toInt

        //黑名單數據
        val blacklist = List(("27.19.74.143", true), ("110.52.250.126", true))
//        val blacklist = List("27.19.74.143", "110.52.250.126")
        val blacklistRDD:RDD[(String, Boolean)] = ssc.sparkContext.parallelize(blacklist)

        val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)

        // 如果用到一個DStream和rdd進行操作,無法使用dstream直接操作,只能使用transform來進行操作
        val filteredDStream:DStream[String] = linesDStream.transform(rdd => {
            val ip2InfoRDD:RDD[(String, String)] = rdd.map{line => {
                (line.split("##")(0), line)
            }}
            /** A(M) B(N)兩張表:
              * across join
              *     交叉連接,沒有on條件的連接,會產生笛卡爾積(M*N條記錄) 不能用
              * inner join
              *     等值連接,取A表和B表的交集,也就是獲取在A和B中都有的數據,沒有的剔除掉 不能用
              * left outer join
              *     外鏈接:最常用就是左外連接(將左表中所有的數據保留,右表中能夠對應上的數據正常顯示,在右表中對應不上,顯示為null)
              *         可以通過非空判斷是左外連接達到inner join的結果
              */
            val joinedInfoRDD:RDD[(String, (String, Option[Boolean]))] = ip2InfoRDD.leftOuterJoin(blacklistRDD)

            joinedInfoRDD.filter{case (ip, (line, joined)) => {
                joined == None
            }}//執行過濾操作
                .map{case (ip, (line, joined)) => line}
        })

        filteredDStream.print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()  // stop中的boolean參數,設置為true,關閉該ssc對應的SparkContext,默認為false,只關閉自身
    }
}

nc中產生數據:

[uplooking@uplooking01 ~]$ nc -lk 4893
27.19.74.143##2016-05-30 17:38:20##GET /data/attachment/common/c8/common_2_verify_icon.png HTTP/1.1##200##582
110.52.250.126##2016-05-30 17:38:20##GET /static/js/logging.js?y7a HTTP/1.1##200##603
8.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-

輸出結果如下:

-------------------------------------------
Time: 1526006084000 ms
-------------------------------------------
8.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-

transformation之updateStateByKey操作

概述

1、Spark Streaming的updateStateByKey可以DStream中的數據進行按key做reduce操作,然後對各個批次的數據進行累加。

2、 updateStateByKey 解釋

以DStream中的數據進行按key做reduce操作,然後對各個批次的數據進行累加在有新的數據信息進入或更新時,可以讓用戶保持想要的任何狀。使用這個功能需要完成兩步:

1) 定義狀態:可以是任意數據類型

2) 定義狀態更新函數:用一個函數指定如何使用先前的狀態,從輸入流中的新值更新狀態。對於有狀態操作,要不斷的把當前和歷史的時間切片的RDD累加計算,隨著時間的流失,計算的數據規模會變得越來越大

3、要思考的是如果數據量很大的時候,或者對性能的要求極為苛刻的情況下,可以考慮將數據放在Redis或者tachyon或者ignite上

4、註意,updateStateByKey操作,要求必須開啟Checkpoint機制。

案例

Scala版

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 狀態函數updateStateByKey
  *     更新key的狀態(就是key對應的value)
  *
  * 通常的作用,計算某個key截止到當前位置的狀態
  *     統計截止到目前為止的word對應count
  * 要想完成截止到目前為止的操作,必須將歷史的數據和當前最新的數據累計起來,所以需要一個地方來存放歷史數據
  * 這個地方就是checkpoint目錄
  *
  */
object _07SparkStreamingUpdateStateByKeyOps {
    def main(args: Array[String]): Unit = {
        if (args == null || args.length < 2) {
            System.err.println(
                """Parameter Errors! Usage: <hostname> <port>
                  |hostname: 監聽的網絡socket的主機名或ip地址
                  |port:    監聽的網絡socket的端口
                """.stripMargin)
            System.exit(-1)
        }
        val hostname = args(0).trim
        val port = args(1).trim.toInt
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

        val conf = new SparkConf()
            .setAppName(_07SparkStreamingUpdateStateByKeyOps.getClass.getSimpleName)
            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(2))

        ssc.checkpoint("hdfs://ns1/checkpoint/streaming/usb")

        // 接收到的當前批次的數據
        val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)
        // 這是記錄下來的當前批次的數據
        val rbkDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)

        val usbDStream:DStream[(String, Int)]  = rbkDStream.updateStateByKey(updateFunc)

        usbDStream.print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()  // stop中的boolean參數,設置為true,關閉該ssc對應的SparkContext,默認為false,只關閉自身
    }

    /**
      * @param seq 當前批次的key對應的數據
      * @param history 歷史key對應的數據,可能有可能沒有
      * @return
      */
    def updateFunc(seq: Seq[Int], history: Option[Int]): Option[Int] = {
        var sum = seq.sum
        if(history.isDefined) {
            sum += history.get
        }
        Option[Int](sum)
    }
}

nc產生數據:

[uplooking@uplooking01 ~]$ nc -lk 4893
hello hello
hello you hello he hello me

輸出結果如下:

-------------------------------------------
Time: 1526009358000 ms
-------------------------------------------
(hello,2)

18/05/11 11:29:18 INFO WriteAheadLogManager  for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009338000: 
-------------------------------------------
Time: 1526009360000 ms
-------------------------------------------
(hello,5)
(me,1)
(you,1)
(he,1)

18/05/11 11:29:20 INFO WriteAheadLogManager  for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009340000: 
-------------------------------------------
Time: 1526009362000 ms
-------------------------------------------
(hello,5)
(me,1)
(you,1)
(he,1)

Java版

用法略有不同,主要是 狀態更新函數的寫法上有區別,如下:

package cn.xpleaf.bigdata.spark.java.streaming.p1;

import com.google.common.base.Optional;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class _02SparkStreamingUpdateStateByKeyOps {
    public static void main(String[] args) {
        if(args == null || args.length < 2) {
            System.err.println("Parameter Errors! Usage: <hostname> <port>");
            System.exit(-1);
        }
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
        SparkConf conf = new SparkConf()
                .setAppName(_02SparkStreamingUpdateStateByKeyOps.class.getSimpleName())
                .setMaster("local[2]");

        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(2));
        jsc.checkpoint("hdfs://ns1/checkpoint/streaming/usb");

        String hostname = args[0].trim();
        int port = Integer.valueOf(args[1].trim());
        JavaReceiverInputDStream<String> lineDStream = jsc.socketTextStream(hostname, port);//默認的持久化級別:MEMORY_AND_DISK_SER_2

        JavaDStream<String> wordsDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        JavaPairDStream<String, Integer> pairsDStream = wordsDStream.mapToPair(word -> {
            return new Tuple2<String, Integer>(word, 1);
        });

        JavaPairDStream<String, Integer> rbkDStream = pairsDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // 做歷史的累計操作
        JavaPairDStream<String, Integer> usbDStream = rbkDStream.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            @Override
            public Optional<Integer> call(List<Integer> current, Optional<Integer> history) throws Exception {

                int sum = 0;
                for (int i : current) {
                    sum += i;
                }

                if (history.isPresent()) {
                    sum += history.get();
                }
                return Optional.of(sum);
            }
        });

        usbDStream.print();

        jsc.start();//啟動流式計算
        jsc.awaitTermination();//等待執行結束
        jsc.close();
    }
}

transformation之window操作

DStream window 滑動窗口

Spark Streaming提供了滑動窗口操作的支持,從而讓我們可以對一個滑動窗口內的數據執行計算操作。每次掉落在窗口內的RDD的數據,會被聚合起來執行計算操作,然後生成的RDD,會作為window DStream的一個RDD。比如下圖中,就是對每三秒鐘的數據執行一次滑動窗口計算,這3秒內的3個RDD會被聚合起來進行處理,然後過了兩秒鐘,又會對最近三秒內的數據執行滑動窗口計算。所以每個滑動窗口操作,都必須指定兩個參數,窗口長度以及滑動間隔,而且這兩個參數值都必須是batch間隔的整數倍。

技術分享圖片

1.紅色的矩形就是一個窗口,窗口hold的是一段時間內的數據流。

2.這裏面每一個time都是時間單元,在官方的例子中,每隔window size是3 time unit, 而且每隔2個單位時間,窗口會slide一次。

所以基於窗口的操作,需要指定2個參數:

window length - The duration of the window (3 in the figure)
slide interval - The interval at which the window-based operation is performed (2 in the figure). 

1.窗口大小,個人感覺是一段時間內數據的容器。
2.滑動間隔,就是我們可以理解的cron表達式吧。
舉個例子吧:
還是以最著名的wordcount舉例,每隔10秒,統計一下過去30秒過來的數據。
// Reduce last 30 seconds of data, every 10 seconds  
val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10)) 

DSstream window滑動容器功能

window 對每個滑動窗口的數據執行自定義的計算
countByWindow 對每個滑動窗口的數據執行count操作
reduceByWindow 對每個滑動窗口的數據執行reduce操作
reduceByKeyAndWindow 對每個滑動窗口的數據執行reduceByKey操作
countByValueAndWindow 對每個滑動窗口的數據執行countByValue操作

案例

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  *窗口函數window
  *   每隔多長時間(滑動頻率slideDuration)統計過去多長時間(窗口長度windowDuration)中的數據
  * 需要註意的就是窗口長度和滑動頻率
  * windowDuration = M*batchInterval,
    slideDuration = N*batchInterval
  */
object _08SparkStreamingWindowOps {
    def main(args: Array[String]): Unit = {
        if (args == null || args.length < 2) {
            System.err.println(
                """Parameter Errors! Usage: <hostname> <port>
                  |hostname: 監聽的網絡socket的主機名或ip地址
                  |port:    監聽的網絡socket的端口
                """.stripMargin)
            System.exit(-1)
        }
        val hostname = args(0).trim
        val port = args(1).trim.toInt
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

        val conf = new SparkConf()
            .setAppName(_08SparkStreamingWindowOps.getClass.getSimpleName)
            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(2))

        // 接收到的當前批次的數據
        val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)
        val pairsDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1))

        // 每隔4s,統計過去6s中產生的數據
        val retDStream:DStream[(String, Int)] = pairsDStream.reduceByKeyAndWindow(_+_, windowDuration = Seconds(6), slideDuration = Seconds(4))

        retDStream.print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()  // stop中的boolean參數,設置為true,關閉該ssc對應的SparkContext,默認為false,只關閉自身
    }
}

nc產生數據:

[uplooking@uplooking01 ~]$ nc -lk 4893
hello you
hello he
hello me
hello you
hello he

輸出結果如下:

-------------------------------------------
Time: 1526016316000 ms
-------------------------------------------
(hello,4)
(me,1)
(you,2)
(he,1)

-------------------------------------------
Time: 1526016320000 ms
-------------------------------------------
(hello,5)
(me,1)
(you,2)
(he,2)

-------------------------------------------
Time: 1526016324000 ms
-------------------------------------------

DStream的output操作以及foreachRDD

DStream output操作

1、print 打印每個batch中的前10個元素,主要用於測試,或者是不需要執行什麽output操作時,用於簡單觸發一下job。

2、saveAsTextFile(prefix, [suffix]) 將每個batch的數據保存到文件中。每個batch的文件的命名格式為:prefix-TIME_IN_MS[.suffix]

3、saveAsObjectFile 同上,但是將每個batch的數據以序列化對象的方式,保存到SequenceFile中。

4、saveAsHadoopFile 同上,將數據保存到Hadoop文件中

5、foreachRDD 最常用的output操作,遍歷DStream中的每個產生的RDD,進行處理。可以將每個RDD中的數據寫入外部存儲,比如文件、數據庫、緩存等。通常在其中,是針對RDD執行action操作的,比如foreach。

DStream foreachRDD詳解

相關內容其實在Spark開發調優中已經有相關的說明。

通常在foreachRDD中,都會創建一個Connection,比如JDBC Connection,然後通過Connection將數據寫入外部存儲。

誤區一:在RDD的foreach操作外部,創建Connection

這種方式是錯誤的,因為它會導致Connection對象被序列化後傳輸到每個Task中。而這種Connection對象,實際上一般是不支持序列化的,也就無法被傳輸。

dstream.foreachRDD { rdd =>
  val connection = createNewConnection() 
  rdd.foreach { record => connection.send(record)
  }
}

誤區二:在RDD的foreach操作內部,創建Connection

這種方式是可以的,但是效率低下。因為它會導致對於RDD中的每一條數據,都創建一個Connection對象。而通常來說,Connection的創建,是很消耗性能的。

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

DStream foreachRDD合理使用

合理方式一:使用RDD的foreachPartition操作,並且在該操作內部,創建Connection對象,這樣就相當於是,為RDD的每個partition創建一個Connection對象,節省資源的多了。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

合理方式二:自己手動封裝一個靜態連接池,使用RDD的foreachPartition操作,並且在該操作內部,從靜態連接池中,通過靜態方法,獲取到一個連接,使用之後再還回去。這樣的話,甚至在多個RDD的partition之間,也可以復用連接了。而且可以讓連接池采取懶創建的策略,並且空閑一段時間後,將其釋放掉。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  
  }
}

foreachRDD 與foreachPartition實現實戰

需要註意的是:

(1)、你最好使用forEachPartition函數來遍歷RDD,並且在每臺Work上面創建數據庫的connection。

(2)、如果你的數據庫並發受限,可以通過控制數據的分區來減少並發。

(3)、在插入MySQL的時候最好使用批量插入。

(4),確保你寫入的數據庫過程能夠處理失敗,因為你插入數據庫的過程可能會經過網絡,這可能導致數據插入數據庫失敗。

(5)、不建議將你的RDD數據寫入到MySQL等關系型數據庫中。

這部分內容其實可以參考開發調優部分的案例,只是那裏並沒有foreachRDD,因為其並沒有使用DStream,但是原理是一樣的,因為最終都是針對RDD來進行操作的。

原文鏈接:http://blog.51cto.com/xpleaf/2115343

Spark Streaming筆記整理(三):DS的transformation與output操作