1. 程式人生 > >sparkstreaming+kafka+redis+hbase消費kafka的資料實現exactly-once的語義

sparkstreaming+kafka+redis+hbase消費kafka的資料實現exactly-once的語義

最近在做實時流處理的一個專案,遇到N多問題,經過不斷的除錯,終於有點進展,記錄一下,防止後人遇到同樣的問題.

1,sparkstreaming消費kafka有兩種方法,這裡我就不介紹了,網上關於這方面的資料很多,我就簡單說一下兩者的區別吧,

(1)基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中儲存消費過的offset的。這是消費Kafka資料的傳統方式。這種方式配合著WAL機制可以保證資料零丟失的高可靠性,但是卻無法保證資料被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的.

(2)基於direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,並儲存在checkpoint中。Spark自己一定是同步的,因此可以保證資料是消費一次且僅消費一次,在實際生產環境中大都用Direct方式.

特別說明一下:kafka0.9之前,offest都是儲存在zk中,有zk來維護的,0.9之後kafka的offest儲存在kafka的 __consumer_offsets 這個topic中了.

2,手動維護kafka的offest.

(1),為了實現exactly-once的語義,我採用自己儲存offest的方法,offest可以儲存在zk,kafka,mysql,hbase,redis中自己根據情況而定,我選擇把offest儲存到redis中.建立Dstream之前,先判斷是否消費過,如果沒有消費就從頭開始,如果已經消費過了,就從上次儲存的offest處開始消費,廢話不多說,直接上程式碼.(因程式碼有點多,就只貼了重要的部分)

(2),spark版本2.2.0,scala版本2.11.8,kafka版本0.10.1,hbase版本1.1.2.

程式碼如下:

package test

import java.util
import kafka.{PropertiesScalaUtils, RedisKeysListUtils}
import kafka.SparkStreamingKafka.{dbIndex, kafkaStreams}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import redis.RedisPool

object sparkstreaming {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.INFO)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.INFO)
    val conf = new SparkConf().setAppName("sparkstreaming")
    conf.set("spark.streaming.kafka.maxRatePerPartition", "2000")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.streaming.concurrentJobs", "10")
    conf.set("spark.streaming.kafka.maxRetries", "50")
    val scc = new StreamingContext(conf, Seconds(5))
    val topic = PropertiesScalaUtils.loadProperties("topic")
    val topicSet: Set[String] = Set(topic)
    val kafkaParams = Map[String, Object](
      "auto.offset.reset" -> "latest",
      "value.deserializer" -> classOf[StringDeserializer]
      , "key.deserializer" -> classOf[StringDeserializer]
      , "bootstrap.servers" -> PropertiesScalaUtils.loadProperties("broker")
      , "group.id" -> PropertiesScalaUtils.loadProperties("groupId")
      , "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val maxTotal = 200
    val maxIdle = 100
    val minIdle = 10
    val testOnBorrow = false
    val testOnReturn = false
    val maxWaitMillis = 500
    RedisPool.makePool(PropertiesScalaUtils.loadProperties("redisHost"), PropertiesScalaUtils.loadProperties("redisPort").toInt, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, maxWaitMillis)
    val jedis = RedisPool.getPool.getResource
    jedis.select(dbIndex)
    val keys: util.Set[String] = jedis.keys(topic + "*")
    if (keys.size() == 0) {
      kafkaStreams = KafkaUtils.createDirectStream[String, String](
        scc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
    } else {
      val fromOffsets: Map[TopicPartition, Long] = RedisKeysListUtils.getKeysList(PropertiesScalaUtils.loadProperties("redisHost"), PropertiesScalaUtils.loadProperties("redisPort").toInt, topic)
      kafkaStreams = KafkaUtils.createDirectStream[String, String](
        scc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, fromOffsets))
    }
    RedisPool.getPool.returnResource(jedis)
    kafkaStreams.foreachRDD(rdd=>{
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition(partiton=>{
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", PropertiesScalaUtils.loadProperties("zk_hbase")) //zk的地址;
        conf.set("hbase.zookeeper.property.clientPort", PropertiesScalaUtils.loadProperties("zk_port"))
        conf.set("hbase.master", PropertiesScalaUtils.loadProperties("hbase_master"))
        conf.set("hbase.defaults.for.version.skip", "true")
        conf.set("hhbase.rootdir", PropertiesScalaUtils.loadProperties("hbase_rootdir"))
        conf.set("zookeeper.znode.parent", PropertiesScalaUtils.loadProperties("zookeeper_znode_parent"))
        myTable = new HTable(conf, TableName.valueOf(PropertiesScalaUtils.loadProperties("hbase_table")))
        myTable.setAutoFlush(false, false) //關閉自動提交
        myTable.setWriteBufferSize(3 * 1024 * 1024)
        partiton.foreach(pair=>{
          //自己的處理邏輯;
        })
        myTable.flushCommits()
        myTable.close()
        offsetRanges.foreach { offsetRange =>
          println("partition : " + offsetRange.partition + " fromOffset:  " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
          val topic_partition_key_new = offsetRange.topic + "_" + offsetRange.partition
          jedis_jason.set(topic_partition_key_new, offsetRange.untilOffset + "")
        })
      })
      scc.start()
      scc.awaitTermination()
    }
  }

程式碼已經測試過了,沒有問題,如果有寫的不對的地方,歡迎大家指正,如果有什麼疑問,可以加QQ群:340297350,謝謝

相關推薦

sparkstreaming+kafka+redis+hbase消費kafka資料實現exactly-once語義

最近在做實時流處理的一個專案,遇到N多問題,經過不斷的除錯,終於有點進展,記錄一下,防止後人遇到同樣的問題. 1,sparkstreaming消費kafka有兩種方法,這裡我就不介紹了,網上關於這方面的資料很多,我就簡單說一下兩者的區別吧, (1)基於receiver的方

sparkstreaming同時消費多個topic的資料實現exactly-once語義

import java.io.File import kafka.{PropertiesScalaUtils, RedisKeysListUtils} import kafka.streamingRedisHive.{dbIndex} import org.apache.kafka.clients.consu

【Big Data 每日一題20180922】sparkstreaming同時消費多個topic的資料實現exactly-once語義

最近很多人問我,sparkstreaming怎麼消費多個topic的資料,自己維護offest,其實這個跟消費一個topic是一樣的,但還是有很多問我,今天就簡單的寫一個demo,供大家參考,直接上程式碼吧,已經測試過了.我把offest存到redis裡了,當然也可以儲存在z

Kafka 0.11.0.0 是如何實現 Exactly-once 語義

很高興地告訴大家,具備新的里程碑意義的功能的Kafka 0.11.x版本(對應 Confluent Platform 3.3)已經release,該版本引入了exactly-once語義,本文闡述的內容包括:Apache Kafka的exactly-once語義;為什麼exactly-once是一個很難解決的

Spark Streaming 中如何實現 Exactly-Once 語義

Exactly-once 語義是實時計算的難點之一。要做到每一條記錄只會被處理一次,即使伺服器或網路發生故障時也能保證沒有遺漏,這不僅需要實時計算框架本身的支援,還對上游的訊息系統、下游的資料儲存有所要求。此外,我們在編寫計算流程時也需要遵循一定規範,才能真正實

[spark-streaming,kafka] Exactly-once 語義實現設計文件

kafka 版本 0.8.x spark 版本 1.3 文章連結址: 翻譯原因: 0.8 的 kafka 版本中, 所有 topic partition 的 offset 消費記錄集中儲存在 zookeeper 上,而 spark-streaming 中資料

Kafka 0.11.0.0 實現 producer的Exactly-once 語義(中文)

很高興地告訴大家,具備新的里程碑意義的功能的Kafka 0.11.x版本(對應 Confluent Platform 3.3)已經release,該版本引入了exactly-once語義,本文闡述的內容包括: Apache Kafka的exactly-once語義; 為什麼exactly-once是一個很

Kafka 0.11.0.0 實現 producer的Exactly-once 語義(英文)

Exactly-once Semantics are Possible: Here’s How Kafka Does it I’m thrilled that we have hit an exciting milestone the Kafka community has long been waiting

Kafka 0.11.0.0 實現 producer的Exactly-once 語義(官方DEMO)

A Kafka client that publishes records to the Kafka cluster. The producer is thread safe and sharing a single producer instance across threads will generall

Kafka設計解析(八)- Exactly Once語義與事務機制原理

1 寫在前面的話 本文所有Kafka原理性的描述除特殊說明外均基於Kafka 1.0.0版本。 2 為什麼要提供事務機制 Kafka事務機制的實現主要是為了支援 Exactly Once即正好一次語義操作的原子性有狀態操作的可恢復性2.1 Exactly Once 《Kafka背景及架構介紹》一文

[Kafka設計解析]--(八)Exactly Once語義與事務機制原理

寫在前面的話本文所有Kafka原理性的描述除特殊說明外均基於Kafka 1.0.0版本。為什麼要提供事務機制Kafka事務機制的實現主要是為了支援Exactly Once即正好一次語義操作的原子性有狀態操作的可恢復性Exactly Once《Kafka背景及架構介紹》一文中有

SparkStreaming消費Kafka中的資料 使用zookeeper和MySQL儲存偏移量的兩種方式

Spark讀取Kafka資料的方式有兩種,一種是receiver方式,另一種是直連方式。今天分享的SparkStreaming消費Kafka中的資料儲存偏移量的兩種方式都是基於直連方式上的 話不多說 直接上程式碼 ! 第一種是使用zookeeper儲存偏移量 object Kafka

Spark Streaming消費Kafka Direct方式資料零丟失實現

一、概述 上次寫這篇文章文章的時候,Spark還是1.x,kafka還是0.8x版本,轉眼間spark到了2.x,kafka也到了2.x,儲存offset的方式也發生了改變,筆者根據上篇文章和網上文章,將offset儲存到Redis,既保證了併發也保證了資料不丟失,經過測試,有效。 二、

簡單Storm消費Kafka資料並存儲到redis例項(訂單資訊處理)

maven依賴 <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId&g

Kafka程式碼實現--from-beginning,讀取歷史未消費資料

Kafka實際環境有可能會出現Consumer全部宕機,雖然基於Kafka的高可用特性,消費者群組中的消費者可以實現再均衡,所有Consumer不處理資料的情況很少,但是還是有可能會出現,此時就要求Consumer重啟的時候能夠讀取在宕機期間Producer傳送的資料。基於消費者訂閱模式預設

structuredstreaming消費kafka資料實現wordcount

最近也是有很多同學問我,StructuredStreaming結合kafka的使用,我簡單的寫了一個wordcount的demo,後續會有更加具體和詳細的介紹,今天先來一個簡單的demo吧.程式碼測試過了,可以執行. package spark import org.ap

SparkStreaming消費kafka數據

字符串 targe val offset 1.0 error .org 依賴 oot 概要:本例子為SparkStreaming消費kafka消息的例子,實現的功能是將數據實時的進行抽取、過濾、轉換,然後存儲到HDFS中。 實例代碼 package com.fwmagic.

【原始碼追蹤】SparkStreaming 中用 Direct 方式每次從 Kafka 拉取多少條資料(offset取值範圍)

我們知道 SparkStreaming 用 Direct 的方式拉取 Kafka 資料時,是根據 kafka 中的 fromOffsets 和 untilOffsets 來進行獲取資料的,而 fromOffsets 一般都是需要我們自己管理的,而每批次的 untilOffsets 是由

資料之Spark(六)--- Spark Streaming介紹,DStream,Receiver,Streamin整合Kafka,Windows,容錯的實現

一、Spark Streaming介紹 ----------------------------------------------------------- 1.介紹 是spark core的擴充套件,針對實時資料的實時流處理技術 具有可擴充套件、高吞吐量、

Spark 消費Kafka資料

spark RDD消費的哦,不是spark streaming。 導maven包: 注意版本哦,要跟自己機器的一致 <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->