1. 程式人生 > >強者聯盟——Python語言結合Spark框架

強者聯盟——Python語言結合Spark框架

文本 reduce tom 一個數 rst oca b2c war ati

引言:Spark由AMPLab實驗室開發,其本質是基於內存的高速叠代框架,“叠代”是機器學習最大的特點,因此很適合做機器學習。

得益於在數據科學中強大的表現,Python語言的粉絲遍布天下,現在又遇上強大的分布式內存計算框架Spark,兩個領域的強者走到一起,自然能碰出更加強大的火花(Spark能夠翻譯為火花)。因此本文主要講述了PySpark。


本文選自《全棧數據之門》。

全棧框架

  Spark由AMPLab實驗室開發,其本質是基於內存的高速叠代框架,“叠代”是機器學習最大的特點。因此很適合做機器學習。
  框架由Scala語言開發。原生提供4種API,Scala、Java、Python以及近期版本號開始支持的R。Python不是Spark的“親兒子”。在支持上要略差一些,但基本上經常使用的接口都支持。

得益於在數據科學中強大的表現,Python語言的粉絲遍布天下。現在又遇上強大的分布式內存計算框架Spark,兩個領域的強者走到一起。自然能碰出更加強大的火花(Spark能夠翻譯為火花),因此PySpark是本節的主角。
  在Hadoop發行版中,CDH5和HDP2都已經集成了Spark,僅僅是集成的版本號比官方的版本號要略低一些。

當前最新的HDP2.4已經集成了1.6.1(官方最新為2.0),能夠看出。Hortonworks的更新速度很快。緊跟上遊的步伐。
  除Hadoop的Map-Reduce計算框架之外,Spark能異軍突起,並且慢慢地建立自己的全棧生態。那還真得了解下Spark究竟提供了哪些全棧的技術。Spark眼下主要提供了下面6大功能。

  1. Spark Core: RDD及其算子。
  2. Spark-SQL: DataFrame與SQL。
  3. Spark ML(MLlib): 機器學習框架。
  4. Spark Streaming: 實時計算框架。

  5. Spark GraphX: 圖計算框架。
  6. PySpark(SparkR): Spark之上的Python與R框架。

從RDD的離線計算到Streaming的實時計算。從DataFrame及SQL的支持,到MLlib機器學習框架;從GraphX的圖計算到對統計學家最愛的R的支持,能夠看出Spark在構建自己的全棧數據生態。從當前學術界與工業界的反饋來看,Spark也已經做到了。

環境搭建

  是騾子是馬。拉出來遛一遛就知道了。

要嘗試使用Spark是很easy的事情,一臺機器就能夠做測試和開發了。
  訪問站點http://spark.apache.org/downloads.html,下載預編譯好的版本號,解壓即能夠使用。

選擇最新的穩定版本號。註意選擇“Pre-built”開頭的版本號。比方當前最新版本號是1.6.1,通常下載spark-1.6.1-bin-hadoop2.6.tgz文件。文件名稱中帶“-bin-”即是預編譯好的版本號,不須要另外安裝Scala環境。也不須要編譯。直接解壓到某個文件夾就可以。


  如果解壓到文件夾/opt/spark。那麽在$HOME文件夾的.bashrc文件裏加入一個PATH:

技術分享
  記得source一下.bashrc文件,讓環境變量生效:
技術分享
  接著運行命令pyspark或者spark-shell,假設看到了Spark那帥帥的文本Logo和對應的命令行提示符>>>。則說明成功進入交互式界面,即配置成功。


  pyspark與spark-shell都能支持交互式測試。此時便能夠進行測試了。相比於Hadoop來說,基本上是零配置即能夠開始測試。
  spark-shell測試:
技術分享
  pyspark測試:
技術分享

分布式部署

  上面的環境測試成功,證明Spark的開發與測試環境已經配置好了。可是說好的分布式呢?我把別人的庫都拖下來了,就是想嘗試Spark的分布式環境,你就給我看這個啊?
  上面說的是單機的環境部署,可用於開發與測試,僅僅是Spark支持的部署方式的當中一種。這樣的是local方式,優點是用一臺筆記本電腦就能夠執行程序並在上面進行開發。盡管是單機,但有一個很實用的特性。那就是能夠實現多進程。比方8核的機器。僅僅須要執行代碼的時候指定–master local[],就能夠用8個進程的方式執行程序。

代表使用所有CPU核心,也能夠使用如local[4],意為僅僅使用4個核心。
  單機的local模式寫的代碼,僅僅須要做少量的改動就可以執行在分布式環境中。

Spark的分布式部署支持好幾種方式,例如以下所看到的。

  Standalone:本身自帶的集群(方便測試和Spark本身框架的推廣)。
  Mesos:一個新的資源管理框架。
  YARN:Hadoop上新生的資源與計算管理框架,能夠理解為Hadoop的操作系統,
  能夠支持各種不同的計算框架。


  EC2:亞馬遜的機器環境的部署。


  從難易程度上來說。Standalone分布式最簡單。直接把解壓好的包拷貝到各臺機器上去,配置好master文件和slave文件,指示哪臺機器做master。哪些機器做salve。然後在master機器上。通過自帶的腳本啟動集群就可以。
  從使用率上來說。應該是YARN被使用得最多,由於一般是直接使用發行版本號中的Spark集成套件。CDH和HDP中都已經把Spark和YARN集成了,不用特別關註。


  分布式的優勢在於多CPU與更大的內存,從CPU的角度再來看Spark的三種方式。

  • 本機單CPU:“local”。數據文件在本機。
  • 本機多CPU:“local[4]”,數據文件在本機。
  • Standalone集群多CPU:“spark://master-ip:7077”,須要每臺機器都能訪問數據文件。
      
      YARN集群多CPU:使用“yarn-client”提交。須要每臺機器都能訪問到數據文件。
      交互式環境的部署也與上面的部署有關系,直接使用spark-shell或者pyspark是local的方式啟動,假設須要啟動單機多核或者集群模式,須要指定–master參數。例如以下所看到的。

技術分享
  假設使用pyspark,而且習慣了IPython的交互式風格,還能夠加上環境變量來啟動IPython的交互式,或者使用IPython提供的Notebook:

技術分享
  IPython風格例如以下所看到的:

技術分享

演示樣例分析

  環境部署是新手最頭痛的問題,前面環境已經部署好了,接下來才是正題。由於Scala較Python復雜得多,因此先學習使用PySpark來敲代碼。
  Spark有兩個最基礎的概念,sc與RDD。

sc是SparkContext的縮寫,顧名思義,就是Spark上下文語境,sc連接到集群並做對應的參數配置。後面全部的操作都在這個上下文語境中進行,是一切Spark的基礎。在啟動交互式界面的時候,註意有一句提示:

SparkContext available as sc, HiveContext available as sqlContext.

  
  意思是。sc這個變量代表了SparkContext上下文,能夠直接使用,在啟動交互式的時候,已經初始化好了。
假設是非交互式環境。須要在自己的代碼中進行初始化:

技術分享
  RDD是Resilient Distributed Datasets(彈性分布式數據集)的縮寫,是Spark中最基本的數據處理對象。生成RDD的方式有非常多種,當中最基本的一種是通過讀取文件來生成:

技術分享
  讀取joy.txt文件後,就是一個RDD,此時的RDD的內容就是一個字符串。包括了文件的所有內容。
  還記得前面使用Python來編寫的WordCount代碼嗎?通過Hadoop的Streaming接口提到Map-Reduce計算框架上運行。那段代碼可不太好理解,如今簡單的版本號來了。
  WordCount樣例的代碼例如以下所看到的:
技術分享
  在上面的代碼中,我個人喜歡用括號的閉合來進行分行,而不是在行尾加上續行符。
  PySpark中大量使用了匿名函數lambda。由於通常都是很easy的處理。核心代碼解讀例如以下。

  1. flatMap:對lines數據中的每行先選擇map(映射)操作,即以空格切割成一系列單詞形成一個列表。

    然後運行flat(展開)操作。將多行的列表展開,形成一個大列表。此時的數據結構為:[‘one’,’two’,’three’,…]。

  2. map:對列表中的每一個元素生成一個key-value對,當中value為1。

    此時的數據結構為:[(‘one’, 1), (‘two’,1), (‘three’,1),…],當中的’one’、’two’、’three’這種key,可能會出現反復。

  3. reduceByKey:將上面列表中的元素按key同樣的值進行累加,其數據結構為:[(‘one’, 3), (‘two’, 8),
    (‘three’, 1), …],當中’one’, ‘two’,’three’這種key不會出現反復。

最後使用了wc.collect()函數,它告訴Spark須要取出全部wc中的數據,將取出的結果當成一個包括元組的列表來解析。
相比於用Python手動實現的版本號,Spark實現的方式不僅簡單,並且非常優雅。

兩類算子

  Spark的基礎上下文語境為sc,基礎的數據集為RDD,剩下的就是對RDD所做的操作了。
  對RDD所做的操作有transform與action。也稱為RDD的兩個基本算子。


  transform是轉換、變形的意思。即將RDD通過某種形式進行轉換,得到另外一個RDD,比方對列表中的數據使用map轉換。變成另外一個列表。
  當然,Spark能在Hadoop的Map-Reduce模型中脫穎而出的一個重要因素就是其強大的算子。

Spark並沒有強制將其限定為Map和Reduce模型。而是提供了更加強大的變換能力,使得其代碼簡潔而優雅。
  以下列出了一些經常使用的transform。

  • map(): 映射,類似於Python的map函數。

  • filter(): 過濾,類似於Python的filter函數。
  • reduceByKey(): 按key進行合並。
  • groupByKey(): 按key進行聚合。

RDD一個很重要的特性是惰性(Lazy)原則。

在一個RDD上執行一個transform後。並不馬上執行,而是遇到action的時候,才去一層層構建執行的DAG圖。DAG圖也是Spark之所以快的原因。

  • first(): 返回RDD裏面的第一個值。

  • take(n): 從RDD裏面取出前n個值。

  • collect(): 返回所有的RDD元素。

  • sum(): 求和。
  • count(): 求個數。

回到前面的WordCount樣例,程序僅僅有在遇到wc.collect()這個須要取所有數據的action時才運行前面RDD的各種transform,通過構建運行依賴的DAG圖,也保證了運行效率。

map與reduce

  初始的數據為一個列表。列表裏面的每個元素為一個元組,元組包括三個元素。分別代表id、name、age字段。

RDD正是對這種基礎且又復雜的數據結構進行處理。因此能夠使用pprint來打印結果,方便更好地理解數據結構。其代碼例如以下:

技術分享
  parallelize這個算子將一個Python的數據結構序列化成一個RDD,其接受一個列表參數,還支持在序列化的時候將數據分成幾個分區(partition)。分區是Spark執行時的最小粒度結構,多個分區會在集群中進行分布式並行計算。


  使用Python的type方法打印數據類型,可知base為一個RDD。在此RDD之上,使用了一個map算子,將age添加3歲,其它值保持不變。

map是一個高階函數。其接受一個函數作為參數。將函數應用於每個元素之上,返回應用函數用後的新元素。此處使用了匿名函數lambda,其本身接受一個參數v。將age字段v[2]添加3。其它字段原樣返回。從結果來看。返回一個PipelineRDD,其繼承自RDD,能夠簡單理解成是一個新的RDD結構。
  要打印RDD的結構,必須用一個action算子來觸發一個作業。此處使用了collect來獲取其所有的數據。

  接下來的操作,先使用map取出數據中的age字段v[2],接著使用一個reduce算子來計算全部的年齡之和。

reduce的參數依舊為一個函數,此函數必須接受兩個參數,分別去叠代RDD中的元素,從而聚合出結果。效果與Python中的reduce同樣,最後僅僅返回一個元素。此處使用x+y計算其age之和,因此返回為一個數值,運行結果例如以下圖所看到的。

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvYnJvYWR2aWV3MjAwNg==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast" target="_blank" style="color:rgb(0,101,126); outline:0px; background:0px 0px">技術分享

AMPLab的野心

  AMPLab除了最著名的Spark外,他們還希望基於內存構建一套完整的數據分析生態系統,能夠參考https://amplab.cs.berkeley.edu/software/上的介紹。


  他們的目的就是BDAS(Berkeley Data Analytics Stack),基於內存的全棧大數據分析。前面介紹過的Mesos是集群資源管理器。還有Tachyon,是基於內存的分布式文件系統,類似於Hadoop的HDFS文件系統,而Spark Streaming則類似於Storm實時計算。
  強大的全棧式Spark。撐起了大數據的半壁江山。

  本文選自《全棧數據之門》,點此鏈接可在博文視點官網查看此書。
                    

技術分享

  想及時獲得很多其它精彩文章,可在微信中搜索“博文視點”或者掃描下方二維碼並關註。
                         技術分享

強者聯盟——Python語言結合Spark框架