1. 程式人生 > >使用Flume收集資料

使用Flume收集資料

安裝並配置Flume

下載解壓flume NG二進位制檔案apache-flume-1.6.0-bin.tar.gz

$ tar -xzf /opt/apache-flume-1.6.0-bin.tar.gz

建立符號連結,指向Flume安裝路徑

$ ln -s /opt/apache-flume-1.6.0 /opt/flume

定義環境變數,設定PATH

FLUME_HOME=/opt/flume

PATH=${FLUME_HOME}/bin:${PATH}

驗證已把Hadoop函式庫路徑加入到CLASSPATH變數中(Flume要用到Hadoop函式庫,因此需要檢查Classpath,確認其中已包含了Hadoop函式庫的目錄)

$ echo ${CLASSPATH}

在Hadoop目錄下建立Flume的conf路徑

$ mkdir /home/hadoop/flume/conf

把所需的檔案拷貝到剛建立的conf目錄中

$ cp /opt/flume/conf/log4j.properties /home/hadoop/flume/conf

$ cp /opt/flume/conf/flume-env.sh.sample /home/hadoop/flume/conf/flume-env.sh

編輯flume-env.sh並設定JAVA_HOME

Flume各個部件

Flume程序的配置包含三個部件:信源、信宿和通道。Flume還提供了一個自定義信源、通道和信宿的介面。

信源在接收到足夠資料可以生成一個Flume事件時,它會把新建立的事件發給通道,但如何處理事件卻是對信源不可見的。在大多數情況下,事件就相當於一行接一行的文字。但是,事件並非全部是文字資料。例如,UDP syslogd信源所接收到的每個資料包當做一個事件,並在系統中傳輸。Flume NG支援使用一種序列生成器作為信源(主要用於測試),以及讀取syslogd資料的信源(既支援TCP也支援UDP)

Flume支援logger、file_roll、HDFS、HBase、Avro(用於代理鏈)、null(用於測試)和IRC(用於網際網路中繼聊天服務)信宿,信宿等著從通道接收事件,接收到資料後,會把事件分發給各自的目標主機,還負責處理超時、重試以及迴圈之類的問題。

logsink和filesink完成的任務完全相同,log信宿比其它信宿更適合用作除錯工具,它不僅僅記錄Flume捕獲的信源資訊,同時加入了許多元資料和事件。然而,檔案信宿準確地記錄輸入資料,因為這些資料經過Flume時沒有發生任何變化(如果要修改資料,也是可以實現的)。在大多數情況下,用filesink捕獲輸入資料,但在開發過程中可能需要用到logsink。

通道負責傳輸事件的通訊機機制和保留機制。因為信源和信宿在讀寫資料方面可能有較大差異,或者信源有時需要暫停資料寫入(例如在切換到新檔案的時候,或處理系統I/O阻塞的時候),所以需要通道在信源和信宿之間快取資料。使用Memory通道時,代理從信源把事件讀入記憶體,然後傳給信宿,若代理程序在該過程中崩潰,那麼此時Memory通道中的所有資料都會永久丟失。而file和JDBC通道會永久儲存事件,以防這種意外的資料丟失。在從信源讀取事件之後,file通道將事件內容寫入檔案系統上的檔案,只有代理成功將事件傳給信宿後,才會刪除該檔案。類似地,JDBC通道使用一個內嵌的Derby資料庫以可恢復的形式儲存事件。

配置檔案

新建agent1.conf檔案,並儲存至Flume的工作路徑(/home/hadoop/flume),下面前三行中的每一行都可以有多個值,以空格為分隔符。為了簡便可以在一個配置檔案中完成多個代理的設定。某個代理也可以包含多個流

agent1.sources=netsource execsource avrosource

agent1.sinks=logsink filesink hdfssink avrosinkavro信宿不會把事件寫入本地檔案呀HDFS檔案,而是把事件發給遠端Avro信源

agent1.channels=memorychannel filechannel jdbcchannel

agent1.sources.netsource.type=netcat(監聽某個埠上的網路連線)

agent1.sources.netsource.bind=localhost

agent1.sources.netsource.port=3000

agent5.sources.netsource.interceptors=ts(加入攔截器,在從信源向信宿傳輸事件的過程中操作和修改事件。不僅可以實現資料的自動分塊,簡化了資料管理,而且便於MapReduce作業使用這些資料)

agent5.sources.netsource.interceptors.ts.type=org.apache.flume.interceptor.TimestampInterceptor

agent2.sources.execsource.type=exec(該信源在主機上執行一條命令並將輸出作為Flume代理的輸入。exec信源能使用多條命令以生成持續的資料流,還可以配置在命令終止的時候重啟命令)

agent2.sources.execsource.command=cat /home/hadoop/message(啟動代理前,$ echo "Hello again Flume!"> /home/hadoop/message)

agent3.sources.avrosource.type=avro(avro是一個數據序列化框架,負責對資料進行封包,並所資料從網路中的一個地方傳到另一個地方)

agent3.sources.avrosource.bind=localhost

agent3.sources.avrosource.port=4000

agent3.sources.avrosource.threads=5

agent1.sinks.logsink.type=logger(把輸出寫入一個日誌檔案)

agent2.sinks.filesink.type=FILE_ROLL

agent2.sinks.filesink.sink.directory=/home/hadoop/flume/files

agent2.sinks.filesink.sink.rollInterval=0(預設情況下,Flume每隔30秒將輸出滾動寫入到一個新檔案中。為了便於在單個檔案中跟蹤Flume的輸出,這裡設定為0)

agent4.sinks.hdfssink.type=hdfs

agent4.sinks.hdfssink.hdfs.path=/flume(最好把這些資料寫入位置視為臨時集結區,Flume先把資料寫入這個位置再進行處理。在資料處理結束之後,這些資料應該被移到長期目錄結構)

agent5.sinks.hdfssink.hdfs.path=/flume-%Y-%m-%d

agent4.sinks.hdfssink.hdfs.filePrefix=log-

agent4.sinks.hdfssink.hdfs.rollInterval=0

agent4.sinks.hdfssink.hdfs.rollCount=3(每個檔案最多隻能儲存3條資料)

agent4.sinks.hdfssink.hdfs.fileType=DataStream

agent6.sinks.avrosink.type=avro

agent6.sinks.avrosink.hostname=localhost

agent6.sinks.avrosink.port=4000

agent1.channels.memorychannel.type=memory

agent1.channels.memorychannel.capacity=1000

agent1.channels.memorychannel.transactionCapacity=100

agent2.channels.filechannel.type=file

agent2.channels.filechannel.checkpointDir=/home/hadoop/flume/fc/checkpoint

agent2.channels.filechannel.dataDirs=/home/hadoop/flume/fc/data

agent3.channels.jdbcchannel.type=jdbc

agent1.sources.netsource.channels=memorychannel

agent1.sources.logsink.channel=memorychannel

agent2.sources.execsource.channels=filechannel

agent2.sinks.filesink.channel=filechannel

agent7.sinks.netsource.channels=memorychannel1 memorychannel2

agent7.sources.netsource.selector.type=replicating(信源選擇器有兩種工作模式:replicating;multiplexing,它會依據事件的特定頭部欄位值判斷向哪個通道發事件)

啟動一個Flume代理

$ flume-ng agent --conf conf --conf-file agent1.conf --name agent1 -Dflume.root.logger=INFO,console(日誌輸出到控制檯,即同時輸出到代理視窗)

在另一個視窗中,遠端連線本地主機的3000埠,然後輸入一些文字

$ curl telnet://localhost:3000 (開啟遠端連線會話的傳統方式是使用telnet程式)

使用Ctrl+C關閉curl遠端會話

檢視位於Flume工作目錄下的flume.log檔案內容,當中就是輸入的文字

$ tail flume.log

若是使用agent3配置(從遠端客戶端接收資料),則在另一個視窗中,使用Flume Avro客戶端向agent3傳送檔案,avro客戶端可用於讀取檔案內容並把它發給網路上任何位置的avro信源

$ flume-ng avro-client -H localhost -p 4000 -F /home/hadoop/message2

若使用agent4配置,啟動代理並curl傳送事件後,檢視輸出目錄中的檔案內容。Flume用.tmp字尾標記正在寫入的檔案,而MapReduce作業只處理完整檔案。

$ hdfs dfs -ls /flume

$ hdfs dfs -cat "/flume/*"

agent6是實現多層Flume網路,信源和信宿都是avro型別。Flume支援使用者構建非常複雜的分散式事件收集網路,不同型別的多個代理可以把事件會給鏈條中的下一個代理,成為事件匯集點。

agent7把事件寫入多個信宿,設定信源選擇器的型別為replicating,也就是說,所有事件都會同時傳送給定義的兩個通道。

信宿處理器把所有信宿看做一個信宿組,它會依據各個信宿的型別在事件到達時採取不同的措施。Flume定義了兩種信宿處理器:故障恢復(failover)和負載均衡(loadbalancing)信宿處理器。loadbalancing每次向信宿傳送一個事件,它採用輪詢或隨機演算法選擇下次要使用的信宿。如果某個信宿出現故障,信宿處理器會向另一個信宿傳送相同事件,但發生故障的信宿仍然保留在信宿池中。與此不同,failover把所有信宿視為一個優先表,只有高優先順序信宿發生故障後,它才會使用低優先順序信宿。它會從優先表中刪除發生故障的信宿,在經過一段冷卻期後重試該信宿是否修復。

Sqoop和Flume

根據資料型別來選擇。在很大程度上,Flume被設計成用於處理日誌資料。如果使用者在多個關係資料庫中存有日誌資料,使用Flume應該是個不錯的選擇。

非日誌資料可能需要執行一些只有Sqoop才能完成的操作。如指定目標列的子集,需要處理結構化資料的各個欄位,想要與Hive整合,此時無法使用Flume完成。當然,這些工具能協同完成更復雜的任務。我們可以使用Flume把事件匯聚到HDFS,使用MapReduce進行處理,然後通過Sqoop匯出到一個關係資料庫中。