1. 程式人生 > 實用技巧 >學習進度筆記7

學習進度筆記7

今天完成spark實驗6:Spark Streaming程式設計初級實踐。

1、安裝Flume

Flume 是 Cloudera 提供的一個分散式、可靠、可用的系統,它能夠將不同資料來源的海量 日誌資料進行高效收集、聚合、移動,最後儲存到一箇中心化資料儲存系統中。Flume 的 核心是把資料從資料來源收集過來,再送到目的地。請到 Flume 官網下載 Flume1.7.0 安裝文 件,下載地址如下: http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz 或者也可以直接到本教程官網的 “ 下 載 專 區 ” 中 的 “ 軟 件 ” 目 錄 中 下 載 apache-flume-1.7.0-bin.tar.gz。 下載後,把 Flume1.7.0 安裝到 Linux 系統的“/usr/local/flume”目錄下,具體安裝和使用方法可以參考教程官網的“實驗指南”欄目中的“日誌採集工具 Flume 的安裝與使用方法”。

2、使用Avro資料來源測試Flume

Avro 可以傳送一個給定的檔案給 Flume,Avro 源使用 AVRO RPC 機制。請對 Flume 的相關配置檔案進行設定,從而可以實現如下功能:在一個終端中新建一個檔案 helloworld.txt(裡面包含一行文字“Hello World),在另外一個終端中啟動 Flume 以後, 可以把 helloworld.txt 中的文字內容顯示出來。

⑴agent 配置檔案

然後,我們在 avro.conf 寫入以下內容

上面 Avro Source 引數說明如下: Avro Source 的別名是 avro,也可以使用完整類別名 稱 org.apache.flume.source.AvroSource,因此,上面有一行設定是 a1.sources.r1.type = avro,表示資料來源的型別是 avro。bind 繫結的 ip 地址或主機名,使用 0.0.0.0 表示繫結機 器所有的介面。a1.sources.r1.bind = 0.0.0.0,就表示繫結機器所有的介面。port 表示繫結 的埠。a1.sources.r1.port = 4141,表示繫結的埠是 4141。a1.sinks.k1.type = logger, 表示 sinks 的型別是 logger。

⑵啟動 flume agent a1

⑶建立指定檔案

先開啟另外一個終端,在/usr/local/flume 下寫入一個檔案 log.00,內容為 hello,world:

啟動日誌控制檯

我們再開啟另外一個終端,執行:

此時我們可以看到第一個終端(agent 視窗)下的顯示,也就是在日誌控制檯,就會把 log.00 檔案的內容打印出來:

avro source 執行成功!

3、使用netcat資料來源測試Flume

請對 Flume 的相關配置檔案進行設定,從而可以實現如下功能:在一個 Linux 終端(這裡稱為“Flume 終端”)中,啟動 Flume,在另一個終端(這裡稱為“Telnet 終端”)中, 輸入命令“telnet localhost 44444”,然後,在 Telnet 終端中輸入任何字元,讓這些字元可以 順利地在 Flume 終端中顯示出來。

⑴建立 agent 配置檔案

在 example.conf 裡寫入以下內容:

1.#example.conf: A single-node Flume configuration
2.# Name the components on this agent
3.a1.sources = r1
4.a1.sinks = k1
5.a1.channels = c1
6.# Describe/configure the source
7.a1.sources.r1.type = netcat
8.a1.sources.r1.bind = localhost
9.a1.sources.r1.port = 44444
10.#同上,記住該埠名
11.# Describe the sink
12.a1.sinks.k1.type = logger
13.# Use a channel which buffers events in memory
14.a1.channels.c1.type = memory
15.a1.channels.c1.capacity = 1000
16.a1.channels.c1.transactionCapacity = 100
17.# Bind the source and sink to the channel
18.a1.sources.r1.channels = c1
19.a1.sinks.k1.channel = c1 

⑵啟動 flume agent (即開啟日誌控制檯):

再開啟一個終端,輸入命令:telnet localhost 44444

然後我們可以在終端下輸入任何字元,第一個終端的日誌控制檯也會有相應的顯示,如 我們輸入”hello,world”,得出

第一個終端的日誌控制檯顯示:

netcatsource 執行成功!

這裡補充一點,flume 只能傳遞英文和字元,不能用中文.

4、使用Flume作為Spark Streaming資料來源

Flume 是非常流行的日誌採集系統,可以作為 Spark Streaming 的高階資料來源。請把 Flume Source 設定為 netcat 型別,從終端上不斷給 Flume Source 傳送各種訊息,Flume 把訊息彙集 到 Sink,這裡把 Sink 型別設定為 avro,由 Sink 把訊息推送給 Spark Streaming,由自己編寫的Spark Streaming 應用程式對訊息進行處理。

⑴配置 Flume 資料來源

請登入 Linux 系統,開啟一個終端,執行如下命令新建一個 Flume 配置檔案 flume-to-spark.conf:

flume-to-spark.conf 檔案中寫入如下內容:

#flume-to-spark.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 33333
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port =44444
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 1000000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

在上面的配置檔案中,我們把 Flume Source 類別設定為 netcat,繫結到 localhost 的 33333 埠,這樣,我們後面就可以通過“telnet localhost 33333”命令向 Flume Source 發 送訊息。

同時,我們把 Flume Sink 類別設定為 avro,繫結到 localhost 的 44444 埠,這樣, Flume Source 把採集到的訊息彙集到 Flume Sink 以後,Sink 會把訊息推送給 localhost 的 44444 埠,而我們編寫的 Spark Streaming 程式一直在監聽 localhost 的 44444 埠,一 旦有訊息到達,就會被 Spark Streaming 應用程式取走進行處理。

特別要強調的是,上述配置檔案完成以後,暫時“不要”啟動 Flume Agent,如果這個時 候使用“flume-ng agent”命令啟動 agent,就會出現錯誤提示“localhost:44444 拒絕連線”,也 就是 Flume Sink 要傳送訊息給 localhost 的 44444 埠,但是,無法連線上 localhost 的 44444 埠。

為什麼會出現這個錯誤呢?因為,這個時候我們還沒有啟動 Spark Streaming 應用程式,也就沒有啟動 localhost 的44444 埠,所以,Sink 是無法向這個埠傳送訊息 的。

⑵Spark 的準備工作

Kafka 和 Flume 等高階輸入源,需要依賴獨立的庫(jar 檔案)。按照我們前面安裝好 的 Spark 版本,這些 jar 包都不在裡面,為了證明這一點,我們現在可以測試一下。請開啟 一個新的終端,然後啟動 spark-shell:

啟動成功後,在 spark-shell 中執行下面 import 語句:

你可以看到,馬上會報錯,因為找不到相關的 jar 包。所以,現在我們就需要下載 spark-streaming-flume_2.11-2.1.0.jar,其中2.11表示對應的Scala版本號,2.1.0表示Spark 版本號。

現在請在 Linux 系統中,開啟一個火狐瀏覽器,開啟下方的網址http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.11/2.1.0,裡面有提供 spark-streaming-flume_2.11-2.1.0.jar 檔案的下載。

下載後的檔案會被預設儲存在當前 Linux 登入使用者的下載目錄下,本教程統一使用 hadoop 使用者名稱登入 Linux 系統,所以,檔案下載後會被儲存到“/home/hadoop/下載”目錄下面。現在,我們在“/usr/local/spark/jars”目錄下新建一個“flume”目錄,就把這個檔案複製到 Spark 目錄的“/usr/local/spark/jars/flume”目錄下。請新開啟一個終端,輸入下面命令:

我們就成功地把 spark-streaming-flume_2.11-2.1.0.jar 檔案拷貝到了 “/usr/local/spark/jars/flume”目錄下。

下面還要繼續把 Flume 安裝目錄的 lib 目錄下的所有 jar 檔案複製到 “/usr/local/spark/jars/flume”目錄下,請在終端中執行下面命令:

這樣,我們就已經準備好了 Spark 環境,它可以支援 Flume 相關程式設計了。

⑶編寫 Spark 程式使用 Flume 資料來源

下面,我們就可以進行程式編寫了。請新開啟一個終端,然後,執行命令建立程式碼目錄:

請在 FlumeEventCount.scala 程式碼檔案中輸入以下程式碼

package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
object FlumeEventCount {
 def main(args: Array[String]) {
 if (args.length < 2) {
 System.err.println(
 "Usage: FlumeEventCount <host> <port>")
 System.exit(1)
 }
 StreamingExamples.setStreamingLogLevels()
 val Array(host, IntParam(port)) = args
 val batchInterval = Milliseconds(2000)
 // Create the context and set the batch size
 val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local
[2]")
 val ssc = new StreamingContext(sparkConf, batchInterval)
 // Create a flume stream
 val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_S
ER_2)
 // Print out the count of events received from this server in each batch
 stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
 ssc.start()
 ssc.awaitTermination()
 }
}

儲存 FlumeEventCount.scala 檔案並退出 vim 編輯器。FlumeEventCount.scala 程式在 編譯後執行時,需要我們提供 host 和 port 兩個引數,程式會對指定的 host 和指定的 port 進行監聽,Milliseconds(2000)設定了時間間隔為 2 秒,所以,該程式每隔 2 秒就會從指定 的埠中獲取由 Flume Sink 發給該埠的訊息,然後進行處理,對訊息進行統計,打印出 “Received 0 flume events.”這樣的資訊。

然後再使用 vim 編輯器新建 StreamingExamples.scala 檔案,輸入如下程式碼,用於控制日誌輸出格式:

package org.apache.spark.examples.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
object StreamingExamples extends Logging {
 /** Set reasonable logging levels for streaming if the user has not configured log4
j. */
 def setStreamingLogLevels() {
 val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
 if (!log4jInitialized) {
 // We first log something to initialize Spark's default logging, then we overri
de the
 // logging level.
 logInfo("Setting log level to [WARN] for streaming example." +
 " To override add a custom log4j.properties to the classpath.")
 Logger.getRootLogger.setLevel(Level.WARN)
 }
 }
}

儲存 StreamingExamples.scala 檔案並退出 vim 編輯器。

這樣,我們在“/usr/local/spark/mycode/flume/src/main/scala”目錄下,就有了如下兩個 程式碼檔案:

FlumeEventCount.scala

StreamingExamples.scala

然後,新建一個 simple.sbt 檔案:

在 simple.sbt 中輸入以下程式碼:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.1.0" 

儲存檔案退出 vim 編輯器。然後執行下面命令,進行打包編譯:

打包成功後,就可以執行程式測試效果了。

⑷測試程式效果

關閉之前開啟的所有終端。首先,請新建第 1 個 Linux 終端,啟動 Spark Streaming 應 用程式,命令如下:

1.cd /usr/local/spark

2. ./bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/flume/* --class "org.apache.spark.examples.streaming.FlumeEventCount" /usr/local/spark/mycode/flume/target/scala-2.11/simpleproject_2.11-1.0.jar localhost 44444

通過上面命令,我們為應用程式提供 host 和 port 兩個引數的值分別為 localhost 和 44444,程式會對 localhost 的 44444 埠進行監聽,Milliseconds(2000)設定了時間間隔為 2 秒,所以,該程式每隔 2 秒就會從指定的埠中獲取由 Flume Sink 發給該埠的訊息, 然後進行處理,對訊息進行統計,打印出“Received 0 flume events.”這樣的資訊。

執行該命令後,螢幕上會顯示程式執行的相關資訊,並會每隔 2 秒鐘重新整理一次資訊, 大量資訊中會包含如下重要資訊:

-------------------------------------------

Time: 1488029430000 ms

-------------------------------------------

Received 0 flume events.

因為目前 Flume 還沒有啟動,沒有給 FlumeEventCount 傳送任何訊息,所以 Flume Events 的數量是 0。第 1 個終端不要關閉,讓它一直處於監聽狀態。

現在,我們可以再另外新建第 2 個終端,在這個新的終端中啟動 Flume Agent,命令如 下:

1.cd /usr/local/flume

2.bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console

啟動 agent 以後,該 agent 就會一直監聽 localhost 的 33333 埠,這樣,我們下面就 可以通過“telnet localhost 33333”命令向 Flume Source 傳送訊息。第 2 個終端也不要關閉, 讓它一直處於監聽狀態。

請另外新建第 3 個終端,執行如下命令:

1.telnet localhost 33333

執行該命令以後,就可以在這個窗口裡面隨便敲入若干個字元和若干個回車,這些訊息都會被 Flume 監聽到,Flume 把訊息採集到以後彙集到 Sink,然後由 Sink 傳送給 Spark 的 FlumeEventCount 程式進行處理。然後,你就可以在執行 FlumeEventCount 的前面那個終端視窗內看到類似如下的統計結果:

-------------------------------------------

Time: 1488029430000 ms

-------------------------------------------

Received 0 flume events.

#這裡省略了其他螢幕資訊

-------------------------------------------

Time: 1488029432000 ms

-------------------------------------------

Received 8 flume events.

#這裡省略了其他螢幕資訊

-------------------------------------------

Time: 1488029434000 ms

-------------------------------------------

Received 21 flume events.

從螢幕資訊中可以看出,我們在 telnet 那個終端內傳送的訊息,都被成功傳送到 Spark 進行處理了。

至此,本實驗順利完成。

實驗結束後,要關閉各個終端,只要切換到該終端視窗,然後 按鍵盤的 Ctrl+C 組合鍵,就可以結束程式執行。