1. 程式人生 > 實用技巧 >spark 指令碼示例

spark 指令碼示例

一、封裝spark的處理類

SparkSession:

其為使用者提供了一個統一的切入點來使用Spark的各項功能,並且允許使用者通過它呼叫DataFrame和Dataset相關API來編寫Spark程式。

SparkSession: SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。

(如果需要支援Hive(HiveContext):enableHiveSupport() )

##建立一個SparkSession

spark=SparkSession.builder\
    .master('spark://master:7077')\
    .appName("just-test")\
    .config("spark.executor.memory", '4g') \
    .getOrCreate()

關於配置SparkConf:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession


def create_sc():
    sc_conf = SparkConf()
    sc_conf.setMaster(
'spark://master:7077') sc_conf.setAppName('my-app') sc_conf.set('spark.executor.memory', '2g') #executor memory是每個節點上佔用的記憶體。每一個節點可使用記憶體 sc_conf.set("spark.executor.cores", '4') #spark.executor.cores:顧名思義這個引數是用來指定executor的cpu核心個數,分配更多的核心意味著executor併發能力越強,能夠同時執行更多的task sc_conf.set('spark.cores.max
', 40) #spark.cores.max:為一個application分配的最大cpu核心數,如果沒有設定這個值預設為spark.deploy.defaultCores sc_conf.set('spark.logConf', True) #當SparkContext啟動時,將有效的SparkConf記錄為INFO。 print(sc_conf.getAll()) sc = SparkContext(conf=sc_conf) return sc

框架圖:

addFile(path, recursive=False)

把檔案分發到叢集中每個worker節點,然後worker會把檔案存放在臨時目錄下,spark的driver和executor可以通過pyspark.SparkFiles.get()方法來獲取檔案的路徑,從而能夠保證driver和每個worker都能正確訪問到檔案。因此,比較適合用於檔案比較小,但是每個worker節點都需要訪問的情況,檔案比較大的情況下網路傳送的消耗時間會比較長。

path:可以是單個本地檔案,HDFS檔案,或者HTTP地址,HTTPS地址,FTP URI。要在spark job中獲取檔案,使用pyspark.SparkFiles.get(filename),通過指定檔名filename獲取檔案路徑。

>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "test.txt")
>>> sc.addFile(path)
>>> res_rdd = sc.textFile(SparkFiles.get(path))

addPyFile(path)

為SparkContext上執行的所有任務增加.py或者.zip檔案依賴。path可以是本地檔案,HDFS檔案,或者HTTP地址,HTTPS地址,FTP URI。

程式示例:

from pyspark.sql import SparkSession
from pyspark import HiveContext
import os
import datetime


class sparkTask:
    def __init__(self, app_name="pickup_scene_order"):
        self.ss = SparkSession.builder.appName("hankaiming_" + app_name)\
            .config("spark.dynamicAllocation.enabled", "true")\
            .config("spark.dynamicAllocation.maxExecutors", 150)\
            .enableHiveSupport()\
            .config("spark.executor.cores", 2)\
            .config("spark.executor.memory", "13g")\
            .getOrCreate()
        self._addPyFile()
        print "current time: %s" % str(datetime.datetime.now())

    def getSparkContext(self):
        return self.ss.sparkContext

    def getHiveContext(self):
        return HiveContext(self.getSparkContext())

    def getSparkSession(self):
        return self.ss

    def _addPyFile(self):
        current_path = os.getcwd()
        current_file_name = os.getcwd().split("/")[-1]
        while current_file_name != "pickup_log_order" :
            current_path = os.path.abspath(os.path.join(current_path, ".."))
            print current_path
            if current_file_name == "":
                raise Exception("project file name error : %s" % "pickup_log_order")
            current_file_name = current_path.split("/")[-1]
        self._sendFilesUnderPath(self.getSparkContext(), current_path)
        return

    def _sendFileToSpark(self, sc, path):
        if path.endswith('.py') or path.endswith('-remote') or path.endswith('.ini'):
            sc.addPyFile(path)
            print 'spark add file : %s' % path.split("/", 4)[-1]
        return

    def _sendFilesUnderPath(self, sc, root):
        if os.path.isfile(root):
            self._sendFileToSpark(sc, root)
            return
        if os.path.isdir(root):
            path_list = os.listdir(root)
            for path in path_list:
                if path in ["pickup_recommend", "pickup_recall"]:
                    continue
                path = os.path.join(root, path)
                self._sendFilesUnderPath(sc, path)
        return

    def stop(self):
        print "stop time: %s" % str(datetime.datetime.now())
        self.getSparkSession().stop()