1. 程式人生 > >flume-1.7.0 簡單使用

flume-1.7.0 簡單使用

在上一篇中,我們安裝了 flume-ng,這一篇我們就來簡單使用一下。

這裡寫圖片描述

官網上是這麼介紹的,我們需要指定一個配置檔案,需要定義一個 agent 的名稱,然後我們就可以使用 flume-ng 命令來啟動了。

1 編寫配置檔案

我們先拿官網上的例子來跑一下看看,就使用 example.conf 檔案:

[[email protected] conf]# pwd
/usr/hadoop/flume-1.7.0-bin/conf
[[email protected] conf]# vi example.conf 

# example.conf: A single-node Flume configuration
# Name the components on this agent # 定義一個 agent 的元素 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source # 配置 source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink # 配置 sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory
# 定義 channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel # 用 channel 連線起來 source 和 sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

這裡我簡單寫了一下注釋,這可不是翻譯過來的昂,大家看的時候別被我誤導了。具體概念大家還是看官網咖。
其中有四個概念:
這裡寫圖片描述


這是官網上給的示意圖,我們可以這樣理解:
WebServer 想給 HDFS 送點東西 (source) ,於是就找了 flume 這個 agent(代理) ,然後 WebServer 把 source 給了 agent , agent 拿到之後,用它自己的手段(channel),可能是”物流“,然後到了離 HDFS 最近的”快遞分揀點“(sink),把東西給了 HDFS。
這張圖只是示意圖,別不是對於 example.conf 的解釋,希望大家別想多了。

1.1 source

別聽我胡扯,我只是為了方便理解,自己意淫的。
那麼,對於這個 example.conf 的配置檔案,我們定義了一個叫 a1 的 agent,然後 source 源配置的是 netcat 型別,對於 netcat source 需要配置的內容官網上寫的很清楚:
這裡寫圖片描述

黑色加粗的幾項是必須配置的,對於其他幾項是可選項。

1.2 channel

接下來是 channel,也就是我們要選那一種”物流“,這裡我們用的是 memory,我們需要配置的是:
這裡寫圖片描述

1.3 sink

我們的”物流分揀點“,sink 我們配置的是 logger,需要配置幾項是:
這裡寫圖片描述

可能你已經被我誤導了,對於他們真正的解釋還是看官網,獲取你對它們的理解,把這些東西轉換成為你能夠理解的東西就行了。

2 啟動 agent

2.1 啟動

上面我們已經說過了,使用 flume-ng 命令啟動。
這裡寫圖片描述

具體引數看上圖。

[[email protected] conf]# pwd
/usr/hadoop/flume-1.7.0-bin/conf

[[email protected] conf]# flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

Info: Including Hadoop libraries found via (/usr/hadoop/hadoop-2.6.4/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/usr/hadoop/hbase-1.2.3/bin/hbase) for HBASE access
...

16/11/18 19:34:27 INFO node.Application: Starting Channel c1
16/11/18 19:34:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
16/11/18 19:34:27 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
16/11/18 19:34:27 INFO node.Application: Starting Sink k1
16/11/18 19:34:27 INFO node.Application: Starting Source r1
16/11/18 19:34:27 INFO source.NetcatSource: Source starting
16/11/18 19:34:27 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]

想必大家也都注意到了,我所在的目錄是 conf,因為我們需要指定一個配置檔案,需要它的絕對路徑。然後看到上面的情況就表示我們的名為 a1 的 agent 啟動成功了。由於我們配置的 sink 是 logger ,並且指定了一些列引數,把內容輸出到我們的控制檯。

2.2 傳送訊息

接下來我們就可以再開一個終端,在這個終端上執行下面一系列命令:
這裡寫圖片描述

可能有些朋友的Linux上會提示 command not found,只要安裝一下 telnet 就可以了。這裡我們可以看到一個”Connection refused“,是因為我們的 /etc/hosts 檔案中 localhost 對應的 ”::1…“這樣形式的 ip 沒有識別成功,它嘗試了”127.0.0.1“就連上了。具體的還是要看關於網路這塊兒的內容,我就不細講了。

這個時候,我們可以在這兒輸入一些東西:
這裡寫圖片描述

然後我們回到之前執行著 agent 的那個終端:
這裡寫圖片描述

會看到多了這樣一行內容,可能有時候這一行顯示的內容比我們輸入的內容要少,並不是沒有接收到,是因為超過了它能顯示的長度,給省略掉了。

這個時候應該就有所體會了,flume 是”很多個形容詞“的
這裡寫圖片描述

日誌採集系統。

我們再來寫幾個例項,來體會一下。

3. avro ⇒ hdfs

我們的 source 是 avro,sink 是 hdfs,(我這種說法嚴格來說是不正規的,但是我不知道怎麼說你們能夠理解,暫時就先這樣認為)。
那麼,先找找看 avro source 需要配置的是什麼。

3.1 配置 avro source

這裡寫圖片描述
我們需要配置有四項,並且下面也給出了示例。

[[email protected] conf]# vi AvroHDFS.conf
a1.sources = r1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = master
a1.sources.r1.port = 4141

3.2 配置 channel

這次我們還是使用 memory:
這裡寫圖片描述

我們在 AvroHDFS.conf 檔案中追加:

[[email protected] conf]# vi AvroHDFS.conf
a1.sources = r1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = master
a1.sources.r1.port = 4141

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

3.3 配置 sink

我們是向 HDFS 上導資料,所以我們使用 hdfs sink:

這裡寫圖片描述
這部分圖片太長了,放上來大家也看不清,於是我就只截了示例,大家還是去官網上看看怎麼配置吧。

我們接著在 AvroHDFS.conf 檔案中追加:

[[email protected] conf]# vi AvroHDFS.conf
a1.sinks = k1
a1.sources = r1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = master
a1.sources.r1.port = 4141

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /fromflume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events.
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true

這樣我們就定義好了一個從 avro 到 hdfs 的配置檔案,我們可以啟動 agent 了。

3.4 啟動 agent

這次我們需要先執行起來 hadoop 叢集,不然是會失敗的。

[[email protected] conf]# flume-ng agent --conf conf --conf-file AvroHDFS.conf --name a1 -Dflume.root.logger=INFO,console

Info: Including Hadoop libraries found via (/usr/hadoop/hadoop-2.6.4/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/usr/hadoop/hbase-1.2.3/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/usr/hadoop/apache-hive-2.1.0-bin) for Hive access
...

16/11/18 20:46:13 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
16/11/18 20:46:13 INFO source.AvroSource: Avro source r1 started.

這樣就表示我們的 agent 啟動成功了。不過接下來我們需要使用API 了,java程式如下:

import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class FlumeDemo {
    private String hostname;
    private int port;
    private RpcClient client;

    public FlumeDemo(String hostname,int port) {
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    }

    public void sendMessage(String data){
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            e.printStackTrace();
        }
    }

    public void cleanUp(){
        client.close();
    }

    public static void main(String[] args) {
        FlumeDemo rpcClient = new FlumeDemo("master", 4141);

        String data = "testing ";

        for(int i=0;i<10;i++){
            rpcClient.sendMessage(data + i);
        }

        rpcClient.cleanUp();
    }
}

然後,執行我們的 java 程式,這個時候,觀察我們的 agent 是什麼狀況:

...
16/11/18 20:46:13 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
16/11/18 20:46:13 INFO source.AvroSource: Avro source r1 started.

16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 => /192.168.38.129:4141] OPEN
16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 => /192.168.38.129:4141] BOUND: /192.168.38.129:4141
16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 => /192.168.38.129:4141] CONNECTED: /192.168.38.1:64375
16/11/18 20:46:23 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 :> /192.168.38.129:4141] DISCONNECTED
16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 :> /192.168.38.129:4141] UNBOUND
16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 :> /192.168.38.129:4141] CLOSED
16/11/18 20:46:23 INFO ipc.NettyServer: Connection to /192.168.38.1:64375 disconnected.

16/11/18 20:46:24 INFO hdfs.BucketWriter: Creating /fromflume/events/16-11-18/2040/00/events..1479473183696.tmp
...

這個時候,我們可以看到,已經在往 HDFS 上寫資料了,我們可以通過WebUI(通過瀏覽器訪問:http://master:50070 )開看看 hdfs 上是不是多了 /fromflume/… 的資料夾
這裡寫圖片描述

果然,我們的資料夾已經建立成功了,我們可以一級一級的進去,會看到:
這裡寫圖片描述

檔案已經成功寫入,我們檢視的時候肯定是各種亂碼的…但是我們是已經成功寫入了。

4 avro ⇒ kafka

結合我昨天寫的,我們來寫一下 flume 和 kafka 結合的一個例子。兩者有點相似,都是”代理/中介“。

有了前面兩個練習,那這個我就不寫的那麼詳細了。

這裡寫圖片描述
但是,對於這個 kafka sink 還真是有好多需要說的:

4.1 sinks.type

這個必須設定成”org.apache.flume.sink.kafka.KafkaSink“,你們說這個也是奇怪,這一項沒有預設值,要設定的時候還 ” Must be set to org.apache.flume.sink.kafka.KafkaSink“。

4.2 kafka.topic

關於這一項,那裡也有提到,大概意思就是,當 topic 在 event 的 header 中時,kafka 的 broker 中原來跟這個 topic 同名的就會被覆寫。……我不想翻譯了,我有點繞暈了。等我想好了再來補充這一部分。
這一塊暫時還用不到,現在不理解也不打緊。先記住有這麼一茬。

好了,我們來寫一下 sink :

[[email protected] conf]# vi AvroKafka.conf
      1 a1.sources = r1
      2 a1.sinks = k1
      3 a1.channels = c1
      4 
      5 a1.sources.r1.type = avro
      6 a1.sources.r1.bind = master
      7 a1.sources.r1.port = 44444
      8 
      9 
     10 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
     11 a1.sinks.k1.kafka.bootstrap.servers = localhost:9092,localhost:9093
     12 a1.sinks.k1.kafka.topic = fromFlume 
     13 
     14 a1.channels.c1.type = memory
     15 a1.channels.c1.capacity = 1000
     16 a1.channels.c1.transactionCapacity = 100
     17 
     18 a1.sources.r1.channels = c1
     19 a1.sinks.k1.channel = c1

要 flume 和 kafka 結合,這次我們是讓 kafka 消費 flume 發出去的資料。
那麼,我們需要啟動 kafka 的服務,並且建立一個消費者,為了演示,這次我們就在 master 上啟動兩個 broker 好了。

[[email protected] config]# pwd
/usr/hadoop/kafka_2.11-0.10.1.0/config
[[email protected] config]# kafka-server-start.sh server.properties &
[1] 10693
...
[[email protected] config]# kafka-server-start.sh server1.properties &
[2] 10962
...
[[email protected] config]# jps
11233 Jps
2593 ResourceManager
10962 Kafka
2692 NodeManager
10693 Kafka
3034 QuorumPeerMain
2171 NameNode
2269 DataNode
2446 SecondaryNameNode
[[email protected] config]# 
[[email protected] config]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fromFlume --from-beginning

這個時候,我們就可以啟動我們的 flume agent 了:

[[email protected] conf]# flume-ng agent --conf conf --conf-file AvroKafka.conf --name a1 -Dflume.root.logger=INFO,console

...

不知道大家有沒有暈掉,反正我當初學的時候是暈了好一陣兒呢。^8^

接下來,我們還使用之前的 java 程式:
不過這時候的埠號需要修改一下,記住,這個埠號需要跟我們的 *.conf 檔案中配置的埠號一致。


import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class FlumeDemo {
    private String hostname;
    private int port;
    private RpcClient client;

    public FlumeDemo(String hostname,int port) {
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    }

    public void sendMessage(String data){
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            e.printStackTrace();
        }
    }

    public void cleanUp(){
        client.close();
    }

    public static void main(String[] args) {
        FlumeDemo rpcClient = new FlumeDemo("master", 44444);

        String data = "Hello World! ";

        for(int i=0;i<10;i++){
            rpcClient.sendMessage(data + i);
        }

        rpcClient.cleanUp();
    }
}

執行程式,然後,我們去到啟動 kafka 消費者的那個終端上,我們會看到:

[2016-11-18 21:46:54,915] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-11-18 21:47:08,073] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
Hello World! 0
Hello World! 1
Hello World! 2
Hello World! 3
Hello World! 4
Hello World! 5
Hello World! 6
Hello World! 7
Hello World! 8
Hello World! 9

不知道大家想到了什麼,反正我是一度這跟輸出重定向似的。我都說了,你們別聽我胡扯^v^。
這些還都只是簡單的小例項,大家可以好好看看文件,看看 flume 都是有什麼 sources、channels、sinks,自己整著玩玩兒,玩著玩著就上手了。