1. 程式人生 > >實時流計算、Spark Streaming、Kafka、Redis、Exactly-once、實時去重

實時流計算、Spark Streaming、Kafka、Redis、Exactly-once、實時去重

http://lxw1234.com/archives/2018/02/901.htm

在實時流式計算中,最重要的是在任何情況下,訊息不重複、不丟失,即Exactly-once。本文以Kafka–>Spark Streaming–>Redis為例,一方面說明一下如何做到Exactly-once,另一方面說明一下我是如何計算實時去重指標的。

spark streaming

1. 關於資料來源

資料來源是文字格式的日誌,由Nginx產生,存放於日誌伺服器上。在日誌伺服器上部署Flume Agent,使用TAILDIR Source和Kafka Sink,將日誌採集到Kafka進行臨時儲存。日誌格式如下:

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid

=DEIBAH&siteid=3

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=GLLIEG&siteid=3

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HIJMEC&siteid=8

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HMGBDE&siteid=3

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HIJFLA&siteid=4

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=JCEBBC&siteid=9

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=KJLAKG&siteid=8

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=FHEIKI&siteid=3

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=IGIDLB&siteid=3

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=IIIJCD&siteid=5

日誌是由測試程式模擬產生的,欄位之間由|~|分隔。

2. 實時計算需求

分天、分小時PV;

分天、分小時、分網站(siteid)PV;

分天 UV;

3. Spark Streaming消費Kafka資料

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

在Spark Streaming中消費Kafka資料,保證Exactly-once的核心有三點:

使用Direct方式連線Kafka;自己儲存和維護Offset;更新Offset和計算在同一事務中完成;

後面的Spark Streaming程式(文章結尾),主要有以下步驟:

  1. 啟動後,先從Redis中獲取上次儲存的Offset,Redis中的key為”topic_partition”,即每個分割槽維護一個Offset;
  2. 使用獲取到的Offset,建立DirectStream;
  3. 在處理每批次的訊息時,利用Redis的事務機制,確保在Redis中指標的計算和Offset的更新維護,在同一事務中完成。只有這兩者同步,才能真正保證訊息的Exactly-once。
  1. ./spark-submit \
  2. --class com.lxw1234.spark.TestSparkStreaming \
  3. --master local[2] \
  4. --conf spark.streaming.kafka.maxRatePerPartition=20000 \
  5. --jars /data1/home/dmp/lxw/realtime/commons-pool2-2.3.jar,\
  6. /data1/home/dmp/lxw/realtime/jedis-2.9.0.jar,\
  7. /data1/home/dmp/lxw/realtime/kafka-clients-0.11.0.1.jar,\
  8. /data1/home/dmp/lxw/realtime/spark-streaming-kafka-0-10_2.11-2.2.1.jar \
  9. /data1/home/dmp/lxw/realtime/testsparkstreaming.jar \
  10. --executor-memory 4G \
  11. --num-executors 1

在啟動Spark Streaming程式時候,有個引數最好指定:

spark.streaming.kafka.maxRatePerPartition=20000(每秒鐘從topic的每個partition最多消費的訊息條數)

如果程式第一次執行,或者因為某種原因暫停了很久重新啟動時候,會積累很多訊息,如果這些訊息同時被消費,很有可能會因為記憶體不夠而掛掉,因此,需要根據實際的資料量大小,以及批次的間隔時間來設定該引數,以限定批次的訊息量。

如果該引數設定20000,而批次間隔時間未10秒,那麼每個批次最多從Kafka中消費20萬訊息。

4. Redis中的資料模型

  • 分小時、分網站PV

普通K-V結構,計算時候使用incr命令遞增,

Key為 “site_pv_網站ID_小時”,

如:site_pv_9_2018-02-21-00、site_pv_10_2018-02-21-01

該資料模型用於計算分網站的按小時及按天PV。

  • 分小時PV

普通K-V結構,計算時候使用incr命令遞增,

Key為“pv_小時”,如:pv_2018-02-21-14、pv_2018-02-22-03

該資料模型用於計算按小時及按天總PV。

  • 分天UV

Set結構,計算時候使用sadd命令新增,

Key為”uv_天”,如:uv_2018-02-21、uv_2018-02-20

該資料模型使用者計算按天UV(獲取時候使用SCARD命令獲取Set元素個數)

注:這些Key對應的時間,均由實際訊息中的第一個欄位(時間)而定。

5. 故障恢復

如果Spark Streaming程式因為停電、網路等意外情況終止而需要恢復,則直接重啟即可;

如果因為其他原因需要重新計算某一時間段的訊息,可以先刪除Redis中對應時間段內的Key,然後從原始日誌中擷取該時間段內的訊息,當做新訊息新增至Kafka,由Spark Streaming程式重新消費並進行計算;

6. 附程式

依賴jar包:

commons-pool2-2.3.jar

jedis-2.9.0.jar

kafka-clients-0.11.0.1.jar

spark-streaming-kafka-0-10_2.11-2.2.1.jar

InternalRedisClient (Redis連結池)

  1. package com.lxw1234.spark
  2. import redis.clients.jedis.JedisPool
  3. import org.apache.commons.pool2.impl.GenericObjectPoolConfig
  4. /**
  5. * @author lxw1234
  6. */
  7. /**
  8. * Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool}
  9. */
  10. objectInternalRedisClientextendsSerializable{
  11. @transientprivatevar pool:JedisPool=null
  12. def makePool(redisHost:String, redisPort:Int, redisTimeout:Int,
  13. maxTotal:Int, maxIdle:Int, minIdle:Int):Unit={
  14. makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle,true,false,10000)
  15. }
  16. def makePool(redisHost:String, redisPort:Int, redisTimeout:Int,
  17. maxTotal:Int, maxIdle:Int, minIdle:Int, testOnBorrow:Boolean,
  18. testOnReturn:Boolean, maxWaitMillis:Long):Unit={
  19. if(pool ==null){
  20. val poolConfig =newGenericObjectPoolConfig()
  21. poolConfig.setMaxTotal(maxTotal)
  22. poolConfig.setMaxIdle(maxIdle)
  23. poolConfig.setMinIdle(minIdle)
  24. poolConfig.setTestOnBorrow(testOnBorrow)
  25. poolConfig.setTestOnReturn(testOnReturn)
  26. poolConfig.setMaxWaitMillis(maxWaitMillis)
  27. pool =newJedisPool(poolConfig, redisHost, redisPort, redisTimeout)
  28. val hook =newThread{
  29. overridedef run = pool.destroy()
  30. }
  31. sys.addShutdownHook(hook.run)
  32. }
  33. }
  34. def getPool:JedisPool={
  35. assert(pool !=null)
  36. pool
  37. }
  38. }

TestSparkStreaming

  1. package com.lxw1234.spark
  2. import org.apache.kafka.clients.consumer.ConsumerRecord
  3. import org.apache.kafka.common.TopicPartition
  4. import org.apache.kafka.common.serialization.StringDeserializer
  5. import org.apache.spark.SparkConf
  6. import org.apache.spark.rdd.RDD
  7. import org.apache.spark.streaming.Seconds
  8. import org.apache.spark.streaming.StreamingContext
  9. import org.apache.spark.streaming.kafka010.ConsumerStrategies
  10. import org.apache.spark.streaming.kafka010.HasOffsetRanges
  11. import org.apache.spark.streaming.kafka010.KafkaUtils
  12. import org.apache.spark.streaming.kafka010.LocationStrategies
  13. import redis.clients.jedis.Pipeline
  14. /**
  15. * @author lxw1234
  16. * 獲取topic最小的offset
  17. * ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list datadev1:9092 --topic lxw1234 --time -2
  18. */
  19. objectTestSparkStreaming{
  20. def main(args :Array[String]):Unit={
  21. val brokers ="datadev1:9092"
  22. val topic ="lxw1234"
  23. val partition :Int=0//測試topic只有一個分割槽
  24. val start_offset :Long=0l
  25. //Kafka引數
  26. val kafkaParams =Map[String,Object](
  27. "bootstrap.servers"-> brokers,
  28. "key.deserializer"-> classOf[StringDeserializer],
  29. "value.deserializer"-> classOf[StringDeserializer],
  30. "group.id"->"exactly-once",
  31. "enable.auto.commit"->(false: java.lang.Boolean),
  32. "auto.offset.reset"->"none"
  33. )
  34. // Redis configurations
  35. val maxTotal =10
  36. val maxIdle =10
  37. val minIdle =1
  38. val redisHost ="172.16.213.79"
  39. val redisPort =6379
  40. val redisTimeout =30000
  41. //預設db,使用者存放Offset和pv資料
  42. val dbDefaultIndex =8
  43. InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
  44. val conf =newSparkConf().setAppName("TestSparkStreaming").setIfMissing("spark.master","local[2]")
  45. val ssc =newStreamingContext(conf,Seconds(10))
  46. //從Redis獲取上一次存的Offset
  47. val jedis =InternalRedisClient.getPool.getResource
  48. jedis.select(dbDefaultIndex)
  49. val topic_partition_key = topic +"_"+ partition
  50. var lastOffset =0l
  51. val lastSavedOffset = jedis.get(topic_partition_key)
  52. if(null!= lastSavedOffset){
  53. try{
  54. lastOffset = lastSavedOffset.toLong
  55. 相關推薦

    Spark StreamingKafka中獲取資料,並進行實時單詞統計,統計URL出現的次數

    1、建立Maven專案 2、啟動Kafka 3、編寫Pom檔案 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.or

    實時計算Spark StreamingKafkaRedisExactly-once實時

    http://lxw1234.com/archives/2018/02/901.htm在實時流式計算中,最重要的是在任何情況下,訊息不重複、不丟失,即Exactly-once。本文以Kafka–>Spark Streaming–>Redis為例,一方面說明一下如何

    Spark Streaming實時處理筆記(6)—— Kafka 和 Flume的整合

    1 整體架構 2 Flume 配置 https://flume.apache.org/releases/content/1.6.0/FlumeUserGuide.html 啟動kafka kafka-server-start.sh $KAFKA_HOME/config/se

    Spark Streaming實時處理筆記(5)—— Kafka API 程式設計

    1 新建 Maven工程 pom檔案 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLo

    spark streamingkafka實現實時的案例分析

    package day14 import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{ Se

    Spark Streaming整合Kafka實現網站點選實時統計

    安裝並配置zk 安裝並配置Kafka 啟動zk 啟動Kafka 建立topic bin/kafka-topics.sh --create --zookeeper node1.itcast.cn:2181,node2.itcast.cn:2181 \ --

    java實現spark streamingkafka整合進行計算

    背景:網上關於spark streaming的文章還是比較多的,可是大多數用scala實現,因我們的電商實時推薦專案以java為主,就踩了些坑,寫了java版的實現,程式碼比較意識流,輕噴,歡迎討論。流程:spark streaming從kafka讀使用者實時點選資料,過濾資

    Spark入門實戰系列--7.Spark Streaming(上)--實時計算Spark Streaming原理介紹

    【注】該系列文章以及使用到安裝包/測試資料 可以在《》獲取 1、Spark Streaming簡介 1.1 概述 Spark Streaming 是Spark核心API的一個擴充套件,可以實現高吞吐量的、具備容錯機制的實時流資料的處理。支援從多種資料來源獲取資料,包括Kafk、Flume、Twitt

    Spark整合Kafka實時計算Java案例

    package com.test; import java.util.*; import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; import org.apache.spark.api

    Spark StreamingKafka中獲取數據,並進行實時單詞統計,統計URL出現的次數

    scrip 發送消息 rip mark 3.2 umt 過程 bject ttr 1、創建Maven項目 創建的過程參考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2、啟動Kafka A:安裝ka

    Spark Streaming整合Kafka,Mysql,實時儲存資料到Mysql(基於Receiver的方式)

    叢集分配如下: 192.168.58.11 spark01 192.168.58.12 spark02 192.168.58.13 spark03 spark版本:spark-2.1.0-bin-hadoop2.7 kafka版本:kafka_2.11-2.0.0 Spark St

    Spark Streaming整合Kafka,Mysql,實時儲存資料到Mysql(直接讀取方式)

    叢集分配如下: 192.168.58.11 spark01 192.168.58.12 spark02 192.168.58.13 spark03 spark版本:spark-2.1.0-bin-hadoop2.7 kafka版本:kafka_2.11-2.0.0 Spark St

    基於Spark機器學習和實時計算的智慧推薦系統

    原文連結:http://blog.csdn.net/qq1010885678/article/details/46675501 概要: 隨著電子商務的高速發展和普及應用,個性化推薦的推薦系統已成為一個重要研究領域。  個性化推薦演算法是推薦系統中最核心的技術,在很大程

    Spark實時計算Java案例

    現在,網上基於spark的程式碼基本上都是Scala,很多書上也都是基於Scala,沒辦法,誰叫spark是Scala寫出來的了,但是我現在還沒系統的學習Scala,所以只能用java寫spark程式了,spark支援java,而且Scala也基於JVM,不說了

    圖平行計算實踐(二)(spark streaming+graphx+kafka

    上回利用transform方法實現對於資料流的圖平行計算過程,今天繼續改進完善已有的計算機制,加入updateStateByKey和checkpoint機制,保障圖平行計算在故障中仍能保證零誤差。 import kafka.serializer.Strin

    spark系列-8Spark Streaming

    參考連結:http://spark.apache.org/docs/latest/streaming-programming-guide.html 一、Spark Streaming 介紹 Spark Streaming是核心Spark API的擴充套件,可實現實時資料流的可伸縮,高吞吐量,容錯流處理。資料可

    Spark深入學習 -15】Spark Streaming前奏-Kafka初體驗

    rod htm 新的 callback tails 包括 -c 舉例 清理 ----本節內容------- 1.Kafka基礎概念 1.1 出世背景 1.2 基本原理 1.2.1.前置知識 1.2.2.架構和原理 1.2.

    【轉】Spark StreamingKafka整合開發指南

    thread ada 關系 方法 拷貝 理解 1.2 reduce arr 基於Receivers的方法 這個方法使用了Receivers來接收數據。Receivers的實現使用到Kafka高層次的消費者API。對於所有的Receivers,接收到的數據將會保存在Spark

    scala spark-streaming整合kafkaspark 2.3 kafka 0.10)

    obj required word 錯誤 prope apache rop sta move Maven組件如下: <dependency> <groupId>org.apache.spark</groupId> <

    spark streaming整合kafka-直連的方式

    import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import kafka.utils.{ZKGroupTopicDi