Spark2.x 快速入門教程 7
Spark Streaming 整合 Kafka
一、實驗介紹
1.1 實驗內容
Kafka是一個分散式的釋出-訂閱式的訊息系統,可以作為 DStream 的高階資料來源,本部分以單擊統計為例介紹 Spark Streaming 程式從 kafka 中消費資料,包括兩部分(基於 Kafka Receiver 方式,基於Kafka Direct方式)。
1.2 先學課程
1.2 先學課程
1.3 實驗知識點
- Kafka Receiver
- Kafka Direct
- Spark Streaming
- Maven
1.4 實驗環境
- Hadoop-2.6.1
- kafka_2.10-0.10.0.0
- Xfce 終端
1.5 適合人群
二、實驗步驟
2.1 Spark Streaming設計設計思想
Spark Streaming 是 Spark 的核心元件之一,為 Spark 提供了可拓展、高吞吐、容錯的流計算能力。如下圖所示,Spark Streaming 可整合多種輸入資料來源,如 Kafka、Flume、HDFS等,經處理後的資料可儲存至檔案系統、資料庫,或顯示在儀表盤裡。
Spark Streaming 最主要的抽象是 DStream(Discretized Stream,離散化資料流),表示連續不斷的資料流。在內部實現上,Spark Streaming 的輸入資料按照時間片(如1秒)分成一段一段的 DStream,每一段資料轉換為 Spark 中的 RDD,並且對 DStream 的操作都最終轉變為對相應的 RDD 的操作。例如,下圖展示了進行單詞統計時,每個時間片的資料(儲存句子的 RDD)經 flatMap 操作,生成了儲存單詞的 RDD。整個流式計算可根據業務的需求對這些中間的結果進一步處理,或者儲存到外部裝置中。
2.2 準備工作
我們已經在實驗樓環境裡下載並配置啟動 hadoop-2.6.1 所需的檔案,免除您配置檔案的麻煩,您可以在 /opt
找到,只需格式化並啟動
hadoop 程序即可。
雙擊開啟桌面上的 Xfce 終端,用 sudo
命令切換到 hadoop 使用者,hadoop 使用者密碼為 hadoop,用 cd
命令進入 /opt
目錄。
$ su hadoop
$ cd /opt/
在 /opt
目錄下格式化 hadoop。
$ hadoop-2.6.1/bin/hdfs namenode -format
在 /opt
目錄下啟動 hadoop 程序。
$ hadoop-2.6 .1/sbin/start-all.sh
用 jps
檢視 hadoop 程序是否啟動。
2.3 下載配置 Kafka
在 /opt
目錄下,用 hadoop 使用者通過 wget
命令下載,並用 tar
解壓。
$ sudo wget http://labfile.oss.aliyuncs.com/courses/785/kafka_2.10-0.10.0.0.tgz
$ sudo tar -zxf kafka_2.10-0.10.0.0.tgz
分別啟動 zookeeper,kafka。
#許可權不足,授權
$ sudo chmod 777 -R kafka_2.10-0.10.0.0
$ cd kafka_2.10-0.10.0.0
#啟動zookeeper
$ bin/zookeeper-server-start.sh config//zookeeper.properties &
#啟動kafka
$ bin/kafka-server-start.sh config/server.properties &
用 jps
命令檢視程序。
用 kafka-topics.sh
指令碼建立主題。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wc1
三、程式碼實現及測試
注意:本節課實驗是沿用上節課的 scala IDE 環境,pom.xml 不需要修改,需要的 spark-streaming-kafka_2.10
jar 依賴已經新增在裡面。
1). 基於Kafka Receiver方式
選中 cn.com.syl.spark 包 -> 用快捷鍵 Ctrl+N
->搜尋 class -> 選中 java class
-> Next
輸入類名 -> Finish
KafkaReceiverSpark.java
程式碼如下:
package cn.com.syl.spark;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;
public class KafkaReceiverSpark {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("KafkaReceiverSpark");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(6));
// 使用KafkaUtils.createStream()方法,建立 Kafka 的輸入資料流
Map<String, Integer> topicThreadMap = new HashMap<String, Integer>();
topicThreadMap.put("wc1", 1);
JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(
jsc,
"localhost:2181",
"DefaultConsumerGroup",
topicThreadMap);
// wordcount code
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<Tuple2<String,String>, String>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<String> call(Tuple2<String, String> tuple)
throws Exception {
return Arrays.asList(tuple._2.split(" "));
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String word)
throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
wordCounts.print();
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}
啟動 Spark Streaming。
開啟 Xfce 終端啟動 kafka Producer。
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wc1
#輸入任意
快速切換到scala IDE Console 控制檯,螢幕上會顯示程式執行的相關資訊,並會每隔6秒鐘重新整理一次資訊,大量資訊中會包含如下重要資訊,預設只顯示前十條:
同樣地,您也可以再另外開啟 consume 終端。
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic wc1
至此基於 Kafka Receiver 方式整哈Spark Streaming 順利完成。實驗結束後,要關閉各個終端,只要切換到該終端視窗,然後按鍵盤的 Ctrl+C
組合鍵,就可以結束程式執行。
2). 基於Kafka Direct 方式
關於 基於 Kafka Direct 方式,只需要新建一個類 KafkaDirectSpark
,具體程式碼如下:
package cn.com.syl.spark;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
public class KafkaDirectSpark{
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("KafkaDirectSpark");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(6));
// 建立map,新增引數
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list",
"localhost:9092");
// 建立一個集合set,新增讀取的topic
Set<String> topics = new HashSet<String>();
topics.add("wc1");
// 建立輸入DStream
JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topics);
// 單詞統計
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<Tuple2<String,String>, String>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<String> call(Tuple2<String, String> tuple)
throws Exception {
return Arrays.asList(tuple._2.split(" "));
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
wordCounts.print();
jssc.start();
jssc.awaitTermination();
jssc.close();
}
}
執行方式和上述基於 Kafka Receiver 方式一模一樣,在此就不演示了,請您完成。
補充知識:
假定您是在 windows 平臺寫的程式碼 ,對於上面的程式碼,您完全可以用打 jar
包的方式執行,具體參考上節 Streaming
整合 Flume
四、實驗總結
本節課主要介紹了 Spark Streaming 與 Kafka 的整合的兩種方式,並就 Windows 平臺如何打 jar 包提交到遠端伺服器進行講解,希望學完本節課,能幫助您理解 Spark Streaming,並能很快上手。