Flume+Kakfa+Spark Streaming整合(執行WordCount小例子)
環境版本:Scala 2.10.5; Spark 1.6.0; Kafka 0.10.0.1; Flume 1.6.0
Flume/Kafka的安裝配置請看我之前的部落格:
http://blog.csdn.net/dr_guo/article/details/52130812
http://blog.csdn.net/dr_guo/article/details/51050715
1.Flume 配置檔案 kafka_sink.conf(監聽檔案1.log sink到kafka) agent.sources = r1 agent.channels = c2 agent.sinks = k2 ####define source begin ##define source-r1-exec agent.sources.r1.channels = c2 agent.sources.r1.type = exec agent.sources.r1.command = tail -F /home/drguo/flumetest/1.log ####define sink begin ##define sink-k2-kafka agent.sinks.k2.channel = c2 agent.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.k2.topic = test agent.sinks.k2.brokerList = DXHY-YFEB-01:9092,DXHY-YFEB-02:9092,DXHY-YFEB-03:9092 agent.sinks.k2.requiredAcks = 1 agent.sinks.k2.batchSize = 20 #agent.sinks.k2.serializer.class = Kafka.serializer.StringEncoder agent.sinks.k2.producer.type = async #agent.sinks.k2.batchSize = 100 ####define channel begin ##define channel-c2-memory agent.channels.c2.type = memory agent.channels.c2.capacity = 1000000 agent.channels.c2.transactionCapacity = 100
2.啟動Flume
flume-ng agent --conf conf --conf-file ./kafka_sink.conf --name agent -Dflume.root.logger=INFO,console
3.執行Spark Streaming(word count) import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 單詞統計 | Spark Streaming 接收並處理 Kafka 中資料 * Created by drguo on 2017/11/21. */ object KafkaWordCount { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(50))//方塊長度 //ssc.checkpoint("checkpoint") val topics = "test" val numThreads = "2" val zkQuorum = "DXHY-YFEB-01:2181,DXHY-YFEB-02:2181,DXHY-YFEB-03:2181" val group = "TestConsumerGroup" val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap // Create a direct stream val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) val words = lines.flatMap(_.split(" ")) /** reduceByKeyAndWindow(_ + _, _ - _, Seconds(15), Seconds(10), 2) 總的來說SparkStreaming提供這個方法主要是出於效率考慮。 比如每10秒計算一下前15秒的內容,(每個batch 5秒), 那麼就是通過如下步驟: 1.儲存上一個window的reduce值 2.計算出上一個window的begin 時間到 重複段的開始時間的reduce 值 =》 oldRDD 3.重複時間段的值結束時間到當前window的結束時間的值 =》 newRDD 4.重複時間段的值等於上一個window的值減去oldRDD 這樣就不需要去計算每個batch的值, 只需加加減減就能得到新的reduce出來的值。 */ val wordCounts = words.map((_, 1)).reduceByKeyAndWindow( (v1: Int, v2: Int) => { v1 + v2 }, //十分鐘 視窗長度 //Minutes(10), //每五秒 視窗移動長度 統計十分鐘內的資料 Seconds(50)) wordCounts.print() ssc.start() ssc.awaitTermination() } }
4.插入測試資料
[[email protected] flumetest]# echo "hadoop spark hello world world hh hh" >> 1.log
5.檢視執行結果
Flume簡單的命令列操作
#建立topic
[[email protected] KAFKA]# bin/kafka-topics --create --zookeeper DXHY-YFEB-01:2181,DXHY-YFEB-02:2181,DXHY-YFEB-03:2181 --replication-factor 3 -partitions 1 --topic test
#檢視topic
[ [email protected] KAFKA]# bin/kafka-topics --list --zookeeper DXHY-YFEB-01:2181,DXHY-YFEB-02:2181,DXHY-YFEB-03:2181
#使用生產者向topic生產訊息 可以有多個生產者
[[email protected] KAFKA]# bin/kafka-console-producer --broker-list DXHY-YFEB-01:9092,DXHY-YFEB-02:9092,DXHY-YFEB-03:9092 --topic test
[[email protected] KAFKA]# ~
[[email protected] KAFKA]# ~
#使用消費者消費topic裡的訊息 可以有多個消費者
[[email protected] KAFKA]# bin/kafka-console-consumer --zookeeper DXHY-YFEB-01:2181,DXHY-YFEB-02:2181,DXHY-YFEB-03:2181 --from-beginning --topic test
[[email protected] KAFKA]# ~
[[email protected] KAFKA]# ~
#檢視消費者組
[[email protected] KAFKA]# bin/kafka-consumer-groups --bootstrap-server DXHY-YFEB-01:9092,DXHY-YFEB-02:9092,DXHY-YFEB-03:9092 --list
相關推薦
Flume+Kakfa+Spark Streaming整合(執行WordCount小例子)
環境版本:Scala 2.10.5; Spark 1.6.0; Kafka 0.10.0.1; Flume 1.6.0 Flume/Kafka的安裝配置請看我之前的部落格: http://blog.c
Spark學習筆記(15)——Spark Streaming 整合 Flume
1 flume 配置檔案 在 flume-env.sh 裡配置 JAVA_HOME 1.1 flume-pull.conf # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.chan
Spark學習(拾叄)- Spark Streaming整合Flume&Kafka
文章目錄 處理流程畫圖剖析 日誌產生器開發並結合log4j完成日誌的輸出 使用Flume採集Log4j產生的日誌 使用KafkaSInk將Flume收集到的資料輸出到Kafka Spark Streaming消費Kafka的
Spark學習(拾壹)- Spark Streaming整合Flume
文章目錄 Push方式整合之概述 Push方式整合之Flume Agent配置開發 Push方式整合之Spark Streaming應用開發 Push方式整合之本地IDEA環境聯調 Push方式整合之伺服器環境聯調
Spark 系列(十五)—— Spark Streaming 整合 Flume
一、簡介 Apache Flume 是一個分散式,高可用的資料收集系統,可以從不同的資料來源收集資料,經過聚合後傳送到分散式計算框架或者儲存系統中。Spark Straming 提供了以下兩種方式用於 Flume 的整合。 二、推送式方法 在推送式方法 (Flume-style Push-based Appr
scala spark-streaming整合kafka (spark 2.3 kafka 0.10)
obj required word 錯誤 prope apache rop sta move Maven組件如下: <dependency> <groupId>org.apache.spark</groupId> <
Spark學習筆記(16)——Spark Streaming 整合Kafka
1 啟動 zk(zookeeper-3.4.8) 三個節點同時操作 zkServer.sh start 2 啟動 Kafka 三個節點同時操作 kafka-server-start.sh /home/hadoop/apps/kafka_2.10-0.8.2.1/conf
Spark Streaming整合flume(Poll方式和Push方式)
flume作為日誌實時採集的框架,可以與SparkStreaming實時處理框架進行對接,flume實時產生資料,sparkStreaming做實時處理。 Spark Streaming對接FlumeNG有兩種方式,一種是FlumeNG將訊息Push推給Spark Streaming,還
大資料學習之路97-kafka直連方式(spark streaming 整合kafka 0.10版本)
我們之前SparkStreaming整合Kafka的時候用的是傻瓜式的方式-----createStream,但是這種方式的效率很低。而且在kafka 0.10版本之後就不再提供了。 接下來我們使用Kafka直連的方式,這種方式其實是呼叫Kafka底層的消費資料的API,我們知道,越底層的東
Spark Streaming整合Spark SQL之wordcount案例
完整原始碼地址: https://github.com/apache/spark/blob/v2.3.2/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala 案例原
Spark學習(拾貳)- Spark Streaming整合Kafka
文章目錄 Spark Streaming整合Kafka的版本選擇詳解 以下是基於spark2.2的測試: Receiver方式整合之概述 Receiver方式整合之Kafka測試 Receiver方式整合之Sp
Spark Streaming整合flume實戰
Spark Streaming對接Flume有兩種方式 Poll:Spark Streaming從flume 中拉取資料 Push:Flume將訊息Push推給Spark Streaming 1、安裝flume1.6以上 2、下載依賴包 spark-streaming
spark筆記之Spark Streaming整合flume實戰
a1.sources = r1 a1.sinks = k1 a1.channels = c1 #source a1.sources.r1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.
spark streaming 學習(和flume結合+和kafka 的結合)
spark 2.1 設定日誌級別很簡單 下面幾行程式碼就可以搞定 主要是下面畫橫線的程式碼val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]") val sc = ne
【十五】Spark Streaming整合Kafka使用Direct方式(使用Scala語言)
官網介紹 Kafka提供了新的consumer api 在0.8版本和0.10版本之間。0.8的整合是相容0.9和0.10的。但是0.10的整合不相容以前的版本。 這裡使用的整合是spark-streaming-kafka-0-8。官方文件 配置SparkStrea
Spark Streaming整合Kafka(一)
基於Receiver 方式整合 一、Kafka版本選擇 Spark Streaming支援Kafka0.8.2.1及以上的版本。 Kafka專案介紹了兩個新的Comsumer(消費者)API,在0.8版本和0.10版本之間,根據自身需求選擇版本號,另外要注意,0.8版本是相
SODBASE CEP學習進階篇(七)續:SODBASE CEP與Spark streaming整合-低延遲規則管理 與分散式快取整合
在實際大資料工作中,常常有實時監測資料庫變化或實時同步資料到大資料儲存,解決大資料實時分析的需求。同時,增量同步資料庫資料相比全量查詢也減少了網路頻寬消耗。本文以Mysql的bin-log到Kafka為例,使用Canal Server,通過SODBASE引擎不用寫程式就可以設定資料同步規則。
Spark standalone簡介與執行wordcount(master、slave1和slave2)
前期部落格 1. Standalone模式 即獨立模式,自帶完整的服務,可單獨部署到一個叢集中,無需依賴任何其他資源管理系統。從一定程度上說,該模式是其他兩種的基礎。借鑑Spark開發模式,我們可以得到一種開發新型計算框架的一般思路:先設計出它的s
【十四】Spark Streaming整合Kafka使用Receiver方式(使用Scala語言)
官方網站 Kafka提供了新的consumer api 在0.8版本和0.10版本之間。0.8的整合是相容0.9和0.10的。但是0.10的整合不相容以前的版本。 這裡使用的整合是spark-streaming-kafka-0-8。官方文件 配置SparkStrea
SODBASE CEP學習進階篇(七)續:SODBASE CEP與Spark streaming整合-低延遲規則管理
許多大資料平臺專案採用流式計算來處理實時資料,會涉及到一個環節:處理規則管理。因為使用者經常有自己配置資料處理規則或策略的需求。同時,維護人員來也有也有將規則提取出來的需求,方便變更和維護的需求。我們知道Spark streaming作為資料歸檔備份時吞吐量高,與Hadoo