sparkstreaming+kafka+redis+hbase消費kafka的資料實現exactly-once的語義
最近在做實時流處理的一個專案,遇到N多問題,經過不斷的除錯,終於有點進展,記錄一下,防止後人遇到同樣的問題.
1,sparkstreaming消費kafka有兩種方法,這裡我就不介紹了,網上關於這方面的資料很多,我就簡單說一下兩者的區別吧,
(1)基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中儲存消費過的offset的。這是消費Kafka資料的傳統方式。這種方式配合著WAL機制可以保證資料零丟失的高可靠性,但是卻無法保證資料被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的.
(2)基於direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,並儲存在checkpoint中。Spark自己一定是同步的,因此可以保證資料是消費一次且僅消費一次,在實際生產環境中大都用Direct方式.
特別說明一下:kafka0.9之前,offest都是儲存在zk中,有zk來維護的,0.9之後kafka的offest儲存在kafka的 __consumer_offsets 這個topic中了.
2,手動維護kafka的offest.
(1),為了實現exactly-once的語義,我採用自己儲存offest的方法,offest可以儲存在zk,kafka,mysql,hbase,redis中自己根據情況而定,我選擇把offest儲存到redis中.建立Dstream之前,先判斷是否消費過,如果沒有消費就從頭開始,如果已經消費過了,就從上次儲存的offest處開始消費,廢話不多說,直接上程式碼.(因程式碼有點多,就只貼了重要的部分)
(2),spark版本2.2.0,scala版本2.11.8,kafka版本0.10.1,hbase版本1.1.2.
程式碼如下:
package test import java.util import kafka.{PropertiesScalaUtils, RedisKeysListUtils} import kafka.SparkStreamingKafka.{dbIndex, kafkaStreams} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import redis.RedisPool object sparkstreaming { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.INFO) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.INFO) Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.INFO) val conf = new SparkConf().setAppName("sparkstreaming") conf.set("spark.streaming.kafka.maxRatePerPartition", "2000") conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.streaming.concurrentJobs", "10") conf.set("spark.streaming.kafka.maxRetries", "50") val scc = new StreamingContext(conf, Seconds(5)) val topic = PropertiesScalaUtils.loadProperties("topic") val topicSet: Set[String] = Set(topic) val kafkaParams = Map[String, Object]( "auto.offset.reset" -> "latest", "value.deserializer" -> classOf[StringDeserializer] , "key.deserializer" -> classOf[StringDeserializer] , "bootstrap.servers" -> PropertiesScalaUtils.loadProperties("broker") , "group.id" -> PropertiesScalaUtils.loadProperties("groupId") , "enable.auto.commit" -> (false: java.lang.Boolean) ) val maxTotal = 200 val maxIdle = 100 val minIdle = 10 val testOnBorrow = false val testOnReturn = false val maxWaitMillis = 500 RedisPool.makePool(PropertiesScalaUtils.loadProperties("redisHost"), PropertiesScalaUtils.loadProperties("redisPort").toInt, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, maxWaitMillis) val jedis = RedisPool.getPool.getResource jedis.select(dbIndex) val keys: util.Set[String] = jedis.keys(topic + "*") if (keys.size() == 0) { kafkaStreams = KafkaUtils.createDirectStream[String, String]( scc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)) } else { val fromOffsets: Map[TopicPartition, Long] = RedisKeysListUtils.getKeysList(PropertiesScalaUtils.loadProperties("redisHost"), PropertiesScalaUtils.loadProperties("redisPort").toInt, topic) kafkaStreams = KafkaUtils.createDirectStream[String, String]( scc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, fromOffsets)) } RedisPool.getPool.returnResource(jedis) kafkaStreams.foreachRDD(rdd=>{ val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreachPartition(partiton=>{ val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", PropertiesScalaUtils.loadProperties("zk_hbase")) //zk的地址; conf.set("hbase.zookeeper.property.clientPort", PropertiesScalaUtils.loadProperties("zk_port")) conf.set("hbase.master", PropertiesScalaUtils.loadProperties("hbase_master")) conf.set("hbase.defaults.for.version.skip", "true") conf.set("hhbase.rootdir", PropertiesScalaUtils.loadProperties("hbase_rootdir")) conf.set("zookeeper.znode.parent", PropertiesScalaUtils.loadProperties("zookeeper_znode_parent")) myTable = new HTable(conf, TableName.valueOf(PropertiesScalaUtils.loadProperties("hbase_table"))) myTable.setAutoFlush(false, false) //關閉自動提交 myTable.setWriteBufferSize(3 * 1024 * 1024) partiton.foreach(pair=>{ //自己的處理邏輯; }) myTable.flushCommits() myTable.close() offsetRanges.foreach { offsetRange => println("partition : " + offsetRange.partition + " fromOffset: " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset) val topic_partition_key_new = offsetRange.topic + "_" + offsetRange.partition jedis_jason.set(topic_partition_key_new, offsetRange.untilOffset + "") }) }) scc.start() scc.awaitTermination() } }
程式碼已經測試過了,沒有問題,如果有寫的不對的地方,歡迎大家指正,如果有什麼疑問,可以加QQ群:340297350,謝謝
相關推薦
sparkstreaming+kafka+redis+hbase消費kafka的資料實現exactly-once的語義
最近在做實時流處理的一個專案,遇到N多問題,經過不斷的除錯,終於有點進展,記錄一下,防止後人遇到同樣的問題. 1,sparkstreaming消費kafka有兩種方法,這裡我就不介紹了,網上關於這方面的資料很多,我就簡單說一下兩者的區別吧, (1)基於receiver的方
sparkstreaming同時消費多個topic的資料實現exactly-once的語義
import java.io.File import kafka.{PropertiesScalaUtils, RedisKeysListUtils} import kafka.streamingRedisHive.{dbIndex} import org.apache.kafka.clients.consu
【Big Data 每日一題20180922】sparkstreaming同時消費多個topic的資料實現exactly-once的語義
最近很多人問我,sparkstreaming怎麼消費多個topic的資料,自己維護offest,其實這個跟消費一個topic是一樣的,但還是有很多問我,今天就簡單的寫一個demo,供大家參考,直接上程式碼吧,已經測試過了.我把offest存到redis裡了,當然也可以儲存在z
Kafka 0.11.0.0 是如何實現 Exactly-once 語義的
很高興地告訴大家,具備新的里程碑意義的功能的Kafka 0.11.x版本(對應 Confluent Platform 3.3)已經release,該版本引入了exactly-once語義,本文闡述的內容包括:Apache Kafka的exactly-once語義;為什麼exactly-once是一個很難解決的
Spark Streaming 中如何實現 Exactly-Once 語義
Exactly-once 語義是實時計算的難點之一。要做到每一條記錄只會被處理一次,即使伺服器或網路發生故障時也能保證沒有遺漏,這不僅需要實時計算框架本身的支援,還對上游的訊息系統、下游的資料儲存有所要求。此外,我們在編寫計算流程時也需要遵循一定規範,才能真正實
[spark-streaming,kafka] Exactly-once 語義實現設計文件
kafka 版本 0.8.x spark 版本 1.3 文章連結址: 翻譯原因: 0.8 的 kafka 版本中, 所有 topic partition 的 offset 消費記錄集中儲存在 zookeeper 上,而 spark-streaming 中資料
Kafka 0.11.0.0 實現 producer的Exactly-once 語義(中文)
很高興地告訴大家,具備新的里程碑意義的功能的Kafka 0.11.x版本(對應 Confluent Platform 3.3)已經release,該版本引入了exactly-once語義,本文闡述的內容包括: Apache Kafka的exactly-once語義; 為什麼exactly-once是一個很
Kafka 0.11.0.0 實現 producer的Exactly-once 語義(英文)
Exactly-once Semantics are Possible: Here’s How Kafka Does it I’m thrilled that we have hit an exciting milestone the Kafka community has long been waiting
Kafka 0.11.0.0 實現 producer的Exactly-once 語義(官方DEMO)
A Kafka client that publishes records to the Kafka cluster. The producer is thread safe and sharing a single producer instance across threads will generall
Kafka設計解析(八)- Exactly Once語義與事務機制原理
1 寫在前面的話 本文所有Kafka原理性的描述除特殊說明外均基於Kafka 1.0.0版本。 2 為什麼要提供事務機制 Kafka事務機制的實現主要是為了支援 Exactly Once即正好一次語義操作的原子性有狀態操作的可恢復性2.1 Exactly Once 《Kafka背景及架構介紹》一文
[Kafka設計解析]--(八)Exactly Once語義與事務機制原理
寫在前面的話本文所有Kafka原理性的描述除特殊說明外均基於Kafka 1.0.0版本。為什麼要提供事務機制Kafka事務機制的實現主要是為了支援Exactly Once即正好一次語義操作的原子性有狀態操作的可恢復性Exactly Once《Kafka背景及架構介紹》一文中有
SparkStreaming消費Kafka中的資料 使用zookeeper和MySQL儲存偏移量的兩種方式
Spark讀取Kafka資料的方式有兩種,一種是receiver方式,另一種是直連方式。今天分享的SparkStreaming消費Kafka中的資料儲存偏移量的兩種方式都是基於直連方式上的 話不多說 直接上程式碼 ! 第一種是使用zookeeper儲存偏移量 object Kafka
Spark Streaming消費Kafka Direct方式資料零丟失實現
一、概述 上次寫這篇文章文章的時候,Spark還是1.x,kafka還是0.8x版本,轉眼間spark到了2.x,kafka也到了2.x,儲存offset的方式也發生了改變,筆者根據上篇文章和網上文章,將offset儲存到Redis,既保證了併發也保證了資料不丟失,經過測試,有效。 二、
簡單Storm消費Kafka資料並存儲到redis例項(訂單資訊處理)
maven依賴 <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId&g
Kafka程式碼實現--from-beginning,讀取歷史未消費的資料
Kafka實際環境有可能會出現Consumer全部宕機,雖然基於Kafka的高可用特性,消費者群組中的消費者可以實現再均衡,所有Consumer不處理資料的情況很少,但是還是有可能會出現,此時就要求Consumer重啟的時候能夠讀取在宕機期間Producer傳送的資料。基於消費者訂閱模式預設
structuredstreaming消費kafka的資料實現wordcount
最近也是有很多同學問我,StructuredStreaming結合kafka的使用,我簡單的寫了一個wordcount的demo,後續會有更加具體和詳細的介紹,今天先來一個簡單的demo吧.程式碼測試過了,可以執行. package spark import org.ap
SparkStreaming消費kafka數據
字符串 targe val offset 1.0 error .org 依賴 oot 概要:本例子為SparkStreaming消費kafka消息的例子,實現的功能是將數據實時的進行抽取、過濾、轉換,然後存儲到HDFS中。 實例代碼 package com.fwmagic.
【原始碼追蹤】SparkStreaming 中用 Direct 方式每次從 Kafka 拉取多少條資料(offset取值範圍)
我們知道 SparkStreaming 用 Direct 的方式拉取 Kafka 資料時,是根據 kafka 中的 fromOffsets 和 untilOffsets 來進行獲取資料的,而 fromOffsets 一般都是需要我們自己管理的,而每批次的 untilOffsets 是由
大資料之Spark(六)--- Spark Streaming介紹,DStream,Receiver,Streamin整合Kafka,Windows,容錯的實現
一、Spark Streaming介紹 ----------------------------------------------------------- 1.介紹 是spark core的擴充套件,針對實時資料的實時流處理技術 具有可擴充套件、高吞吐量、
Spark 消費Kafka資料
spark RDD消費的哦,不是spark streaming。 導maven包: 注意版本哦,要跟自己機器的一致 <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->