1. 程式人生 > 實用技巧 >SparkStreaming專案實戰 從 0 到 1 學習之(1)使用 Kafka + Spark Streaming + Cassandra 構建資料實時處理引擎

SparkStreaming專案實戰 從 0 到 1 學習之(1)使用 Kafka + Spark Streaming + Cassandra 構建資料實時處理引擎

Apache Kafka 是一個可擴充套件,高效能,低延遲的平臺,允許我們像訊息系統一樣讀取和寫入資料。我們可以很容易地在 Java 中使用 Kafka

Spark Streaming 是 Apache Spark 的一部分,是一個可擴充套件、高吞吐、容錯的實時流處理引擎。雖然是使用 Scala 開發的,但是支援 Java API。

Apache Cassandra 是分散式的 NoSQL 資料庫。

準備

在進行下面文章介紹之前,我們需要先建立好 Kafka 的主題以及 Cassandra 的相關表,具體如下:

在 Kafka 中建立名為 messages 的主題

$KAFKA_HOME$\bin\windows\kafka-topics.bat --create \
--zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic messages

Cassandra 中建立 KeySpace 和 table

CREATE KEYSPACE vocabulary WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; USE vocabulary; CREATE TABLE words (word text PRIMARY KEY, count int);

上面我們建立了名為 vocabulary 的 KeySpace,以及名為 words 的表。

新增依賴

我們使用 Maven 進行依賴管理,這個專案使用到的依賴如下:

<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.3.0</version> <scope>provided</scope> </dependency
> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.3.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>com.datastax.spark</groupId> <artifactId>spark-cassandra-connector_2.11</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>com.datastax.spark</groupId> <artifactId>spark-cassandra-connector-java_2.11</artifactId> <version>1.5.2</version> </dependency>

資料管道開發

我們將使用 Spark 在 Java 中建立一個簡單的應用程式,它將與我們之前建立的Kafka主題整合。應用程式將讀取已釋出的訊息並計算每條訊息中的單詞頻率。 然後將結果更新到 Cassandra 表中。整個資料架構如下:

現在我們來詳細介紹程式碼是如何實現的。

獲取 JavaStreamingContext

Spark Streaming 中的切入點是 JavaStreamingContext,所以我們首先需要獲取這個物件,如下:

SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("WordCountingApp"); sparkConf.set("spark.cassandra.connection.host", "127.0.0.1"); JavaStreamingContext streamingContext = new JavaStreamingContext( sparkConf, Durations.seconds(1));

從 Kafka 中讀取資料

有了 JavaStreamingContext 之後,我們就可以從 Kafka 對應主題中讀取實時流資料,如下:

Map<String, Object> kafkaParams = new HashMap<>(); kafkaParams.put("bootstrap.servers", "localhost:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false); Collection<String> topics = Arrays.asList("messages"); JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));

我們在程式中提供了 key 和 value 的 deserializer。這個是 Kafka 內建提供的。我們也可以根據自己的需求自定義 deserializer。

處理 DStream

我們在前面只是定義了從 Kafka 中哪張表中獲取資料,這裡我們將介紹如何處理這些獲取的資料:

JavaPairDStream<String, String> results = messages .mapToPair( record -> new Tuple2<>(record.key(), record.value()) ); JavaDStream<String> lines = results .map( tuple2 -> tuple2._2() ); JavaDStream<String> words = lines .flatMap( x -> Arrays.asList(x.split("\\s+")).iterator() ); JavaPairDStream<String, Integer> wordCounts = words .mapToPair( s -> new Tuple2<>(s, 1) ).reduceByKey( (i1, i2) -> i1 + i2 );

將資料傳送到 Cassandra 中

最後我們需要將結果傳送到 Cassandra 中,程式碼也很簡單。

wordCounts.foreachRDD( javaRdd -> { Map<String, Integer> wordCountMap = javaRdd.collectAsMap(); for (String key : wordCountMap.keySet()) { List<Word> wordList = Arrays.asList(new Word(key, wordCountMap.get(key))); JavaRDD<Word> rdd = streamingContext.sparkContext().parallelize(wordList); javaFunctions(rdd).writerBuilder( "vocabulary", "words", mapToRow(Word.class)).saveToCassandra(); } } );

啟動應用程式

最後,我們需要將這個 Spark Streaming 程式啟動起來,如下:

streamingContext.start(); streamingContext.awaitTermination();

使用 Checkpoints

在實時流處理應用中,將每個批次的狀態儲存下來通常很有用。比如在前面的例子中,我們只能計算單詞的當前頻率,如果我們想計算單詞的累計頻率怎麼辦呢?這時候我們就可以使用 Checkpoints。新的資料架構如下

為了啟用 Checkpoints,我們需要進行一些改變,如下:

streamingContext.checkpoint("./.checkpoint");

這裡我們將 checkpoint 的資料寫入到名為 .checkpoint 的本地目錄中。但是在現實專案中,最好使用 HDFS 目錄。

現在我們可以通過下面的程式碼計算單詞的累計頻率:

JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts .mapWithState( StateSpec.function( (word, one, state) -> { int sum = one.orElse(0) + (state.exists() ? state.get() : 0); Tuple2<String, Integer> output = new Tuple2<>(word, sum); state.update(sum); return output; } ) );

部署應用程式

最後,我們可以使用 spark-submit 來部署我們的應用程式,具體如下:

$SPARK_HOME$\bin\spark-submit \ --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \ --master local[2] \target\spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies.jar

最後,我們可以在 Cassandra 中檢視到對應的表中有資料生成了。完整的程式碼可以參見 https://github.com/eugenp/tutorials/tree/master/apache-spark