Spark系列4- Spark Streaming
1 流計算
靜態資料和流資料
靜態資料類似儲存在水庫中的水,是相對靜止不動的,如資料倉庫中儲存的資料、關係型資料庫中儲存的資料等。流資料是指在時間分佈和數量上無限的一系列動態資料合體,資料記錄是流資料的最小組成單元。
靜態資料和流資料的處理,分別對應兩種不同的計算模式:批量計算和實時計算。資料的兩種處理模型如下圖所示。
2 Spark Streaming
Spark Streaming簡介
Spark Streaming是構建在spark上的實時計算框架,它擴充套件了Spark處理大規模流式資料的能力。Spark Streaming可結合批處理和互動式查詢,適用需要將歷史資料和實時資料聯合分析的應用場景。
針對流資料的實時計算稱為流計算。總體來說,流計算秉承一個基本理念,即資料的價值隨著時間的流逝而降低。對於一個流計算系統來說,它應達到高效能、海量式、實時性、分散式、易用性和可靠性等要求。流計算系統一般包括:資料實時採集、資料實時計算和資料實時查詢服務三大部分,Spark Streaming的實時計算系統的三大組成部分如下圖所示。
Spark Streaming最主要的抽象是離散化資料流(Discretized Stream, DStream),表示連續不斷的資料流。Streaming內部按照時間片將資料分成一段一段,每一段資料轉換成一個RDD,並且對DStream的操作最終都會被轉化成對RDD的操作。其執行流程如下圖所示,
**思考:**DStream是由多個具有時間屬性的RDD組成。
Spark Streaming 和Hadoop Storm對比
Spark Streaming只能實現秒級的計算,而Hadoop Storm可以實現毫秒級的響應,因此Spark Streaming無法滿足實時性要求非常高的場景,只能勝任其他流式準實時計算場景。
相比於Storm,Spark Streaming是基於RDD的,更容易做高效的容錯處理。Spark Streaming的離散化工作機制,使其可以同時相容批量和實時資料處理的邏輯和演算法,適用需要將歷史資料和實時資料聯合分析的應用場景。
Hadoop+Storm的Lambda架構涉及的元件較多,部署比較繁瑣,而Spark Streaming同時集成了流式計算和批量計算的功能,涉及元件少,部署簡單。
Spark Streaming 工作機制
Spark Streaming有一個Reciver元件,作為一個守護程序執行在Executor上,每個Receiver負責一個DStream輸入流。Reciver元件收到資料之後會提交給Spark Streaming程式進行處理。處理的結果可以傳遞給視覺化顯示或者儲存到HBase、HDFS中。如下圖所示,
3 DStream程式設計
Spark Streaming程式設計的基本步驟
- 通過建立輸入DStream來定義輸入源;
- 通過對DStream應用轉換和輸出操作來定義計算流;
- 呼叫StreamContext物件的start()方法,啟動接收和處理流程;
- 呼叫StreamContext物件的awaitTermination()方法,等待流計算結束或者呼叫stop()方法,手動結束流計算;
建立Streaming Context
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(1))
編寫獨立應用程式時
import org.apache.spark.streaming._
val sc = new SparkConf().setAppName("AppName")
val ssc = new StreamingContext(sc, Seconds(1))
從檔案中不停的獲取資料流
package sparkstudy.sparkstreaming
import org.apache.spark._
import org.apache.spark.streaming._
object StreamingFromFile {
def main(args: Array[String]) = {
val sparkConf = new SparkConf().setAppName("StreamingFromFile")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val lines = ssc.textFileStream("file:///root/data/streaming")
val words = lines.flatMap(line => line.split(","))
val wordsCount = words.map(x => (x, 1)).reduceByKey(_ + _)
wordsCount.print()
ssc.start()
ssc.awaitTermination()
}
}
- 只會對/root/data/streaming目錄下新增的檔案進行操作,即使修改歷史檔案,streaming也不會再操作。
- ssc.stop()之後,只有退出會話,不能再通過同一個會話啟動DStreaming;
輔助指令碼,用於不停的生成新的檔案
mkdir /root/data/streaming
rm -f streaming/*
for v in `seq 1 100`;do tail -n $(($v+10)) user_login.txt > streaming/$v.txt; sleep 1; done
從socket中讀取資料流
使用Linux中的nc命令模擬服務端
nc -lk 9999
Streaming客戶端
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level,Logger}
object StreamingExamples extends Logging {
def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements()
if (!log4jInitialized) {
logInfo("Setting log level to [WARN] for Streaming examples.")
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}
object StreamingFromSocket {
def main(args: Array[String]) = {
if (args.length < 2) {
System.err.println("StreamingFromSocket <hostname> <port>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val sparkConf = new SparkConf().setAppName("StreamingFromSocket")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(line => line.split(","))
val wordsCount = words.map(x => (x, 1)).reduceByKey(_ + _)
wordsCount.print()
ssc.start()
ssc.awaitTermination()
}
}
從RDD佇列中讀取資料流
可以呼叫StreamingContext物件的queueStream()方法建立基於RDD佇列的DStream。下面的例子中,每個1秒建立一個RDD放入佇列,Spark Streaming每隔2秒就從佇列中取出資料進行處理
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
object StreamingFromRddQueue {
def main(args: Array[String]) = {
val sparkConf = new SparkConf().setAppName("StreamingFromRddQueue")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val rddQueue = new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)
val mappedStream = queueStream.map(r => (r % 10,1))
val reduceStream = mappedStream.reduceByKey(_ + _)
reduceStream.print()
ssc.start()
for( i <- 1 to 10) {
rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
Thread.sleep(1000)
}
ssc.stop()
}
}
從kafka中讀取資料流
kafka的安裝和部署方法以及topic建立和測試方法請參考:Kafka叢集的安裝和部署。
建立topic
kafka-topics.sh --create \
--zookeeper master:2181,slave2:2181,slave3:12181 \
--partitions 3 --replication-factor 3 --topic streaming
- 下載並拷貝spark-streaming-kafka*.jar包到
$SPARK_HOME/jars/kafka
目錄下 - 拷貝
$KAFKA_HOME/lib
下的所有jar包到$SPARK_HOME/jars/kafka
mkdir $SPARK_HOME/jars/kafka
cp spark-streaming-kafka-0-8_2.11-2.1.0.jar $SPARK_HOME/jars/kafka/
cp $KAFKA_HOME/libs/*.jar $SPARK_HOME/jars/kafka/
啟動spark
sh $SPARK_HOME/sbin/start-all.sh
生產者程式
package sparkstudy.sparkstreaming
import java.util.HashMap
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object MyKafkaProducer {
def main(args: Array[String]) = {
if (args.length < 4) {
System.err.println("StreamingFromSocket <BrokerList><hostname><messagePerSec><wordsPerMessage>")
System.exit(1)
}
val Array(brokers, topic, messagePerSec, wordsPerMessage) = args
val props = new HashMap[String, Object]
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
// send some messages
while(true) {
(1 to messagePerSec.toInt).foreach {
messageNum => val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
print(str)
println()
val message = new ProducerRecord[String, String](topic, null, str)
producer.send(message)
}
Thread.sleep(1000)
}
}
}
消費者程式
package sparkstudy.sparkstreaming
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import common.StreamingExamples
object MyKafkaConsumer {
def main(args: Array[String]) = {
StreamingExamples.setStreamingLogLevels()
val sc = new SparkConf().setAppName("MyKafkaConsumer")
val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("/data/kafka/checkpoint")
val zkQuorum = "localhost:2181"
val groupId = "1"
val topics = "streaming"
val numThread = 1
val topicMap = topics.split(",").map((_, numThread.toInt)).toMap
val lineMap = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)
val lines = lineMap.map(_._2)
val words = lines.flatMap(_.split(" "))
val pair = words.map(x => (x,1))
val wordCounts = pair.reduceByKeyAndWindow(_ + _, _ - _, Minutes(2), Seconds(10), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
執行生產者和消費者程式
使用maven編譯上述程式碼,生產的jar包為SparkStreaming-0.0.1-SNAPSHOT.jar。 執行生產者程式
spark-submit --jars /root/software/spark-2.2.0-bin-hadoop2.6/jars/kafka/kafka-clients-0.10.2.0.jar \
--class sparkstudy.sparkstreaming.MyKafkaProducer SparkStreaming-0.0.1-SNAPSHOT.jar master:9092 streaming 10 4
執行消費者程式
spark-submit --jars /root/software/spark-2.2.0-bin-hadoop2.6/jars/kafka/kafka-clients-0.10.2.0.jar,\
/root/software/spark-2.2.0-bin-hadoop2.6/jars/kafka/spark-streaming-kafka-0-8_2.11-2.2.0.jar,\
/root/software/spark-2.2.0-bin-hadoop2.6/jars/kafka/metrics-core-2.2.0.jar,\
/root/software/spark-2.2.0-bin-hadoop2.6/jars/kafka/kafka_2.11-0.10.2.0.jar,\
/root/software/spark-2.2.0-bin-hadoop2.6/jars/kafka/zkclient-0.10.jar \
--class sparkstudy.sparkstreaming.MyKafkaConsumer SparkStreaming-0.0.1-SNAPSHOT.jar
spark streaming去取kafka資料時踩到的坑
**錯誤:**Caused by: java.lang.ClassNotFoundException: org.I0Itec.zkclient.IZkStateListener **解決方案:**是因為未指定zk的依賴,通過jars指定zkclient庫,–jars /root/software/spark-2.2.0-bin-hadoop2.6/jars/kafka/zkclient-0.10.jar
**錯誤:**java.lang.NoClassDefFoundError: org/apache/spark/Logging **解決方案:**org.apache.spark.Logging is available in Spark version 1.5.2 or lower version. It is not in the 2.0.0. 因此我把spark-streaming-kafka替換成spark-streaming-kafka-0-8。因為我的spark版本使用的是2.2,scala版本使用的是2.11.8,所以使用了spark-streaming-kafka-0-8_2.11-2.2.0.jar,pom的依賴配置為:
<!-- <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.1</version>
</dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.2.0</version>
</dependency>