SparkStreaming專案實戰 從 0 到 1 學習之(1)使用 Kafka + Spark Streaming + Cassandra 構建資料實時處理引擎
Apache Kafka 是一個可擴充套件,高效能,低延遲的平臺,允許我們像訊息系統一樣讀取和寫入資料。我們可以很容易地在 Java 中使用 Kafka。
Spark Streaming 是 Apache Spark 的一部分,是一個可擴充套件、高吞吐、容錯的實時流處理引擎。雖然是使用 Scala 開發的,但是支援 Java API。
Apache Cassandra 是分散式的 NoSQL 資料庫。
準備
在進行下面文章介紹之前,我們需要先建立好 Kafka 的主題以及 Cassandra 的相關表,具體如下:
在 Kafka 中建立名為 messages 的主題
$KAFKA_HOME$\bin\windows\kafka-topics.bat --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic messages
|
在 Cassandra 中建立 KeySpace 和 table
CREATE KEYSPACE vocabulary
WITH REPLICATION = {
'class' : 'SimpleStrategy' ,
'replication_factor' : 1
};
USE vocabulary;
CREATE TABLE words (word text PRIMARY KEY, count int); |
上面我們建立了名為 vocabulary 的 KeySpace,以及名為 words 的表。
新增依賴
我們使用 Maven 進行依賴管理,這個專案使用到的依賴如下:
< dependency >
< groupId >org.apache.spark</ groupId >
< artifactId >spark-core_2.11</ artifactId >
< version >2.3.0</ version >
< scope >provided</ scope >
</ dependency >
< dependency >
< groupId >org.apache.spark</ groupId >
< artifactId >spark-sql_2.11</ artifactId >
< version >2.3.0</ version >
< scope >provided</ scope >
</ dependency >
< dependency >
< groupId >org.apache.spark</ groupId >
< artifactId >spark-streaming_2.11</ artifactId >
< version >2.3.0</ version >
< scope >provided</ scope >
</ dependency >
< dependency >
< groupId >org.apache.spark</ groupId >
< artifactId >spark-streaming-kafka-0-10_2.11</ artifactId >
< version >2.3.0</ version >
</ dependency >
< dependency >
< groupId >com.datastax.spark</ groupId >
< artifactId >spark-cassandra-connector_2.11</ artifactId >
< version >2.3.0</ version >
</ dependency >
< dependency >
< groupId >com.datastax.spark</ groupId >
< artifactId >spark-cassandra-connector-java_2.11</ artifactId >
< version >1.5.2</ version >
</ dependency >
|
資料管道開發
我們將使用 Spark 在 Java 中建立一個簡單的應用程式,它將與我們之前建立的Kafka主題整合。應用程式將讀取已釋出的訊息並計算每條訊息中的單詞頻率。 然後將結果更新到 Cassandra 表中。整個資料架構如下:
現在我們來詳細介紹程式碼是如何實現的。
獲取 JavaStreamingContext
Spark Streaming 中的切入點是 JavaStreamingContext,所以我們首先需要獲取這個物件,如下:
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName( "WordCountingApp" );
sparkConf.set( "spark.cassandra.connection.host" , "127.0.0.1" );
JavaStreamingContext streamingContext = new JavaStreamingContext(
sparkConf, Durations.seconds( 1 ));
|
從 Kafka 中讀取資料
有了 JavaStreamingContext 之後,我們就可以從 Kafka 對應主題中讀取實時流資料,如下:
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put( "bootstrap.servers" , "localhost:9092" );
kafkaParams.put( "key.deserializer" , StringDeserializer. class );
kafkaParams.put( "value.deserializer" , StringDeserializer. class );
kafkaParams.put( "group.id" , "use_a_separate_group_id_for_each_stream" );
kafkaParams.put( "auto.offset.reset" , "latest" );
kafkaParams.put( "enable.auto.commit" , false );
Collection<String> topics = Arrays.asList( "messages" );
JavaInputDStream<ConsumerRecord<String, String>> messages =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
|
我們在程式中提供了 key 和 value 的 deserializer。這個是 Kafka 內建提供的。我們也可以根據自己的需求自定義 deserializer。
處理 DStream
我們在前面只是定義了從 Kafka 中哪張表中獲取資料,這裡我們將介紹如何處理這些獲取的資料:
JavaPairDStream<String, String> results = messages
.mapToPair(
record -> new Tuple2<>(record.key(), record.value())
);
JavaDStream<String> lines = results
.map(
tuple2 -> tuple2._2()
);
JavaDStream<String> words = lines
.flatMap(
x -> Arrays.asList(x.split( "\\s+" )).iterator()
);
JavaPairDStream<String, Integer> wordCounts = words
.mapToPair(
s -> new Tuple2<>(s, 1 )
).reduceByKey(
(i1, i2) -> i1 + i2
);
|
將資料傳送到 Cassandra 中
最後我們需要將結果傳送到 Cassandra 中,程式碼也很簡單。
wordCounts.foreachRDD(
javaRdd -> {
Map<String, Integer> wordCountMap = javaRdd.collectAsMap();
for (String key : wordCountMap.keySet()) {
List<Word> wordList = Arrays.asList( new Word(key, wordCountMap.get(key)));
JavaRDD<Word> rdd = streamingContext.sparkContext().parallelize(wordList);
javaFunctions(rdd).writerBuilder(
"vocabulary" , "words" , mapToRow(Word. class )).saveToCassandra();
}
}
);
|
啟動應用程式
最後,我們需要將這個 Spark Streaming 程式啟動起來,如下:
streamingContext.start();
streamingContext.awaitTermination();
|
使用 Checkpoints
在實時流處理應用中,將每個批次的狀態儲存下來通常很有用。比如在前面的例子中,我們只能計算單詞的當前頻率,如果我們想計算單詞的累計頻率怎麼辦呢?這時候我們就可以使用 Checkpoints。新的資料架構如下
為了啟用 Checkpoints,我們需要進行一些改變,如下:
streamingContext.checkpoint( "./.checkpoint" );
|
這裡我們將 checkpoint 的資料寫入到名為 .checkpoint 的本地目錄中。但是在現實專案中,最好使用 HDFS 目錄。
現在我們可以通過下面的程式碼計算單詞的累計頻率:
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts
.mapWithState(
StateSpec.function(
(word, one, state) -> {
int sum = one.orElse( 0 ) + (state.exists() ? state.get() : 0 );
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
state.update(sum);
return output;
}
)
);
|
部署應用程式
最後,我們可以使用 spark-submit 來部署我們的應用程式,具體如下:
$SPARK_HOME$\bin\spark-submit \
--class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \
--master local [2]
\target\spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies.jar
|
最後,我們可以在 Cassandra 中檢視到對應的表中有資料生成了。完整的程式碼可以參見 https://github.com/eugenp/tutorials/tree/master/apache-spark