1. 程式人生 > >flumebase介紹(一)

flumebase介紹(一)

 

1.介紹

FlumeBase是一個建立在Flume之上的資料驅動的流處理系統。這個系統允許使用者動態的向資料收集環境插入查詢並且監控被Flume收集到的事件流。這些查詢可能是抽樣調查輸入的資料,也可能是指定固定的監控,也可能是資料轉換或者是質量過濾任務。這些查詢是用一個類似於SQL的名為“rtsql”語言編寫的。

   FlumeBase可以把資料反饋給一個在互動的shell環境下的使用者。它也可以配置成把輸出的事件流返回到Flume網路中,用於被其他的工具使用或者持久化到Hbase、HDFS或者其他的儲存介質中。

     這個系統強調對被Flume捕獲到的輸入資料的低延遲分析。“rtsql”語言的名字強調了系統的實時查詢特性,也說明了衍生於基於SQL的查詢語言語法。FlumeBase被期望於在把被Flume捕捉的資料被用類似於Hadoop一樣的工具去深入分析(可能是高延時的)之前,可以允許你執行有效的in-line資料轉換或者過濾,或者有時效性的訊息或者是一個更廣泛系統的調諧。

警告

FlumeBase是一個實驗性的系統!現在還不能滿足生產環境使用的要求。把這個系統連線到Flume生產環境的節點上可能會導致資料丟失,錯配置或者其他比較嚴重的問題。

這篇文件說明了怎樣安裝和配置FlumeBase系統。接著說明了一下rtsql語言,用於向執行環境提交查詢和控制終端客戶端的命令。這個文件的使用物件是:

l  系統管理員

l  資料分析師

l  資料工程師

2.快速開始

對於那些瞭解Flume、SQL,只是想知道FlumeBase可以做什麼的人,只需要按照一下部分中的步驟做就可以。這只是個在FlumeBase世界裡短短5分鐘的旅行。

首先拷貝一下文字到一個名為data.txt的檔案中:

1,aaron,purple,42
2,bob,blue,11
3,cindy,green,312

安裝Flume0.9.3,Hadoop0.20和Java1.6。如果你執行的是Cloudera的CDH3B4,你已經安裝了這些。比較老的版本的使用者需要進行更新。可以參考第3節的安裝部分獲得更詳細的說明。

解壓FlumeBase安裝包:

$ tar vzxf flumebase-(version).tar.gz

啟動FlumeBase的shell:

$ cd flumebase-(version)/
$ bin/flumebase shell

預設情況下,FlumeBase是被在這個shell相同程序中內嵌在FlumeBase伺服器和Flume本身的配置齊全的環境所配置的。現在讓我們定義一個基於這個檔案的流,並且查詢它。

rtsql> CREATE STREAM data(id int, name string, favcolor string,
    -> luckynumber int) FROM LOCAL SOURCE 'tail("/path/to/data.txt")';
CREATE STREAM
rtsql> SELECT * FROM data;

你在執行在一個本地的(代管的)Flume的邏輯節點上的流,這個節點讀取data.txt的所有行。你接著運行了一個從這個流中每一個事件提取所有的欄位的查詢。檔案中的每一行相當於一個不同的事件。

在另外一個終端中,現在執行下面的命令:

$ echo 4,dave,orange,611 >> /path/to/data.txt    

你應該可以觀察到,一旦Flume檢測到了一行新的記錄(大約一秒鐘的延遲),這行記錄將被傳遞給FlumeBase,並最終輸出到你的控制檯中。

這個提交的查詢已經建立了一個流,它會一直執行。如果更多的資料通過那個檔案進入Flume,我們將持續處理它。現在我們來取消這個流。

rtsql> \d 1

(隨著FlumeBase解除了內部的邏輯節點,可能會有Flume輸出的一個錯誤資訊,這很正常。通常情況下,執行在一個單一的程序中會比較雜亂,因為客戶端和伺服器的活動會被堆放在一個單一的控制檯中。對一個乾淨的會話體驗,請按照第三節安裝部分的說明,把客戶端和伺服器執行在單獨的程序中。

現在我們來執行另外一個查詢:

rtsql> SELECT favcolor FROM data WHERE luckynumber = 42;    

幾秒鐘以後,這個流將被Flume邏輯節點中的資料初始化。注意,我們只從我們的原始資料集中獲取一列。如果你通過向檔案中新增更多的行新增事件,當事件中有luckynumber列為42時,你將看到它們出現在FlumeBase的控制檯。

這就到了我們旅程的尾聲。要退出FlumeBase,請執行:

rtsql> \q    

這個使用者指定的剩餘部分將更詳細的描述如何配置多處理的配置、rtsql語言和shell操作。

3.安裝

3.1.先決條件

FlumeBase在你的機器上執行之前需要一些先決條件:

l  Java 6.0

l  Hadoop 0.20

l  Flume 0.9.3

因為FlumeBase是用Java實現,因此它也是跨平臺的。系統的測試只是在Linux上進行的。它似乎也可以在cywin或者其他的作業系統上執行,但是不能保證可以正常執行。

The following prerequisite knowledge is required to understand this documentation:

3.2.程式安裝

FlumeBase本身是用一個tar包釋出的。可以通過解壓一個tar包來安裝FlumeBase。

$ tar vzxf flumebase-(version).tar.gz      

這將解壓到一個名為flumebase-(version)/的目錄。

4.配置

預設情況下,FlumeBase被配置成執行在一個組合了互動式shell和執行引擎的單程序中。關閉shell也將關閉執行環境,包含執行著的所有查詢。這可能對評估FlumeBase來講很有用,但是對於比較正式的場景下,執行環境應該執行在一臺伺服器的守護程序上。客戶端被配置為連線到這個伺服器,或者使用者明確的指定去這樣做。

為了達到在評估FlumeBase中0配置的目的,FlumeBase程序也自己包含了一個內嵌的Flume主節點。如果要和一個已存在的流動資料來源互動,也需要重新配置為指向一個已經存在的Flume部署環境。

4.1.伺服器配置

在查詢執行引擎所要執行在的伺服器上安裝FlumeBase。接著編輯etc/flumebase-site.xml檔案,包含下列的值。

表1:FlumeBase伺服器的配置項

屬性

flume.home

The path to $FLUME_HOME on your server.

flumebase.remote.port

The port where the FlumeBase server listens for clients.

embedded.flume.master

This should be set to false if a Flume master is available. A value of true means that the FlumeBase environment acts as its own Flume master, separate from an existing Flume network.

flumebase.flume.master.host

The hostname of the foreign Flume master to connect to.

flumebase.flume.master.port

The port the foreign Flume master listens on.

flumebase.flume.collector.port.min/max

FlumeBase uses Flume collectors to receive data from the broader Flume network. set ...port.min and ...port.max to the range of ports on the FlumeBase server which the FlumeBase daemon may use for this purpose.

一些環境變數還控制了FlumeBase怎樣找出各種路徑,怎樣例項化它的JVM。對這些環境變數的預設值可以在一個etc/flumebase-env.sh 檔案中設定。為了方便已經有一個版本的模板檔案供參考。FlumeBase將使用一個pidfile來保證多個伺服器的例項不會競爭埠。Pidfile將被放在$FLUMEBASE_PID_DIR 指定的目錄中。一個執行在後臺的FlumeBase server把日誌寫入到$FLUMEBASE_LOG_DIR指定的目錄的一個檔案中。

最終,要執行在分散式的模式中,Flume的master節點需要註冊FlumeBase的外掛。你應該從FlumeBase的安裝目錄中複製flumebase-(version).jar 檔案到FlumeBase的master節點的/usr/lib/flume/lib下。接下來,編輯在master上的flume-site.xml去包含下面的設定:

<property>
  <name>flume.plugin.classes</name>
  <value>com.odiago.flumebase.flume.FlumePlugin</value>
</property>

你可能需要重啟一下Flume的master程序以使這些設定生效。

要學習怎樣啟動一個server,請看第七部分。

4.2.客戶端配置

在每個使用者用於提交查詢到FlumeBase系統的客戶端上都安裝一個FlumeBase的副本。這個客戶端必須可以對FlumeBase server開啟一個TCP連線。為了在FlumeBase的控制檯看到輸出的事件,FlumeBase server也必須可以反過來對客戶端建立一個TCP連線。

在client的機器上的etc/flumebase-site.xml 設定一下內容:

表2:FlumeBase的客戶端配置

屬性

flume.home

The path to $FLUME_HOME on the client.

flumebase.autoconnect

The host:port of the FlumeBase server to connect to. If set to local, this will use an in-process server. If set to none, the user must explicitly open a server connection with \open in the console.

flumebase.flow.autowatch

Defaults to true; this boolean property specifies whether you want every query to automatically send its output to the console when submitted. If false, you must explicitly watch flow output with the \watch command.

flumebase.console.port

FlumeBase uses a Thrift RPC connection to relay query output back to the client. The client listens on the port specified by this property.

5. 架構

FlumeBase系統是由一個命令列客戶端、一個成為執行環境的伺服器和負責收集和傳輸資料的Flume系統組成。這些可能被配置為單獨的、分散式的程序、或者相對的在一個機器上甚至是在一個程序中。

命令列客戶端是這裡面最簡單的元件。這個程序直接被一個使用者執行(可能是一個伺服器、但是更多的是他的桌面電腦或者筆記本)。它需要連線到執行環境。這個客戶端提供給使用者一個提示資訊,這個提示中可能會有新的查詢或者控制語句被給出。

每一個查詢(比如說,SELECT語句)會產生一個在執行環境中的流。使用者可能需要訂閱正在執行的流(這對於使用者新建立的流是自動完成的)。當一個訂閱的流發出一個輸出事件,它的文字將被列印到客戶端的終端上。

關閉客戶端不會終止任何已提交的流,它們執行在執行環境中,執行環境是一個單獨的可以被多個使用者共享的常駐程序。一個執行環境儲存著所有流的定義(用CREATE STREAM建立),並處理執行中的流。所以執行環境一般執行在專用的伺服器上。為了測試的需要,可能也會包含在和命令列客戶端相同的程序中。(當執行環境內嵌到客戶端的時候,關閉客戶端將關閉所有的流,並且不能夠在獲得這些流)

提交的查詢可以在流的資料上做計算。流被定義為一個事件集,就類似於在以表為基礎的SQL執行環境中的記錄。這些事件直接和Flume中的事件連線。使用者需要在查詢之前先定義這個流,這個定義指定了在這個事件中的欄位,包括怎樣把事件主體轉換成欄位和這個流起源於哪裡。每一個查詢流本身是一個流,它的輸出是一系列的事件,這些輸出的事件是基於使用者指定的計算處理過程和這個查詢流所輸入的事件集。

預設情況下,被使用者提交的查詢將導致一個沒有名稱標識的查詢流,這個查詢流只會把它的輸出傳送給訂閱的客戶端例項。這些查詢流在沒有使用者訂閱的時候持續的執行,但是這時候所生成的輸出結果將被丟棄掉(而且也沒辦法在此獲取)。

使用者可以給執行的查詢流繫結一個名稱(或者在使用CREATE STREAM AS SELECT語句提交一個查詢流的時候這樣做)。這個名稱被作為一個Flume邏輯節點的名稱,這個邏輯節點把這個查詢流的結果作為一個Avro編碼的事件集廣播出去。使用者可以接著使用Flume的shell把這個邏輯節點配置成把一個該輸出結果的拷貝定向到一個監控的應用,比如持久化儲存(比如HDFS)或者其他的應用。

FlumeBase通過修改在CREATE STREAM語句中指定的節點的接收器的定義來從一個Flume的網路讀取。當一個邏輯節點被確定為一個流的源頭,它的接收器定義被重寫為一個包含它原有的接收器和一個新的代理接收器的新的接收器,這個新的代理接收器負責把這個節點的輸出推送給在FlumeBase執行環境中的一個collector source 。(FlumeBase將擁有一個內嵌的Flume物理節點,這節點將擁有託管給它的一個邏輯節點作為接受和傳送事件流的必要條件).當一個流被丟棄掉(使用DROP STREAM或者\shutdown!直接關掉執行環境)的時候,最初的邏輯節點的定義會被恢復為提供資料流的那個邏輯節點。

在FlumeBase執行環境和Flume之間的互動式通過Flume的master節點的thrift介面完成的。寄宿在一個執行環境中的物理節點是被Flume的master節點所控制的,而且是對於所有的Flume節點都是這樣的。因為這個原因,查詢流可能需要花費幾秒鐘去初始化或者取消,因為這些都依賴於Flume方面的配置。一旦初始化成功,查詢流將在事件的處理上有較低的延時。如果沒有外部的Flume網路可用,你可以配置Flume的執行環境來維護一個內嵌的Flume的master節點,以用於測試或者單機計算的目的。

7.FlumeBase server

FlumeBase server允許一個不需要持續的shell會話的server程序去處理查詢。FlumeBase shell例項可以連線到一個本地或者遠端的server例項去提交查詢。在執行FlumeBase server之前請參考前面的講述配置好server。

7.1.啟動一個前臺server

FlumeBase server可以通過在FlumeBase的安裝目錄下執行bin/flumebase server來啟動一個前臺的server程序。

7.2.啟動一個後臺server

FlumeBase server也可以通過執行bin/flumebase start啟動在後臺執行。 執行日誌將被輸出到$FLUMEBASE_LOG_DIR(預設為$FLUMEBASE_HOME/logs)中的一個日誌檔案。如果server出現不正常的情況,執行bin/flumebase start -debug 可以開啟除錯的日誌。

7.3.關閉server

可以在FlumeBase的shell中停止一個執行的server。要知道怎樣操作,請參考8.2節。

要從命令列關閉server,可以通過執行bin/flumebase stop 完成。這可能需要花費幾秒鐘,因為FlumeBase需要傳送一套指令到Flume的master節點以還原它的流到初始的狀態並拔除執行的查詢。一個強制的殺掉(比如,kill 9)FlumeBase server 程序的方式是不推薦的。

8.FlumeBase shell

FlumeBase shell使得使用者可以和FlumeBase的環境互動。預設的FlumeBase的配置是連線到在相同的程序中的一個作為shell的單一執行緒。你也可以連線到一個執行在其他程序(可能是相同的機器或者是不同的機器)中的遠端執行環境。

FlumeBase shell可以被用來發送rtsql語句到執行環境。它還定義了一些用於使用者對shell或者執行環境本身的控制命令。

8.1.啟動shell

要啟動FlumeBase的shell,請在FlumeBase的安裝目錄下執行bin/flumebase shell

8.2.連線到執行環境

這如上面所說的,FlumeBase的預設設定檔案會導致自動連線到本地的環境。你可以執行下面的命令連線到遠端的環境:

rtsql> \open server [port]

你可以執行下面的命令明確的連線到本地環境:

rtsql> \open local

shell一次只能連線到一個環境。一個\open命令自動斷開之前連線的任何執行環境。你可以執行下面的銘刻明確地斷開與執行環境的連線:

rtsql> \disconnect

你可以使用如下的命令關閉shell:

rtsql> \q

下面的命令與上面的功能相同:

rtsql> exit;

你可以執行下面的命令關閉執行環境(這將關閉所有執行著的查詢流):

rtsql> \shutdown!

8.3.監控查詢流

每一個SELECT語句都被例項化成在執行環境中的一個查詢流。查詢流是持久的,它們無限期的連續從對應的Flume資料來源讀取資料,即使和客戶端斷開了連線。

你可以執行下面的命令得到一個所有執行著的查詢流的清單:

rtsql> \f

下面給出了上面的命令返回的資訊的各欄位含義:

表6.執行的流清單的欄位

描述

Watch?

如果你正在你的控制檯監視這個流的輸出,這個列就包含一個*。

FlowId

該查詢流對應的數值ID。對流的控制的命令會用到這個ID 。

Stream

如果這個查詢流的輸出是作為一個流(比如CREATE STREAM foo AS SELECT...),這個流的名稱(foo)就會出現在這兒了

Query

用於建立該流的rtsql的查詢語句

預設情況下,當你提交一個查詢流時(也就是執行一個SELECT語句),你就在監視它的輸出。被這個查詢流生成的任何事件都將被列印到你的控制檯。你可以通過\u 或者 \unwatch命令取消對一個查詢流的監控。

下面的命令取消了對flowid為3的查詢流的監視:

rtsql> \u 3          

(你可以先通過\f命令找到一個查詢流的flowId)

你可以使用\w或者\watch命令重新訂閱一個查詢流。下面的命令重新訂閱了上面相同查詢流的輸出:

rtsql> \w 3

你可以配置你的會話不在你建立查詢流的時候自動的監視它的輸出。你可以接下來明確的監視你建立的任何查詢流,如果你需要檢視它的輸出的話。這個特性是被在回話配置中的flumebase.flow.autowatch 配置項所控制。

8.4. 控制查詢流

你可以使用\d或者\D命令完全取消一個查詢流。這個命令後面需要有一個要取消的查詢流的FlowId 。\d的命令不阻塞,\D將一直等到查詢流完成之後再返回。

rtsql> \d 3

如果一個查詢流是通過CREATE STREAM AS SELECT 語句建立的,和這個查詢流相關的流的名稱可以通過\dname命令在不停止這個查詢流的情況下被移除。

rtsql> CREATE STREAM foo AS SELECT x FROM bar;
Started flow: flow[mId=5]
rtsql> \dname 5
從查詢流5中移除了流的名稱

一個新的流的名稱接下來可以關聯到相同的查詢流上:

rtsql> \name 5 baz
在查詢流5上建立了名為“baz”的名稱。

一個流的名稱可以通過\name命令被關聯到任何執行著的查詢流上。這用相同的名稱建立了一個新的Flume邏輯節點作為流。它的源頭帶有包含了被查詢流輸出的欄位的avro表示形式。一個查詢流一次只能和一個流的名稱相關聯。

8.5. 控制會話配置

每一個客戶端都會有一個和它相關的配置,這最初是由配置檔案來初始化的。這些配置是一個鍵值對的集合,這裡的鍵和值都是字串。這些配置資訊可以使用\set命令檢視。

rtsql> \set
io.seqfile.compress.blocksize = '1000000'
io.skip.checksum.errors = 'false'
fs.checkpoint.size = '67108864'
...

配置的鍵是分層次的。你可以通過輸入\set prefix.: 檢視任何一個字首後跟一個”.”的配置鍵的資訊。

rtsql> \set flumebase.
flumebase.flume.master.port = '35873'
flumebase.flume.master.host = 'localhost'
flumebase.autoconnect = 'local' 

你也可以檢視任何指定的鍵的資訊:

rtsql> \set flumebase.autoconnect
flumebase.autoconnect = 'local'

這個命令也可以被用來給鍵設定相應的值:

rtsql> \set flumebase.flume.master.port=12345
flumebase.flume.master.port = '12345'  

設定配置值並不影響之前提交的任何查詢流。新建立的查詢流可能會被所設定的配置鍵控制。

8.6. 其他的命令

任何時候輸入一個命令,都可以通過追加一個\c到當前行的方式把這個命令取消。

rtsql> SELECT something where I made a typo
    -> and then continued \c
rtsql> 

可以通過輸入help或者\h來獲取幫助資訊,這將給出所有可以使用的命令的清單:

rtsql> \h
All text commands must end with a ';' character.
Session control commands must be on a line by themselves.
Session control commands:
  \c                    Cancel the current input statement.
  \d flowId             Drop the specified flow.
  \D flowId             Drop a flow and wait for it to stop.
...