spark 指令碼示例
一、封裝spark的處理類
SparkSession:
其為使用者提供了一個統一的切入點來使用Spark的各項功能,並且允許使用者通過它呼叫DataFrame和Dataset相關API來編寫Spark程式。
SparkSession: SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。
(如果需要支援Hive(HiveContext):enableHiveSupport() )
##建立一個SparkSession
spark=SparkSession.builder\ .master('spark://master:7077')\ .appName("just-test")\ .config("spark.executor.memory", '4g') \ .getOrCreate()
關於配置SparkConf:
from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession def create_sc(): sc_conf = SparkConf() sc_conf.setMaster('spark://master:7077') sc_conf.setAppName('my-app') sc_conf.set('spark.executor.memory', '2g') #executor memory是每個節點上佔用的記憶體。每一個節點可使用記憶體 sc_conf.set("spark.executor.cores", '4') #spark.executor.cores:顧名思義這個引數是用來指定executor的cpu核心個數,分配更多的核心意味著executor併發能力越強,能夠同時執行更多的task sc_conf.set('spark.cores.max', 40) #spark.cores.max:為一個application分配的最大cpu核心數,如果沒有設定這個值預設為spark.deploy.defaultCores sc_conf.set('spark.logConf', True) #當SparkContext啟動時,將有效的SparkConf記錄為INFO。 print(sc_conf.getAll()) sc = SparkContext(conf=sc_conf) return sc
框架圖:
addFile(path, recursive=False)
把檔案分發到叢集中每個worker節點,然後worker會把檔案存放在臨時目錄下,spark的driver和executor可以通過pyspark.SparkFiles.get()方法來獲取檔案的路徑,從而能夠保證driver和每個worker都能正確訪問到檔案。因此,比較適合用於檔案比較小,但是每個worker節點都需要訪問的情況,檔案比較大的情況下網路傳送的消耗時間會比較長。
path:可以是單個本地檔案,HDFS檔案,或者HTTP地址,HTTPS地址,FTP URI。要在spark job中獲取檔案,使用pyspark.SparkFiles.get(filename),通過指定檔名filename獲取檔案路徑。
>>> from pyspark import SparkFiles >>> path = os.path.join(tempdir, "test.txt") >>> sc.addFile(path) >>> res_rdd = sc.textFile(SparkFiles.get(path))
addPyFile(path)
為SparkContext上執行的所有任務增加.py或者.zip檔案依賴。path可以是本地檔案,HDFS檔案,或者HTTP地址,HTTPS地址,FTP URI。
程式示例:
from pyspark.sql import SparkSession from pyspark import HiveContext import os import datetime class sparkTask: def __init__(self, app_name="pickup_scene_order"): self.ss = SparkSession.builder.appName("hankaiming_" + app_name)\ .config("spark.dynamicAllocation.enabled", "true")\ .config("spark.dynamicAllocation.maxExecutors", 150)\ .enableHiveSupport()\ .config("spark.executor.cores", 2)\ .config("spark.executor.memory", "13g")\ .getOrCreate() self._addPyFile() print "current time: %s" % str(datetime.datetime.now()) def getSparkContext(self): return self.ss.sparkContext def getHiveContext(self): return HiveContext(self.getSparkContext()) def getSparkSession(self): return self.ss def _addPyFile(self): current_path = os.getcwd() current_file_name = os.getcwd().split("/")[-1] while current_file_name != "pickup_log_order" : current_path = os.path.abspath(os.path.join(current_path, "..")) print current_path if current_file_name == "": raise Exception("project file name error : %s" % "pickup_log_order") current_file_name = current_path.split("/")[-1] self._sendFilesUnderPath(self.getSparkContext(), current_path) return def _sendFileToSpark(self, sc, path): if path.endswith('.py') or path.endswith('-remote') or path.endswith('.ini'): sc.addPyFile(path) print 'spark add file : %s' % path.split("/", 4)[-1] return def _sendFilesUnderPath(self, sc, root): if os.path.isfile(root): self._sendFileToSpark(sc, root) return if os.path.isdir(root): path_list = os.listdir(root) for path in path_list: if path in ["pickup_recommend", "pickup_recall"]: continue path = os.path.join(root, path) self._sendFilesUnderPath(sc, path) return def stop(self): print "stop time: %s" % str(datetime.datetime.now()) self.getSparkSession().stop()