1. 程式人生 > >Flume實時監控目錄sink到hdfs

Flume實時監控目錄sink到hdfs

imp spa bat 添加 flume-ng mat header star txt

目標:Flume實時監控目錄sink到hdfs,再用sparkStreaming監控hdfs的這個目錄,對數據進行計算

1、flume的配置,配置spoolDirSource_hdfsSink.properties,監控本地的一個目錄,上傳到hdfs一個目錄下。

agent1.channels = ch1
agent1.sources = spoolDir-source1
agent1.sinks = hdfs-sink1

# 定義channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity=10000
agent1.channels.ch1.transactionCapacity=1000

# 定義source
agent1.sources.spoolDir-source1.channels = ch1
agent1.sources.spoolDir-source1.type = spooldir
agent1.sources.spoolDir-source1.spoolDir = /home/hadoop/flumeDir
agent1.sources.spoolDir-source1.fileHeader = false


agent1.sources.spoolDir-source1.interceptors=i1 i2
agent1.sources.spoolDir-source1.interceptors.i1.type=timestamp
agent1.sources.spoolDir-source1.interceptors.i2.type=static
agent1.sources.spoolDir-source1.interceptors.i2.key=k
agent1.sources.spoolDir-source1.interceptors.i2.value=v


# 定義sink
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://192.168.1.123:9000/user/hadoop/hdfsSink/%Y-%m-%d
agent1.sinks.hdfs-sink1.fileType = DataStream
agent1.sinks.hdfs-sink1.writeFormat=TEXT
agent1.sinks.hdfs-sink1.filePrefix = flumeHdfs
agent1.sinks.hdfs-sink1.batchSize = 1000
agent1.sinks.hdfs-sink1.rollSize = 10240
agent1.sinks.hdfs-sink1.rollCount = 0
agent1.sinks.hdfs-sink1.rollInterval = 1
agent1.sinks.hdfs-sink1.useLocalTimeStamp = true

2、測試本地目錄中的文件是否能被監控傳入到hdfs目錄

  1>、啟動flume命令:bin/flume-ng agent --conf conf/ --conf-file conf/spoolDirSource_hdfsSink.properties --name agent1 -Dflume.root.logger=INFO,console &

  技術分享

  啟動成功!

  2>、往/home/hadoop/flumeDir中touch一個文件,d.txt。

flume會監控到這個目錄裏添加了新文件,就會把這個文件收集到hdfs相應目錄下,在hdfs的位置如下圖所示:

技術分享

  運行完成的文件,flume會把文件標記為完成,如下所示:

技術分享

  3>、這時候運行的sparkStreaming就會監控到hdfs上的變化,運行必要的邏輯,這裏我們是實現簡單的計數。

結果如下:

技術分享

  4>、sparkStreaming的代碼如下:

package hdfsStreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkContext
/**
* 監控HDFS一個目錄下的文件,有一定的時間間隔,隔一段時間執行一次
* 要等待執行完成
* 離線的批量流式處理
*/
object HdfsStreaming {
def main(args: Array[String]) {

if(args.length !=1){
println("Usage: <inputPath>");
System.exit(1)
}
//構造配置對象,獲取系統默認的配置對象
val conf=new SparkConf
val sc=new SparkContext(conf)
//構造sparkStreaming上下文對象,參數一是配置,參數二是時間間隔30s
val scc=new StreamingContext(sc,Seconds(30))

//指定接收器,參數為hdfs目錄
val datas=scc.textFileStream(args(0))

//業務邏輯
val rs=datas.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

//打印結果集
rs.print

//啟動任務,需要使用上下文對象啟動
scc.start

//等待任務完成
scc.awaitTermination

}
}

Flume實時監控目錄sink到hdfs