實時流計算、Spark Streaming、Kafka、Redis、Exactly-once、實時去重
http://lxw1234.com/archives/2018/02/901.htm
在實時流式計算中,最重要的是在任何情況下,訊息不重複、不丟失,即Exactly-once。本文以Kafka–>Spark Streaming–>Redis為例,一方面說明一下如何做到Exactly-once,另一方面說明一下我是如何計算實時去重指標的。
1. 關於資料來源
資料來源是文字格式的日誌,由Nginx產生,存放於日誌伺服器上。在日誌伺服器上部署Flume Agent,使用TAILDIR Source和Kafka Sink,將日誌採集到Kafka進行臨時儲存。日誌格式如下:
2018-02-22T00:00:00+08:00|~|200|~|/test?pcid
2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=GLLIEG&siteid=3
2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HIJMEC&siteid=8
2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HMGBDE&siteid=3
2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HIJFLA&siteid=4
2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=JCEBBC&siteid=9
2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=KJLAKG&siteid=8
2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=FHEIKI&siteid=3
2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=IGIDLB&siteid=3
2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=IIIJCD&siteid=5
日誌是由測試程式模擬產生的,欄位之間由|~|分隔。
2. 實時計算需求
分天、分小時PV;
分天、分小時、分網站(siteid)PV;
分天 UV;
3. Spark Streaming消費Kafka資料
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
在Spark Streaming中消費Kafka資料,保證Exactly-once的核心有三點:
使用Direct方式連線Kafka;自己儲存和維護Offset;更新Offset和計算在同一事務中完成;
後面的Spark Streaming程式(文章結尾),主要有以下步驟:
- 啟動後,先從Redis中獲取上次儲存的Offset,Redis中的key為”topic_partition”,即每個分割槽維護一個Offset;
- 使用獲取到的Offset,建立DirectStream;
- 在處理每批次的訊息時,利用Redis的事務機制,確保在Redis中指標的計算和Offset的更新維護,在同一事務中完成。只有這兩者同步,才能真正保證訊息的Exactly-once。
- ./spark-submit \
- --class com.lxw1234.spark.TestSparkStreaming \
- --master local[2] \
- --conf spark.streaming.kafka.maxRatePerPartition=20000 \
- --jars /data1/home/dmp/lxw/realtime/commons-pool2-2.3.jar,\
- /data1/home/dmp/lxw/realtime/jedis-2.9.0.jar,\
- /data1/home/dmp/lxw/realtime/kafka-clients-0.11.0.1.jar,\
- /data1/home/dmp/lxw/realtime/spark-streaming-kafka-0-10_2.11-2.2.1.jar \
- /data1/home/dmp/lxw/realtime/testsparkstreaming.jar \
- --executor-memory 4G \
- --num-executors 1
在啟動Spark Streaming程式時候,有個引數最好指定:
spark.streaming.kafka.maxRatePerPartition=20000(每秒鐘從topic的每個partition最多消費的訊息條數)
如果程式第一次執行,或者因為某種原因暫停了很久重新啟動時候,會積累很多訊息,如果這些訊息同時被消費,很有可能會因為記憶體不夠而掛掉,因此,需要根據實際的資料量大小,以及批次的間隔時間來設定該引數,以限定批次的訊息量。
如果該引數設定20000,而批次間隔時間未10秒,那麼每個批次最多從Kafka中消費20萬訊息。
4. Redis中的資料模型
- 分小時、分網站PV
普通K-V結構,計算時候使用incr命令遞增,
Key為 “site_pv_網站ID_小時”,
如:site_pv_9_2018-02-21-00、site_pv_10_2018-02-21-01
該資料模型用於計算分網站的按小時及按天PV。
- 分小時PV
普通K-V結構,計算時候使用incr命令遞增,
Key為“pv_小時”,如:pv_2018-02-21-14、pv_2018-02-22-03
該資料模型用於計算按小時及按天總PV。
- 分天UV
Set結構,計算時候使用sadd命令新增,
Key為”uv_天”,如:uv_2018-02-21、uv_2018-02-20
該資料模型使用者計算按天UV(獲取時候使用SCARD命令獲取Set元素個數)
注:這些Key對應的時間,均由實際訊息中的第一個欄位(時間)而定。
5. 故障恢復
如果Spark Streaming程式因為停電、網路等意外情況終止而需要恢復,則直接重啟即可;
如果因為其他原因需要重新計算某一時間段的訊息,可以先刪除Redis中對應時間段內的Key,然後從原始日誌中擷取該時間段內的訊息,當做新訊息新增至Kafka,由Spark Streaming程式重新消費並進行計算;
6. 附程式
依賴jar包:
commons-pool2-2.3.jar
jedis-2.9.0.jar
kafka-clients-0.11.0.1.jar
spark-streaming-kafka-0-10_2.11-2.2.1.jar
InternalRedisClient (Redis連結池)
- package com.lxw1234.spark
- import redis.clients.jedis.JedisPool
- import org.apache.commons.pool2.impl.GenericObjectPoolConfig
- /**
- * @author lxw1234
- */
- /**
- * Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool}
- */
- objectInternalRedisClientextendsSerializable{
- @transientprivatevar pool:JedisPool=null
- def makePool(redisHost:String, redisPort:Int, redisTimeout:Int,
- maxTotal:Int, maxIdle:Int, minIdle:Int):Unit={
- makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle,true,false,10000)
- }
- def makePool(redisHost:String, redisPort:Int, redisTimeout:Int,
- maxTotal:Int, maxIdle:Int, minIdle:Int, testOnBorrow:Boolean,
- testOnReturn:Boolean, maxWaitMillis:Long):Unit={
- if(pool ==null){
- val poolConfig =newGenericObjectPoolConfig()
- poolConfig.setMaxTotal(maxTotal)
- poolConfig.setMaxIdle(maxIdle)
- poolConfig.setMinIdle(minIdle)
- poolConfig.setTestOnBorrow(testOnBorrow)
- poolConfig.setTestOnReturn(testOnReturn)
- poolConfig.setMaxWaitMillis(maxWaitMillis)
- pool =newJedisPool(poolConfig, redisHost, redisPort, redisTimeout)
- val hook =newThread{
- overridedef run = pool.destroy()
- }
- sys.addShutdownHook(hook.run)
- }
- }
- def getPool:JedisPool={
- assert(pool !=null)
- pool
- }
- }
TestSparkStreaming
- package com.lxw1234.spark
- import org.apache.kafka.clients.consumer.ConsumerRecord
- import org.apache.kafka.common.TopicPartition
- import org.apache.kafka.common.serialization.StringDeserializer
- import org.apache.spark.SparkConf
- import org.apache.spark.rdd.RDD
- import org.apache.spark.streaming.Seconds
- import org.apache.spark.streaming.StreamingContext
- import org.apache.spark.streaming.kafka010.ConsumerStrategies
- import org.apache.spark.streaming.kafka010.HasOffsetRanges
- import org.apache.spark.streaming.kafka010.KafkaUtils
- import org.apache.spark.streaming.kafka010.LocationStrategies
- import redis.clients.jedis.Pipeline
- /**
- * @author lxw1234
- * 獲取topic最小的offset
- * ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list datadev1:9092 --topic lxw1234 --time -2
- */
- objectTestSparkStreaming{
- def main(args :Array[String]):Unit={
- val brokers ="datadev1:9092"
- val topic ="lxw1234"
- val partition :Int=0//測試topic只有一個分割槽
- val start_offset :Long=0l
- //Kafka引數
- val kafkaParams =Map[String,Object](
- "bootstrap.servers"-> brokers,
- "key.deserializer"-> classOf[StringDeserializer],
- "value.deserializer"-> classOf[StringDeserializer],
- "group.id"->"exactly-once",
- "enable.auto.commit"->(false: java.lang.Boolean),
- "auto.offset.reset"->"none"
- )
- // Redis configurations
- val maxTotal =10
- val maxIdle =10
- val minIdle =1
- val redisHost ="172.16.213.79"
- val redisPort =6379
- val redisTimeout =30000
- //預設db,使用者存放Offset和pv資料
- val dbDefaultIndex =8
- InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
- val conf =newSparkConf().setAppName("TestSparkStreaming").setIfMissing("spark.master","local[2]")
- val ssc =newStreamingContext(conf,Seconds(10))
- //從Redis獲取上一次存的Offset
- val jedis =InternalRedisClient.getPool.getResource
- jedis.select(dbDefaultIndex)
- val topic_partition_key = topic +"_"+ partition
- var lastOffset =0l
- val lastSavedOffset = jedis.get(topic_partition_key)
- if(null!= lastSavedOffset){
- try{
- lastOffset = lastSavedOffset.toLong
-
相關推薦
Spark Streaming從Kafka中獲取資料,並進行實時單詞統計,統計URL出現的次數
1、建立Maven專案 2、啟動Kafka 3、編寫Pom檔案 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.or
實時流計算、Spark Streaming、Kafka、Redis、Exactly-once、實時去重
http://lxw1234.com/archives/2018/02/901.htm在實時流式計算中,最重要的是在任何情況下,訊息不重複、不丟失,即Exactly-once。本文以Kafka–>Spark Streaming–>Redis為例,一方面說明一下如何
Spark Streaming實時流處理筆記(6)—— Kafka 和 Flume的整合
1 整體架構 2 Flume 配置 https://flume.apache.org/releases/content/1.6.0/FlumeUserGuide.html 啟動kafka kafka-server-start.sh $KAFKA_HOME/config/se
Spark Streaming實時流處理筆記(5)—— Kafka API 程式設計
1 新建 Maven工程 pom檔案 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLo
spark streaming 與 kafka實現實時流的案例分析
package day14 import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{ Se
Spark Streaming整合Kafka實現網站點選流實時統計
安裝並配置zk 安裝並配置Kafka 啟動zk 啟動Kafka 建立topic bin/kafka-topics.sh --create --zookeeper node1.itcast.cn:2181,node2.itcast.cn:2181 \ --
java實現spark streaming與kafka整合進行流式計算
背景:網上關於spark streaming的文章還是比較多的,可是大多數用scala實現,因我們的電商實時推薦專案以java為主,就踩了些坑,寫了java版的實現,程式碼比較意識流,輕噴,歡迎討論。流程:spark streaming從kafka讀使用者實時點選資料,過濾資
Spark入門實戰系列--7.Spark Streaming(上)--實時流計算Spark Streaming原理介紹
【注】該系列文章以及使用到安裝包/測試資料 可以在《》獲取 1、Spark Streaming簡介 1.1 概述 Spark Streaming 是Spark核心API的一個擴充套件,可以實現高吞吐量的、具備容錯機制的實時流資料的處理。支援從多種資料來源獲取資料,包括Kafk、Flume、Twitt
Spark整合Kafka實時流計算Java案例
package com.test; import java.util.*; import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; import org.apache.spark.api
Spark Streaming從Kafka中獲取數據,並進行實時單詞統計,統計URL出現的次數
scrip 發送消息 rip mark 3.2 umt 過程 bject ttr 1、創建Maven項目 創建的過程參考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2、啟動Kafka A:安裝ka
Spark Streaming整合Kafka,Mysql,實時儲存資料到Mysql(基於Receiver的方式)
叢集分配如下: 192.168.58.11 spark01 192.168.58.12 spark02 192.168.58.13 spark03 spark版本:spark-2.1.0-bin-hadoop2.7 kafka版本:kafka_2.11-2.0.0 Spark St
Spark Streaming整合Kafka,Mysql,實時儲存資料到Mysql(直接讀取方式)
叢集分配如下: 192.168.58.11 spark01 192.168.58.12 spark02 192.168.58.13 spark03 spark版本:spark-2.1.0-bin-hadoop2.7 kafka版本:kafka_2.11-2.0.0 Spark St
基於Spark機器學習和實時流計算的智慧推薦系統
原文連結:http://blog.csdn.net/qq1010885678/article/details/46675501 概要: 隨著電子商務的高速發展和普及應用,個性化推薦的推薦系統已成為一個重要研究領域。 個性化推薦演算法是推薦系統中最核心的技術,在很大程
Spark實時流計算Java案例
現在,網上基於spark的程式碼基本上都是Scala,很多書上也都是基於Scala,沒辦法,誰叫spark是Scala寫出來的了,但是我現在還沒系統的學習Scala,所以只能用java寫spark程式了,spark支援java,而且Scala也基於JVM,不說了
圖平行計算實踐(二)(spark streaming+graphx+kafka)
上回利用transform方法實現對於資料流的圖平行計算過程,今天繼續改進完善已有的計算機制,加入updateStateByKey和checkpoint機制,保障圖平行計算在故障中仍能保證零誤差。 import kafka.serializer.Strin
spark系列-8、Spark Streaming
參考連結:http://spark.apache.org/docs/latest/streaming-programming-guide.html 一、Spark Streaming 介紹 Spark Streaming是核心Spark API的擴充套件,可實現實時資料流的可伸縮,高吞吐量,容錯流處理。資料可
【Spark深入學習 -15】Spark Streaming前奏-Kafka初體驗
rod htm 新的 callback tails 包括 -c 舉例 清理 ----本節內容------- 1.Kafka基礎概念 1.1 出世背景 1.2 基本原理 1.2.1.前置知識 1.2.2.架構和原理 1.2.
【轉】Spark Streaming和Kafka整合開發指南
thread ada 關系 方法 拷貝 理解 1.2 reduce arr 基於Receivers的方法 這個方法使用了Receivers來接收數據。Receivers的實現使用到Kafka高層次的消費者API。對於所有的Receivers,接收到的數據將會保存在Spark
scala spark-streaming整合kafka (spark 2.3 kafka 0.10)
obj required word 錯誤 prope apache rop sta move Maven組件如下: <dependency> <groupId>org.apache.spark</groupId> <
spark streaming整合kafka-直連的方式
import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import kafka.utils.{ZKGroupTopicDi