學習進度筆記7
今天完成spark實驗6:Spark Streaming程式設計初級實踐。
1、安裝Flume
Flume 是 Cloudera 提供的一個分散式、可靠、可用的系統,它能夠將不同資料來源的海量 日誌資料進行高效收集、聚合、移動,最後儲存到一箇中心化資料儲存系統中。Flume 的 核心是把資料從資料來源收集過來,再送到目的地。請到 Flume 官網下載 Flume1.7.0 安裝文 件,下載地址如下: http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz 或者也可以直接到本教程官網的 “ 下 載 專 區 ” 中 的 “ 軟 件 ” 目 錄 中 下 載 apache-flume-1.7.0-bin.tar.gz。 下載後,把 Flume1.7.0 安裝到 Linux 系統的“/usr/local/flume”目錄下,具體安裝和使用方法可以參考教程官網的“實驗指南”欄目中的“日誌採集工具 Flume 的安裝與使用方法”。
2、使用Avro資料來源測試Flume
Avro 可以傳送一個給定的檔案給 Flume,Avro 源使用 AVRO RPC 機制。請對 Flume 的相關配置檔案進行設定,從而可以實現如下功能:在一個終端中新建一個檔案 helloworld.txt(裡面包含一行文字“Hello World),在另外一個終端中啟動 Flume 以後, 可以把 helloworld.txt 中的文字內容顯示出來。
⑴agent 配置檔案
然後,我們在 avro.conf 寫入以下內容
上面 Avro Source 引數說明如下: Avro Source 的別名是 avro,也可以使用完整類別名 稱 org.apache.flume.source.AvroSource,因此,上面有一行設定是 a1.sources.r1.type = avro,表示資料來源的型別是 avro。bind 繫結的 ip 地址或主機名,使用 0.0.0.0 表示繫結機 器所有的介面。a1.sources.r1.bind = 0.0.0.0,就表示繫結機器所有的介面。port 表示繫結 的埠。a1.sources.r1.port = 4141,表示繫結的埠是 4141。a1.sinks.k1.type = logger, 表示 sinks 的型別是 logger。
⑵啟動 flume agent a1
⑶建立指定檔案
先開啟另外一個終端,在/usr/local/flume 下寫入一個檔案 log.00,內容為 hello,world:
啟動日誌控制檯
我們再開啟另外一個終端,執行:
此時我們可以看到第一個終端(agent 視窗)下的顯示,也就是在日誌控制檯,就會把 log.00 檔案的內容打印出來:
avro source 執行成功!
3、使用netcat資料來源測試Flume
請對 Flume 的相關配置檔案進行設定,從而可以實現如下功能:在一個 Linux 終端(這裡稱為“Flume 終端”)中,啟動 Flume,在另一個終端(這裡稱為“Telnet 終端”)中, 輸入命令“telnet localhost 44444”,然後,在 Telnet 終端中輸入任何字元,讓這些字元可以 順利地在 Flume 終端中顯示出來。
⑴建立 agent 配置檔案
在 example.conf 裡寫入以下內容:
1.#example.conf: A single-node Flume configuration 2.# Name the components on this agent 3.a1.sources = r1 4.a1.sinks = k1 5.a1.channels = c1 6.# Describe/configure the source 7.a1.sources.r1.type = netcat 8.a1.sources.r1.bind = localhost 9.a1.sources.r1.port = 44444 10.#同上,記住該埠名 11.# Describe the sink 12.a1.sinks.k1.type = logger 13.# Use a channel which buffers events in memory 14.a1.channels.c1.type = memory 15.a1.channels.c1.capacity = 1000 16.a1.channels.c1.transactionCapacity = 100 17.# Bind the source and sink to the channel 18.a1.sources.r1.channels = c1 19.a1.sinks.k1.channel = c1
⑵啟動 flume agent (即開啟日誌控制檯):
再開啟一個終端,輸入命令:telnet localhost 44444
然後我們可以在終端下輸入任何字元,第一個終端的日誌控制檯也會有相應的顯示,如 我們輸入”hello,world”,得出
第一個終端的日誌控制檯顯示:
netcatsource 執行成功!
這裡補充一點,flume 只能傳遞英文和字元,不能用中文.
4、使用Flume作為Spark Streaming資料來源
Flume 是非常流行的日誌採集系統,可以作為 Spark Streaming 的高階資料來源。請把 Flume Source 設定為 netcat 型別,從終端上不斷給 Flume Source 傳送各種訊息,Flume 把訊息彙集 到 Sink,這裡把 Sink 型別設定為 avro,由 Sink 把訊息推送給 Spark Streaming,由自己編寫的Spark Streaming 應用程式對訊息進行處理。
⑴配置 Flume 資料來源
請登入 Linux 系統,開啟一個終端,執行如下命令新建一個 Flume 配置檔案 flume-to-spark.conf:
flume-to-spark.conf 檔案中寫入如下內容:
#flume-to-spark.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 33333 # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = localhost a1.sinks.k1.port =44444 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000000 a1.channels.c1.transactionCapacity = 1000000 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
在上面的配置檔案中,我們把 Flume Source 類別設定為 netcat,繫結到 localhost 的 33333 埠,這樣,我們後面就可以通過“telnet localhost 33333”命令向 Flume Source 發 送訊息。
同時,我們把 Flume Sink 類別設定為 avro,繫結到 localhost 的 44444 埠,這樣, Flume Source 把採集到的訊息彙集到 Flume Sink 以後,Sink 會把訊息推送給 localhost 的 44444 埠,而我們編寫的 Spark Streaming 程式一直在監聽 localhost 的 44444 埠,一 旦有訊息到達,就會被 Spark Streaming 應用程式取走進行處理。
特別要強調的是,上述配置檔案完成以後,暫時“不要”啟動 Flume Agent,如果這個時 候使用“flume-ng agent”命令啟動 agent,就會出現錯誤提示“localhost:44444 拒絕連線”,也 就是 Flume Sink 要傳送訊息給 localhost 的 44444 埠,但是,無法連線上 localhost 的 44444 埠。
為什麼會出現這個錯誤呢?因為,這個時候我們還沒有啟動 Spark Streaming 應用程式,也就沒有啟動 localhost 的44444 埠,所以,Sink 是無法向這個埠傳送訊息 的。
⑵Spark 的準備工作
Kafka 和 Flume 等高階輸入源,需要依賴獨立的庫(jar 檔案)。按照我們前面安裝好 的 Spark 版本,這些 jar 包都不在裡面,為了證明這一點,我們現在可以測試一下。請開啟 一個新的終端,然後啟動 spark-shell:
啟動成功後,在 spark-shell 中執行下面 import 語句:
你可以看到,馬上會報錯,因為找不到相關的 jar 包。所以,現在我們就需要下載 spark-streaming-flume_2.11-2.1.0.jar,其中2.11表示對應的Scala版本號,2.1.0表示Spark 版本號。
現在請在 Linux 系統中,開啟一個火狐瀏覽器,開啟下方的網址http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.11/2.1.0,裡面有提供 spark-streaming-flume_2.11-2.1.0.jar 檔案的下載。
下載後的檔案會被預設儲存在當前 Linux 登入使用者的下載目錄下,本教程統一使用 hadoop 使用者名稱登入 Linux 系統,所以,檔案下載後會被儲存到“/home/hadoop/下載”目錄下面。現在,我們在“/usr/local/spark/jars”目錄下新建一個“flume”目錄,就把這個檔案複製到 Spark 目錄的“/usr/local/spark/jars/flume”目錄下。請新開啟一個終端,輸入下面命令:
我們就成功地把 spark-streaming-flume_2.11-2.1.0.jar 檔案拷貝到了 “/usr/local/spark/jars/flume”目錄下。
下面還要繼續把 Flume 安裝目錄的 lib 目錄下的所有 jar 檔案複製到 “/usr/local/spark/jars/flume”目錄下,請在終端中執行下面命令:
這樣,我們就已經準備好了 Spark 環境,它可以支援 Flume 相關程式設計了。
⑶編寫 Spark 程式使用 Flume 資料來源
下面,我們就可以進行程式編寫了。請新開啟一個終端,然後,執行命令建立程式碼目錄:
請在 FlumeEventCount.scala 程式碼檔案中輸入以下程式碼
package org.apache.spark.examples.streaming import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.flume._ import org.apache.spark.util.IntParam object FlumeEventCount { def main(args: Array[String]) { if (args.length < 2) { System.err.println( "Usage: FlumeEventCount <host> <port>") System.exit(1) } StreamingExamples.setStreamingLogLevels() val Array(host, IntParam(port)) = args val batchInterval = Milliseconds(2000) // Create the context and set the batch size val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local [2]") val ssc = new StreamingContext(sparkConf, batchInterval) // Create a flume stream val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_S ER_2) // Print out the count of events received from this server in each batch stream.count().map(cnt => "Received " + cnt + " flume events." ).print() ssc.start() ssc.awaitTermination() } }
儲存 FlumeEventCount.scala 檔案並退出 vim 編輯器。FlumeEventCount.scala 程式在 編譯後執行時,需要我們提供 host 和 port 兩個引數,程式會對指定的 host 和指定的 port 進行監聽,Milliseconds(2000)設定了時間間隔為 2 秒,所以,該程式每隔 2 秒就會從指定 的埠中獲取由 Flume Sink 發給該埠的訊息,然後進行處理,對訊息進行統計,打印出 “Received 0 flume events.”這樣的資訊。
然後再使用 vim 編輯器新建 StreamingExamples.scala 檔案,輸入如下程式碼,用於控制日誌輸出格式:
package org.apache.spark.examples.streaming import org.apache.log4j.{Level, Logger} import org.apache.spark.internal.Logging object StreamingExamples extends Logging { /** Set reasonable logging levels for streaming if the user has not configured log4 j. */ def setStreamingLogLevels() { val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements if (!log4jInitialized) { // We first log something to initialize Spark's default logging, then we overri de the // logging level. logInfo("Setting log level to [WARN] for streaming example." + " To override add a custom log4j.properties to the classpath.") Logger.getRootLogger.setLevel(Level.WARN) } } }
儲存 StreamingExamples.scala 檔案並退出 vim 編輯器。
這樣,我們在“/usr/local/spark/mycode/flume/src/main/scala”目錄下,就有了如下兩個 程式碼檔案:
FlumeEventCount.scala
StreamingExamples.scala
然後,新建一個 simple.sbt 檔案:
在 simple.sbt 中輸入以下程式碼:
name := "Simple Project" version := "1.0" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0" libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.1.0"
儲存檔案退出 vim 編輯器。然後執行下面命令,進行打包編譯:
打包成功後,就可以執行程式測試效果了。
⑷測試程式效果
關閉之前開啟的所有終端。首先,請新建第 1 個 Linux 終端,啟動 Spark Streaming 應 用程式,命令如下:
1.cd /usr/local/spark
2. ./bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/flume/* --class "org.apache.spark.examples.streaming.FlumeEventCount" /usr/local/spark/mycode/flume/target/scala-2.11/simpleproject_2.11-1.0.jar localhost 44444
通過上面命令,我們為應用程式提供 host 和 port 兩個引數的值分別為 localhost 和 44444,程式會對 localhost 的 44444 埠進行監聽,Milliseconds(2000)設定了時間間隔為 2 秒,所以,該程式每隔 2 秒就會從指定的埠中獲取由 Flume Sink 發給該埠的訊息, 然後進行處理,對訊息進行統計,打印出“Received 0 flume events.”這樣的資訊。
執行該命令後,螢幕上會顯示程式執行的相關資訊,並會每隔 2 秒鐘重新整理一次資訊, 大量資訊中會包含如下重要資訊:
-------------------------------------------
Time: 1488029430000 ms
-------------------------------------------
Received 0 flume events.
因為目前 Flume 還沒有啟動,沒有給 FlumeEventCount 傳送任何訊息,所以 Flume Events 的數量是 0。第 1 個終端不要關閉,讓它一直處於監聽狀態。
現在,我們可以再另外新建第 2 個終端,在這個新的終端中啟動 Flume Agent,命令如 下:
1.cd /usr/local/flume
2.bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console
啟動 agent 以後,該 agent 就會一直監聽 localhost 的 33333 埠,這樣,我們下面就 可以通過“telnet localhost 33333”命令向 Flume Source 傳送訊息。第 2 個終端也不要關閉, 讓它一直處於監聽狀態。
請另外新建第 3 個終端,執行如下命令:
1.telnet localhost 33333
執行該命令以後,就可以在這個窗口裡面隨便敲入若干個字元和若干個回車,這些訊息都會被 Flume 監聽到,Flume 把訊息採集到以後彙集到 Sink,然後由 Sink 傳送給 Spark 的 FlumeEventCount 程式進行處理。然後,你就可以在執行 FlumeEventCount 的前面那個終端視窗內看到類似如下的統計結果:
-------------------------------------------
Time: 1488029430000 ms
-------------------------------------------
Received 0 flume events.
#這裡省略了其他螢幕資訊
-------------------------------------------
Time: 1488029432000 ms
-------------------------------------------
Received 8 flume events.
#這裡省略了其他螢幕資訊
-------------------------------------------
Time: 1488029434000 ms
-------------------------------------------
Received 21 flume events.
從螢幕資訊中可以看出,我們在 telnet 那個終端內傳送的訊息,都被成功傳送到 Spark 進行處理了。
至此,本實驗順利完成。
實驗結束後,要關閉各個終端,只要切換到該終端視窗,然後 按鍵盤的 Ctrl+C 組合鍵,就可以結束程式執行。