Spark Streaming HA (High Availability)
1. Stateful operations such as updateStateByKey and window operations checkpoint automatically, but a checkpoint directory must be set. The state data is kept in a fault-tolerant file system, so if it is lost from memory it can be read back from the file system instead of being recomputed.
jssc.checkpoint("hdfs://ip:port/checkpoint");
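For context, the update function you pass to updateStateByKey is what produces the state that gets checkpointed: for each key it merges the batch's new values with the previous state. A minimal sketch of that logic as plain Java (Spark's Java API uses its own Optional type, e.g. org.apache.spark.api.java.Optional, and a Function2 interface; java.util types and the running-count logic here are stand-ins for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;

public class UpdateStateExample {
    // Illustrative update function: keep a running count per key.
    // Arguments: the new values for a key in this batch, and the
    // previously checkpointed state (empty on the first batch).
    static final BiFunction<List<Long>, Optional<Long>, Optional<Long>> UPDATE =
            (newValues, state) -> {
                long sum = state.orElse(0L);
                for (Long v : newValues) {
                    sum += v;
                }
                return Optional.of(sum);
            };

    public static void main(String[] args) {
        // First batch: no previous state, values [1, 2] -> state 3.
        Optional<Long> s1 = UPDATE.apply(Arrays.asList(1L, 2L), Optional.empty());
        // Second batch: previous state 3, values [3] -> state 6.
        Optional<Long> s2 = UPDATE.apply(Arrays.asList(3L), s1);
        System.out.println(s2.get()); // prints 6
    }
}
```

Because this returned state is persisted in the checkpoint directory, a restarted driver can resume the running counts rather than starting over from zero.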
2. Driver high availability (Java version)
When the StreamingContext is created and started for the first time, it continuously writes the metadata of the real-time computation to the checkpoint directory. If the driver node fails, the cluster can restart the driver automatically and rebuild the context from the checkpoint. This requires submitting in cluster deploy mode: on Spark Standalone, use spark-submit --deploy-mode cluster --supervise; on YARN, the application-attempt restart mechanism plays the same role.
private static void testDriverHA() {
    final String checkpointDir = "hdfs://ip:port/checkpoint";

    // The factory is only invoked when no checkpoint exists yet; after a
    // driver restart, getOrCreate() rebuilds the context (and the whole
    // DStream graph) from the checkpoint instead of calling create().
    JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
        @Override
        public JavaStreamingContext create() {
            SparkConf conf = new SparkConf()
                    .setMaster("local[2]") // on a real cluster, the master is set by spark-submit
                    .setAppName("AdClickRealTimeStatSpark");
            JavaStreamingContext jssc = new JavaStreamingContext(
                    conf, Durations.seconds(5));
            jssc.checkpoint(checkpointDir);

            Map<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put(Constants.KAFKA_METADATA_BROKER_LIST,
                    ConfigurationManager.getProperty(Constants.KAFKA_METADATA_BROKER_LIST));
            String kafkaTopics = ConfigurationManager.getProperty(Constants.KAFKA_TOPICS);
            String[] kafkaTopicsSplited = kafkaTopics.split(",");
            Set<String> topics = new HashSet<String>();
            for (String kafkaTopic : kafkaTopicsSplited) {
                topics.add(kafkaTopic);
            }

            JavaPairInputDStream<String, String> adRealTimeLogDStream = KafkaUtils.createDirectStream(
                    jssc,
                    String.class,
                    String.class,
                    StringDecoder.class,
                    StringDecoder.class,
                    kafkaParams,
                    topics);

            JavaPairDStream<String, String> filteredAdRealTimeLogDStream =
                    filterByBlacklist(adRealTimeLogDStream);
            generateDynamicBlacklist(filteredAdRealTimeLogDStream);
            JavaPairDStream<String, Long> adRealTimeStatDStream = calculateRealTimeStat(
                    filteredAdRealTimeLogDStream);
            calculateProvinceTop3Ad(adRealTimeStatDStream);
            calculateAdClickCountByWindow(adRealTimeLogDStream);

            return jssc;
        }
    };

    JavaStreamingContext context = JavaStreamingContext.getOrCreate(
            checkpointDir, contextFactory);
    context.start();
    context.awaitTermination();
}
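The submit command mentioned above, spelled out as a sketch (master URL, class name, and jar path are placeholders; --supervise applies to Standalone cluster mode):

```shell
# Standalone cluster deploy mode with driver supervision: the master
# restarts the driver process if it exits with a non-zero status.
spark-submit \
  --master spark://master:7077 \
  --deploy-mode cluster \
  --supervise \
  --class com.example.AdClickRealTimeStatSpark \
  app.jar
```

Because the driver runs inside the cluster rather than on the submitting machine, the restarted driver can reach the HDFS checkpoint directory and resume where the failed instance left off.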
3. RDD high availability: enable the WAL (write-ahead log) mechanism
In receiver-based Spark Streaming, the receiver ingests data and splits it into blocks; the blocks for a batch interval are grouped into a batch, and for each batch an RDD is created and a job is launched to run the operators defined on it. With the write-ahead log enabled, as soon as the receiver gets data it also writes a copy to log files under the checkpoint directory on a fault-tolerant file system (such as HDFS), giving the data a redundant on-disk replica that can be replayed after a failure.
SparkConf conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("AdClickRealTimeStatSpark")
        // the WAL also requires a checkpoint directory to be set on the context
        .set("spark.streaming.receiver.writeAheadLog.enable", "true");