Flume實時監控目錄sink到hdfs
目標:Flume實時監控目錄sink到hdfs,再用sparkStreaming監控hdfs的這個目錄,對數據進行計算
1、flume的配置,配置spoolDirSource_hdfsSink.properties,監控本地的一個目錄,上傳到hdfs一個目錄下。
agent1.channels = ch1
agent1.sources = spoolDir-source1
agent1.sinks = hdfs-sink1
# 定義channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity=10000
agent1.channels.ch1.transactionCapacity=1000
# 定義source
agent1.sources.spoolDir-source1.channels = ch1
agent1.sources.spoolDir-source1.type = spooldir
agent1.sources.spoolDir-source1.spoolDir = /home/hadoop/flumeDir
agent1.sources.spoolDir-source1.fileHeader = false
agent1.sources.spoolDir-source1.interceptors=i1 i2
agent1.sources.spoolDir-source1.interceptors.i1.type=timestamp
agent1.sources.spoolDir-source1.interceptors.i2.type=static
agent1.sources.spoolDir-source1.interceptors.i2.key=k
agent1.sources.spoolDir-source1.interceptors.i2.value=v
# 定義sink
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://192.168.1.123:9000/user/hadoop/hdfsSink/%Y-%m-%d
agent1.sinks.hdfs-sink1.fileType = DataStream
agent1.sinks.hdfs-sink1.writeFormat=TEXT
agent1.sinks.hdfs-sink1.filePrefix = flumeHdfs
agent1.sinks.hdfs-sink1.batchSize = 1000
agent1.sinks.hdfs-sink1.rollSize = 10240
agent1.sinks.hdfs-sink1.rollCount = 0
agent1.sinks.hdfs-sink1.rollInterval = 1
agent1.sinks.hdfs-sink1.useLocalTimeStamp = true
2、測試本地目錄中的文件是否能被監控傳入到hdfs目錄
1>、啟動flume命令:bin/flume-ng agent --conf conf/ --conf-file conf/spoolDirSource_hdfsSink.properties --name agent1 -Dflume.root.logger=INFO,console &
啟動成功!
2>、往/home/hadoop/flumeDir中touch一個文件,d.txt。
flume會監控到這個目錄裏添加了新文件,就會把這個文件收集到hdfs相應目錄下,在hdfs的位置如下圖所示:
運行完成的文件,flume會把文件標記為完成,如下所示:
3>、這時候運行的sparkStreaming就會監控到hdfs上的變化,運行必要的邏輯,這裏我們是實現簡單的計數。
結果如下:
4>、sparkStreaming的代碼如下:
package hdfsStreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkContext
/**
* 監控HDFS一個目錄下的文件,有一定的時間間隔,隔一段時間執行一次
* 要等待執行完成
* 離線的批量流式處理
*/
object HdfsStreaming {
def main(args: Array[String]) {
if(args.length !=1){
println("Usage: <inputPath>");
System.exit(1)
}
//構造配置對象,獲取系統默認的配置對象
val conf=new SparkConf
val sc=new SparkContext(conf)
//構造sparkStreaming上下文對象,參數一是配置,參數二是時間間隔30s
val scc=new StreamingContext(sc,Seconds(30))
//指定接收器,參數為hdfs目錄
val datas=scc.textFileStream(args(0))
//業務邏輯
val rs=datas.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//打印結果集
rs.print
//啟動任務,需要使用上下文對象啟動
scc.start
//等待任務完成
scc.awaitTermination
}
}
Flume實時監控目錄sink到hdfs