1. 程式人生 > 實用技巧 >Spark Streaming 程式設計初級實踐

Spark Streaming 程式設計初級實踐

一、實驗目的 (1)通過實驗學習日誌採集工具 Flume 的安裝和使用方法; (2)掌握採用 Flume 作為 Spark Streaming 資料來源的程式設計方法。 二、實驗平臺 作業系統: Ubuntu16.04 Spark 版本:2.1.0 Flume 版本:1.7.0 三、實驗內容和要求 1.安裝 Flume Flume 是 Cloudera 提供的一個分散式、可靠、可用的系統,它能夠將不同資料來源的海量日誌資料進行高效收集、聚合、移動,最後儲存到一箇中心化資料儲存系統中。Flume 的核心是把資料從資料來源收集過來,再送到目的地。請到 Flume 官網下載 Flume1.7.0 安裝檔案,下載地址如下:http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz或者也可以直接到本教程官網的 “ 下 載 專 區 ” 中 的 “ 軟 件 ” 目 錄 中 下 載apache-flume-1.7.0-bin.tar.gz。下載後,把 Flume1.7.0 安裝到 Linux 系統的“/usr/local/flume”目錄下,具體安裝和使用方法可以參考教程官網的“實驗指南”欄目中的“日誌採集工具 Flume 的安裝與使用方法”。 2. 使用 Avro 資料來源測試 Flume
Avro 可以傳送一個給定的檔案給 Flume,Avro 源使用 AVRO RPC 機制。請對 Flume的相關配置檔案進行設定,從而可以實現如下功能:在一個終端中新建一個檔案helloworld.txt(裡面包含一行文字“Hello World”),在另外一個終端中啟動 Flume 以後,可以把 helloworld.txt 中的文字內容顯示出來。 建立agent配置檔案 在/flume/conf/下建立檔案avro.conf,內容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type 
= avro a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4141 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels
= c1 a1.sinks.k1.channel = c1

將helloworld.txt放在flume的主目錄路徑下。

啟動agent

./bin/flume-ng agent agent -c conf -f ./conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

新建一個命令終端,使用avro-client傳送檔案

./bin/flume-ng avro-client -H localhost -p 4141 -F ./helloworld.txt

這是就可以在第一個命令終端中看到輸出的“hello world!”了

3. 使用 netcat 資料來源測試 Flume 請對 Flume 的相關配置檔案進行設定,從而可以實現如下功能:在一個 Linux 終端(這裡稱為“Flume 終端”)中,啟動 Flume,在另一個終端(這裡稱為“Telnet 終端”)中,輸入命令“telnet localhost 44444”,然後,在 Telnet 終端中輸入任何字元,讓這些字元可以順利地在 Flume 終端中顯示出來。 在“conf”目錄下建立“flume-conf.properties.example”檔案並編輯:
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

啟動一個agent:

./bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties.example --name a1 -Dflume.root.logger=INFO,console

新開啟一個終端(Tenlent終端),輸入命令:

telnet localhost 44444

終端響應:

telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello world!
OK

輸入“Hello world!”後tenlent終端輸出ok,同時在Flume終端也會顯示Hello world!

4.使用 Flume 作為 Spark Streaming 資料來源 Flume 是非常流行的日誌採集系統,可以作為 Spark Streaming 的高階資料來源。請把 FlumeSource 設定為 netcat 型別,從終端上不斷給 Flume Source 傳送各種訊息,Flume 把訊息彙集到 Sink,這裡把 Sink 型別設定為 avro,由 Sink 把訊息推送給 Spark Streaming,由自己編寫的 Spark Streaming 應用程式對訊息進行處理。

http://dblab.xmu.edu.cn/blog/1357-2/