1. 程式人生 > >Flume+Kakfa+Spark Streaming整合(執行WordCount小例子)

Flume+Kakfa+Spark Streaming整合(執行WordCount小例子)

環境版本:Scala 2.10.5; Spark 1.6.0; Kafka 0.10.0.1; Flume 1.6.0

Flume/Kafka的安裝配置請看我之前的部落格:
http://blog.csdn.net/dr_guo/article/details/52130812
http://blog.csdn.net/dr_guo/article/details/51050715
1.Flume 配置檔案 kafka_sink.conf(監聽檔案1.log sink到kafka)

agent.sources = r1
agent.channels = c2
agent.sinks = k2

####define source begin

##define source-r1-exec
agent.sources.r1.channels = c2
agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /home/drguo/flumetest/1.log


####define sink begin

##define sink-k2-kafka
agent.sinks.k2.channel = c2
agent.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k2.topic = test
agent.sinks.k2.brokerList = DXHY-YFEB-01:9092,DXHY-YFEB-02:9092,DXHY-YFEB-03:9092
agent.sinks.k2.requiredAcks = 1
agent.sinks.k2.batchSize = 20
#agent.sinks.k2.serializer.class = Kafka.serializer.StringEncoder
agent.sinks.k2.producer.type = async
#agent.sinks.k2.batchSize = 100


####define channel begin

##define channel-c2-memory
agent.channels.c2.type = memory
agent.channels.c2.capacity = 1000000
agent.channels.c2.transactionCapacity = 100
2.啟動Flume
flume-ng agent --conf conf --conf-file ./kafka_sink.conf --name agent -Dflume.root.logger=INFO,console  
3.執行Spark Streaming(word count)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 單詞統計 | Spark Streaming 接收並處理 Kafka 中資料
  * Created by drguo on 2017/11/21.
  */
object KafkaWordCount {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(50))//方塊長度
    //ssc.checkpoint("checkpoint")

    val topics = "test"
    val numThreads = "2"
    val zkQuorum = "DXHY-YFEB-01:2181,DXHY-YFEB-02:2181,DXHY-YFEB-03:2181"
    val group = "TestConsumerGroup"

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Create a direct stream
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    val words = lines.flatMap(_.split(" "))
    /**
     reduceByKeyAndWindow(_ + _, _ - _, Seconds(15), Seconds(10), 2)
     總的來說SparkStreaming提供這個方法主要是出於效率考慮。 
     比如每10秒計算一下前15秒的內容,(每個batch 5秒), 
那麼就是通過如下步驟:
    1.儲存上一個window的reduce值
    2.計算出上一個window的begin 時間到 重複段的開始時間的reduce 值 =》 oldRDD
    3.重複時間段的值結束時間到當前window的結束時間的值 =》 newRDD
    4.重複時間段的值等於上一個window的值減去oldRDD

    這樣就不需要去計算每個batch的值, 只需加加減減就能得到新的reduce出來的值。

  */
    val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(
      (v1: Int, v2: Int) => {
        v1 + v2
      },
      //十分鐘 視窗長度
      //Minutes(10),
      //每五秒 視窗移動長度 統計十分鐘內的資料
      Seconds(50))

    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
4.插入測試資料
[[email protected] flumetest]# echo "hadoop spark hello world world hh hh" >> 1.log
5.檢視執行結果

這裡寫圖片描述

Flume簡單的命令列操作
#建立topic
[[email protected] KAFKA]# bin/kafka-topics --create --zookeeper DXHY-YFEB-01:2181,DXHY-YFEB-02:2181,DXHY-YFEB-03:2181 --replication-factor 3 -partitions 1 --topic test

#檢視topic
[
[email protected]
KAFKA]# bin/kafka-topics --list --zookeeper DXHY-YFEB-01:2181,DXHY-YFEB-02:2181,DXHY-YFEB-03:2181 #使用生產者向topic生產訊息 可以有多個生產者 [[email protected] KAFKA]# bin/kafka-console-producer --broker-list DXHY-YFEB-01:9092,DXHY-YFEB-02:9092,DXHY-YFEB-03:9092 --topic test [[email protected] KAFKA]# ~ [[email protected] KAFKA]# ~ #使用消費者消費topic裡的訊息 可以有多個消費者 [[email protected] KAFKA]# bin/kafka-console-consumer --zookeeper DXHY-YFEB-01:2181,DXHY-YFEB-02:2181,DXHY-YFEB-03:2181 --from-beginning --topic test [[email protected] KAFKA]# ~ [[email protected] KAFKA]# ~ #檢視消費者組 [[email protected] KAFKA]# bin/kafka-consumer-groups --bootstrap-server DXHY-YFEB-01:9092,DXHY-YFEB-02:9092,DXHY-YFEB-03:9092 --list

相關推薦

Flume+Kakfa+Spark Streaming整合執行WordCount例子

環境版本:Scala 2.10.5; Spark 1.6.0; Kafka 0.10.0.1; Flume 1.6.0 Flume/Kafka的安裝配置請看我之前的部落格: http://blog.c

Spark學習筆記15——Spark Streaming 整合 Flume

1 flume 配置檔案 在 flume-env.sh 裡配置 JAVA_HOME 1.1 flume-pull.conf # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.chan

Spark學習拾叄- Spark Streaming整合Flume&Kafka

文章目錄 處理流程畫圖剖析 日誌產生器開發並結合log4j完成日誌的輸出 使用Flume採集Log4j產生的日誌 使用KafkaSInk將Flume收集到的資料輸出到Kafka Spark Streaming消費Kafka的

Spark學習拾壹- Spark Streaming整合Flume

文章目錄 Push方式整合之概述 Push方式整合之Flume Agent配置開發 Push方式整合之Spark Streaming應用開發 Push方式整合之本地IDEA環境聯調 Push方式整合之伺服器環境聯調

Spark 系列十五—— Spark Streaming 整合 Flume

一、簡介 Apache Flume 是一個分散式,高可用的資料收集系統,可以從不同的資料來源收集資料,經過聚合後傳送到分散式計算框架或者儲存系統中。Spark Straming 提供了以下兩種方式用於 Flume 的整合。 二、推送式方法 在推送式方法 (Flume-style Push-based Appr

scala spark-streaming整合kafka spark 2.3 kafka 0.10

obj required word 錯誤 prope apache rop sta move Maven組件如下: <dependency> <groupId>org.apache.spark</groupId> <

Spark學習筆記16——Spark Streaming 整合Kafka

1 啟動 zk(zookeeper-3.4.8) 三個節點同時操作 zkServer.sh start 2 啟動 Kafka 三個節點同時操作 kafka-server-start.sh /home/hadoop/apps/kafka_2.10-0.8.2.1/conf

Spark Streaming整合flume(Poll方式和Push方式)

flume作為日誌實時採集的框架,可以與SparkStreaming實時處理框架進行對接,flume實時產生資料,sparkStreaming做實時處理。 Spark Streaming對接FlumeNG有兩種方式,一種是FlumeNG將訊息Push推給Spark Streaming,還

大資料學習之路97-kafka直連方式spark streaming 整合kafka 0.10版本

我們之前SparkStreaming整合Kafka的時候用的是傻瓜式的方式-----createStream,但是這種方式的效率很低。而且在kafka 0.10版本之後就不再提供了。 接下來我們使用Kafka直連的方式,這種方式其實是呼叫Kafka底層的消費資料的API,我們知道,越底層的東

Spark Streaming整合Spark SQL之wordcount案例

完整原始碼地址:  https://github.com/apache/spark/blob/v2.3.2/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala 案例原

Spark學習拾貳- Spark Streaming整合Kafka

文章目錄 Spark Streaming整合Kafka的版本選擇詳解 以下是基於spark2.2的測試: Receiver方式整合之概述 Receiver方式整合之Kafka測試 Receiver方式整合之Sp

Spark Streaming整合flume實戰

Spark Streaming對接Flume有兩種方式 Poll:Spark Streaming從flume 中拉取資料 Push:Flume將訊息Push推給Spark Streaming 1、安裝flume1.6以上 2、下載依賴包 spark-streaming

spark筆記之Spark Streaming整合flume實戰

a1.sources = r1 a1.sinks = k1 a1.channels = c1 #source a1.sources.r1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.

spark streaming 學習flume結合+和kafka 的結合

spark 2.1 設定日誌級別很簡單 下面幾行程式碼就可以搞定 主要是下面畫橫線的程式碼val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]") val sc = ne

【十五】Spark Streaming整合Kafka使用Direct方式使用Scala語言

官網介紹 Kafka提供了新的consumer api 在0.8版本和0.10版本之間。0.8的整合是相容0.9和0.10的。但是0.10的整合不相容以前的版本。 這裡使用的整合是spark-streaming-kafka-0-8。官方文件 配置SparkStrea

Spark Streaming整合Kafka

基於Receiver 方式整合 一、Kafka版本選擇 Spark Streaming支援Kafka0.8.2.1及以上的版本。 Kafka專案介紹了兩個新的Comsumer(消費者)API,在0.8版本和0.10版本之間,根據自身需求選擇版本號,另外要注意,0.8版本是相

SODBASE CEP學習進階篇續:SODBASE CEP與Spark streaming整合-低延遲規則管理 與分散式快取整合

在實際大資料工作中,常常有實時監測資料庫變化或實時同步資料到大資料儲存,解決大資料實時分析的需求。同時,增量同步資料庫資料相比全量查詢也減少了網路頻寬消耗。本文以Mysql的bin-log到Kafka為例,使用Canal Server,通過SODBASE引擎不用寫程式就可以設定資料同步規則。

Spark standalone簡介與執行wordcountmaster、slave1和slave2

 前期部落格 1. Standalone模式       即獨立模式,自帶完整的服務,可單獨部署到一個叢集中,無需依賴任何其他資源管理系統。從一定程度上說,該模式是其他兩種的基礎。借鑑Spark開發模式,我們可以得到一種開發新型計算框架的一般思路:先設計出它的s

【十四】Spark Streaming整合Kafka使用Receiver方式使用Scala語言

官方網站 Kafka提供了新的consumer api 在0.8版本和0.10版本之間。0.8的整合是相容0.9和0.10的。但是0.10的整合不相容以前的版本。 這裡使用的整合是spark-streaming-kafka-0-8。官方文件 配置SparkStrea

SODBASE CEP學習進階篇續:SODBASE CEP與Spark streaming整合-低延遲規則管理

許多大資料平臺專案採用流式計算來處理實時資料,會涉及到一個環節:處理規則管理。因為使用者經常有自己配置資料處理規則或策略的需求。同時,維護人員來也有也有將規則提取出來的需求,方便變更和維護的需求。我們知道Spark streaming作為資料歸檔備份時吞吐量高,與Hadoo