1. 程式人生 > >資料科學中的 Spark 入門

資料科學中的 Spark 入門

Apache Spark 為資料科學提供了許多有價值的工具。隨著 Apache Spark 1.3.1 技術預覽版的釋出,強大的 Data Frame API 也可以在 HDP 上使用資料科學家使用資料探勘和視覺化來幫助構造問題架構並對學習進行微調。Apache Zeppelin 正好能夠幫他們做到這些。

Zeppelin 是一個基於 Web 的 notebook 伺服器。它基於一個直譯器的概念,這個直譯器可以繫結到任何語言或資料處理後端。作為 Zeppelin 後端的一種,Zeppelin 實現了 Spark 直譯器。其他直譯器實現,如 Hive、Markdown、D3 等,也同樣可以在 Zeppelin 中使用。

我們將通過一系列的部落格文章來描述如何結合使用 Zeppelin、Spark SQL 和 MLLib 來使探索性資料科學簡單化。作為這個系列的第一篇文章,我們描述瞭如何為 HDP2.2 安裝/構建 Zeppelin,並揭示一些 Zeppelin 用來做資料探勘的基本功能。

以下假設 HDP 2.2 和 Spark 已經安裝在叢集上。

Spark 可以使用 Ambari 2.0 安裝成一個 service,或者按照這篇文章的描述下載和配置。

無論使用哪種方法安裝,本文將 spark.home 代指 Spark 安裝的根目錄。

構建 Zeppelin

如果可以的話,在一個非 datanode 或 namenode 的叢集節點上構建和執行 Zeppelin。這是為了確保在那個節點上 Zeppelin 有足夠的計算資源。

從 github 獲取 Zeppelin:

12 git clonehttps://github.com/apache/incubator-zeppelin.gitcd incubator-zeppelin

使用如下命令構建 Spark 1.3.1 可用的 Zeppelin:

1 mvn clean install-DskipTests-Pspark-1.3-Dspark.version=1.3.1-Phadoop-2.6-Pyarn

使用如下命令構建 Spark 1.2.1 可用的 Zeppelin:

1 mvn clean install-DskipTests-Pspark-1.2-Phadoop-2.6-Pyarn

在之前的步驟中,Zeppelin、Spark 1.3.1 和 Hadoop 2.6 已經構建好了。現在先確定正在使用的 HDP 的版本:

1 hdp-select status hadoop-client|sed's/hadoop-client - (.*)/1/'

這個命令應該輸出類似這樣的版本號:

1 2.2.4.2-2

將這個引數記為 hdp.version

編輯 conf/zeppelin-env.sh 檔案新增以下幾行:

123 export HADOOP_CONF_DIR=/etc/hadoop/confexport ZEPPELIN_PORT=10008export ZEPPELIN_JAVA_OPTS="-Dhdp.version=$hdp.version"

複製 /etc/hive/conf/hive-site.xmlconf/ 資料夾下。

為執行 Zeppelin(比如 zeppelin)的使用者在 HDFS 上建立一個目錄:

Shell
12 su hdfshdfs dfs-mkdir/user/zeppelin;hdfs dfs-chownzeppelin:hdfs/user/zeppelin>

使用以下命令執行 Zeppelin:

Shell
1 bin/zeppelin-daemon.shstart

這行命令會啟動一個 notebook 伺服器並通過埠 10008 提供一個 Web UI。

開啟 http://$host:10008 訪問 notebooks。點選 Interpreter 標籤切換到 Interpreter 頁面設定一些屬性。

配置Zeppelin

為了在YARN客戶端模式下執行直譯器,需要在 $SPARK_HOME/conf/spark-defaults.conf 重寫以下這些屬性:

12345 master yarn-clientspark.driver.extraJavaOptions-Dhdp.version=$hdp.versionspark.home$spark.homespark.yarn.am.extraJavaOptions-Dhdp.version=$hdp.versionspark.yarn.jar$zeppelin.home/interpreter/spark/zeppelin-spark-0.5.0-SNAPSHOT.jar

一旦這些配置更新,Zeppelin 會彈框提醒重啟直譯器。確認重啟後直譯器會重新載入配置。

至此,準備工作完成,可以開始使用 Zeppelin notebook 了。

開啟 http://$host:10008 你將看到像截圖一樣的介面:

點選 Create new note 來開啟一個新的 notebook。

在Notebook中編寫Scala

在任一 Ambari 管理的叢集上,ambari-agent 日誌都寫在 /var/log/ambari-agent/ambari-agent.log

我們將在 Zeppelin 上寫一點 Scala 程式碼來視覺化這些日誌,從中抽取資訊。

為了能看到這些日誌的內容並隨後處理他們,我們將從這個日誌檔案建立一個 RDD。

Scala
1 valambariLogs=sc.textFile("file:///var/log/ambari-agent/ambari-agent.log")

上面的程式碼將文字檔案的內容連結到一個由變數 ambariLogs 代表的 RDD 上。

為了能更好地看到日誌的內容,使用以下程式碼 dump 幾行文字到直譯器終端看看:

Scala
1 ambariLogs.take(10).mkString("n")

這行程式碼的輸出會像這樣:

使用Spark SQL

為了進一步分析這些日誌,最好將他們與一個 schema 連結起來,並使用 Spark 強大的 SQL 查詢功能。

Spark SQL 有一個強大的功能,就是它能夠以程式設計方式把 schema 連線到一個 Data Source,並對映到 Scala 條件類。Scala 條件類能夠以型別安全的方式操縱和查詢。

對於當前的分析,ambari 日誌的每一行可以認為是由以空格隔開的四個基本元件組成的。

  • 日誌級別(INFO、DEBUG、WARN等)
  • 日期(YYYY-mm-dd)
  • 時間(HH:mm:ss,SSS格式)
  • 檔名

建立一個條件類來連結這個 schema:

Scala
123456789 // sc is an existing SparkContext.valsqlContext=neworg.apache.spark.sql.SQLContext(sc)// this is used to implicitly convert an RDD to a DataFrame.importsqlContext.implicits._// Define the schema using a case class.importjava.sql.DatecaseclassLog(level:String,date:Date,fileName:String)

注意:為了方便,這裡將日期和時間合併到一個 Date 物件裡。

Scala
1234567891011 importjava.text.SimpleDateFormatvaldf=newSimpleDateFormat("yyyy-mm-dd HH:mm:ss,SSS")valambari=ambariLogs.map{line=>vals=line.split(" ")vallogLevel=s(0)valdateTime=df.parse(s(1)+" "+s(2))valfileName=s(3).split(":")(0)Log(logLevel,newDate(dateTime.getTime()),fileName)}.toDF()ambari.registerTempTable("ambari")

初始化一個 dataframe 之後,我們可以使用 SQL 在上面做查詢。Dataframes 是用來接收針對他們而寫的 SQL 查詢,並根據需要將查詢優化成一系列的 Spark 任務。

比如,假設我們想要得到不同日誌級別的事件數量,查詢寫成 SQL 會是這樣的形式:

MySQL
1 SELECTlevel,COUNT(1)fromambariGROUPBYlevel

但是使用Scala Data Frame API 可以寫成:

Scala
1 ambari.groupBy("level").count()

這時,我們可以使用非常接近原生 SQL 的查詢:

Scala
1 sqlContext.sql("SELECT level, COUNT(1) from ambari group by level")

這個查詢返回的資料結構是根 DataFrame API 返回的是相同的。返回的資料結構本身是一個 data frame。

這個時候並沒有任何操作被執行:data frames 上的操作都對映到 RDD 相應的操作(在這個例子中):

Scala
1 RDD.groupBy(...).aggregateByKey(...))

我們可以通過使用 collect() 強制執行這個任務,將結果傳送到 driver 的記憶體中。

使用 Zeppelin 做視覺化

Zeppelin Notebook 有一個強大的功能,那就是你可以在同一個框架裡看到上一個片段的結果集。Zeppelin 的顯示系統接通了標準輸出。

任何以 %table、%img、%html 等直譯器命令為開頭,通過println輸出到標準輸出的字串,都可以被 Zeppelin 的顯示系統所解析。

在我們的例子中,我們想要將每種日誌級別的日誌個數輸出成一個表,所以使用以下程式碼:

Scala
123456 importorg.apache.spark.sql.Rowvalresult=sqlContext.sql("SELECT level, COUNT(1) from ambari group by level").map{caseRow(level:String,count:Long)=>{level+"t"+count}}.collect()

這段程式碼將 groupby 的輸出整合成表直譯器可以渲染的格式。

%table 要求每行資料都以 n(換行符)分隔,每一列均以 t(製表符)分開,如下所示:

Scala
1 println("%table Log LeveltCountn"+result.mkString("n"))

通過這行程式碼打印出來的結果會是:

總結

資料科學家們使用許多種工具進行工作。Zeppelin 為他們提供了一個新工具來構建出更好的問題。在下一篇文章中,我們將深入討論一個具體的資料科學問題,並展示如何使用 Zeppelin、Spark SQL 和 MLLib 來建立一個使用 HDP、Spark 和 Zeppelin 的資料科學專案。