Spark執行模式(一)-----Spark獨立模式
除了可以在Mesos或者YARN叢集管理器上執行Spark外,Spark還提供了獨立部署模式。你可以通過手動啟動一個master和workers,或者使用提供的指令碼來手動地啟動單獨的叢集模式。你也可以在一臺單獨的機器上啟動這些程序用來測試。
以獨立模式安裝Spark叢集
你只需要將編譯好的Spark版本拷貝到叢集中的每臺節點上。
手動啟動叢集
你可以通過執行以下指令碼單獨模式的master
./sbin/start-master.sh
啟動之後,master將會列印spark://HOST:PORT
。你可以通過它連線workers,或者向SparkContext傳遞"master"引數。你也可以通過master的WEB UI來找到它,webUI的預設埠是8080.下面是webUI中的顯示:
類似地,你可以使用下面的指令碼啟動一個或多個連線到master的worker
./sbin/start-slave.sh <master-spark-URL>
我分別在兩臺機器上啟動worker,並指定master的URL.[email protected]:/usr/local/spark/spark-1.4.1-bin-hadoop2.6/sbin# ./start-slave.sh 192.168.181.128:7077
啟動了worker之後,在master的webUI上就可以看到新的node列表了。
下面是一些可以傳遞給啟動指令碼的引數和含義:
引數 | 含義 |
---|---|
-h HOST --host HOST |
監聽主機名 |
-i HOST , --ip HOST |
監聽主機名 (已過時, 使用 -h或者--host) |
-p PORT , --port PORT |
服務監聽埠 (master預設: 7077, worker是隨機的) |
--webui-port PORT |
web UI埠 (master預設: 8080 ; worker預設 8081) |
-c CORES , --cores CORES |
允許Spark應用程式使用的cpu核心數 (預設:所有cpu核心); 只能對worker設定 |
-m MEM , --memory MEM |
允許Spark應用程式使用的記憶體數量。格式如:1000M or 2G (預設是 1 GB);只能對worker設定 |
-d DIR , --work-dir DIR |
為job輸出日誌劃分的目錄 (預設為: SPARK_HOME/work); 只能對worker設定 |
--properties-file FILE |
Spark配置檔案載入路徑 (預設檔案為: conf/spark-defaults.conf) |
叢集啟動指令碼
要想通過指令碼啟動Spark的獨立模式,你需要在Spark目錄下建立一個conf/slaves檔案。裡面包含所有你希望作為worker啟動的機器的hostname,每臺機器一行。如果這個檔案不存在,啟動指令碼預設只啟動一個機器,即localhost,這樣僅僅用來測試。
注意:master機器通過ssh訪問每臺worker。通過ssh訪問worker是並行進行的,需要設定無密碼訪問(使用私鑰)。如果你不設定無密碼訪問,你可以設定SPARK_SSH_FOREGROUND環境變數,然後一個一個地為worker輸入密碼。
一旦你設定好了這些檔案,你就可以通過指令碼啟動或停止你的叢集,這些指令碼基於hadoop部署指令碼。在SPARK_HOME/bin目錄下:
sbin/start-master.sh
- 在本機上啟動一個master例項。sbin/start-slaves.sh
- 啟動在conf/slaves
檔案中指定的第個worker例項。sbin/start-slave.sh
- 在本機上啟動一個worker例項。sbin/start-all.sh
- 啟動所有的master和workers。sbin/stop-master.sh
- 通過bin/start-master.sh
指令碼停止master。sbin/stop-slaves.sh
-停止conf/slaves
檔案中指定的所有workers。sbin/stop-all.sh
- 停止所有的master和workers。
你還可以通過在conf/spark-env.sh定義環境變數來進一步配置你的叢集。通過conf/spark-env.template模版來建立這個檔案,然後將它拷貝到Spark機器上使其生效。
下面是一些可以設定的配置:
Environment Variable | Meaning |
---|---|
SPARK_MASTER_IP |
Bind the master to a specific IP address, for example a public one. |
SPARK_MASTER_PORT |
Start the master on a different port (default: 7077). |
SPARK_MASTER_WEBUI_PORT |
Port for the master web UI (default: 8080). |
SPARK_MASTER_OPTS |
Configuration properties that apply only to the master in the form "-Dx=y" (default: none). See below for a list of possible options. |
SPARK_LOCAL_DIRS |
Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks. |
SPARK_WORKER_CORES |
Total number of cores to allow Spark applications to use on the machine (default: all available cores). |
SPARK_WORKER_MEMORY |
Total amount of memory to allow Spark applications to use on the machine, e.g.1000m ,2g (default: total memory minus 1 GB); note that each application'sindividual memory is configured using itsspark.executor.memory
property. |
SPARK_WORKER_PORT |
Start the Spark worker on a specific port (default: random). |
SPARK_WORKER_WEBUI_PORT |
Port for the worker web UI (default: 8081). |
SPARK_WORKER_INSTANCES |
Number of worker instances to run on each machine (default: 1). You can make this more than 1 if you have have very large machines and would like multiple Spark worker processes. If you do set this, make sure to also setSPARK_WORKER_CORES explicitly
to limit the cores per worker, or else each worker will try to use all the cores. |
SPARK_WORKER_DIR |
Directory to run applications in, which will include both logs and scratch space (default: SPARK_HOME/work). |
SPARK_WORKER_OPTS |
Configuration properties that apply only to the worker in the form "-Dx=y" (default: none). See below for a list of possible options. |
SPARK_DAEMON_MEMORY |
Memory to allocate to the Spark master and worker daemons themselves (default: 512m). |
SPARK_DAEMON_JAVA_OPTS |
JVM options for the Spark master and worker daemons themselves in the form "-Dx=y" (default: none). |
SPARK_PUBLIC_DNS |
The public DNS name of the Spark master and workers (default: none). |
這些啟動指令碼還不支援Windows。要在Windows上啟動叢集,只能手動啟動。
上面的SPARK_MASTER_OPTS變數可以設定如下屬性:
Property Name | Default | Meaning |
---|---|---|
spark.deploy.retainedApplications |
200 | The maximum number of completed applications to display. Older applications will be dropped from the UI to maintain this limit. |
spark.deploy.retainedDrivers |
200 | The maximum number of completed drivers to display. Older drivers will be dropped from the UI to maintain this limit. |
spark.deploy.spreadOut |
true | Whether the standalone cluster manager should spread applications out across nodes or try to consolidate them onto as few nodes as possible. Spreading out is usually better for data locality in HDFS, but consolidating is more efficient for compute-intensive workloads. |
spark.deploy.defaultCores |
(infinite) | Default number of cores to give to applications in Spark's standalone mode if they don't setspark.cores.max . If not set, applications always get all available cores unless they configurespark.cores.max themselves. Set this lower
on a shared cluster to prevent users from grabbing the whole cluster by default. |
spark.worker.timeout |
60 | Number of seconds after which the standalone deploy master considers a worker lost if it receives no heartbeats. |
SPARK_WORKER_OPTS變數可以設定如下屬性:
Property Name | Default | Meaning |
---|---|---|
spark.worker.cleanup.enabled |
false | Enable periodic cleanup of worker / application directories. Note that this only affects standalone mode, as YARN works differently. Only the directories of stopped applications are cleaned up. |
spark.worker.cleanup.interval |
1800 (30 minutes) | Controls the interval, in seconds, at which the worker cleans up old application work dirs on the local machine. |
spark.worker.cleanup.appDataTtl |
7 * 24 * 3600 (7 days) | The number of seconds to retain application work directories on each worker. This is a Time To Live and should depend on the amount of available disk space you have. Application logs and jars are downloaded to each application work dir. Over time, the work dirs can quickly fill up disk space, especially if you run jobs very frequently. |
在叢集上執行應用程式:
只需給SparkContext傳遞Master的URL spark://IP:PORT,就可以在叢集上執行應用程式。
要在叢集上執行互動式shell,執行如下命令:
./bin/spark-shell --master spark://IP:PORT
你還可以通過--total-executor-cores <numCores>
選項來控制spark-shell使用的叢集cpu核心數。
啟動Spark應用程式
spark-submit指令碼提供了向叢集提交編譯好的應用程式的簡單方法。對於獨立模式,Spark目前支援2種部署模式。
client-mode:在客戶模式上,driver與提交應用程式的客戶端執行在同一程序中。
cluster-mode:在叢集模式中,driver是從叢集中的一個worker程序中啟動的,這個程序只要完成了提交作業任務就會退出,不會等待提交的應用程式的完成。
如果你通過spark-submit來提交程式,應用程式的jar包會自動地部署到所有的worker nodes。對於應用程式依懶的其它jar包,你需要通過--jars選項來指定,並將jar包以","分隔。形如:--jars jar1,jar2
. 如果你想控制應用程式的執行環境,可以檢視Spark配置。
另外,如果你的應用程式以非0狀態退出,獨立叢集模式支援重啟程式。要支援自動重啟,需要向spark-submit傳遞--supervise
標誌。
如果你想殺掉一個重複失敗的應用程式,你可以使用如下方式:
./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>
你可以找到driver ID在 Master的 web UI上 http://<master url>:8080
.
你也可以在web UI上通過點選"Kill"終止程式,如下圖所示:
資源排程
獨立模式目前只支援FIFO的排程策略。然而,為了允許多個併發的使用者,你可以控制每個應用程式使用的最大資源。預設情況下,它會獲取叢集中的所有CPU核心,這隻對你僅執行一個應用程式有意義。你可以通過在SparkContext上設定“spark.cores.max”來控制應用程式使用的cpu核心上限。
val conf = new SparkConf()
.setMaster(...)
.setAppName(...)
.set("spark.cores.max", "10")
val sc = new SparkContext(conf)
另外,你可以通過在master上配置spark.deploy.defaultCores
來改變那些沒有設定“”spark.cores.max“”上限的應用程式的預設cpu核心數。這通過在conf/spark-env.sh裡修改實現:
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"
這對於共享叢集比較有用,在共享叢集環境中,通常個人最大CPU核心數沒有限制。
監控和日誌
spark的獨立模式為管理叢集提供了基於web的使用者介面。master和每個worker都有自己的webUI來展示叢集和job資料。master的webUI預設為8080埠。可以通過配置檔案或者命令列引數來更改這個埠。
另外,每個job都有詳細的日誌輸出寫到slave節點(預設路徑為SPARK_HOME/work)。你將會看到每個job都有stderr,stdout兩個檔案,這裡麵包含了所有寫到控制檯的輸出資訊。
與hadoop一起執行
你可以將Spark與現存的hadoop叢集一起執行。你只需要在hadoop的機器上以獨立服務啟動Spark。要想通過spark訪問hadoop的資料,你只需要使用一個hdfs:// URL (典型的URL形如:hdfs://<namenode>:9000/path
。你可以在hadoop的webUI上面找到準確的URL。
或者,你也可以安裝完全獨立的spark叢集,然後通過網路訪問hdfs,這將比以本地磁碟方式訪問要慢些。但是,如果你在本地區域網內執行,影響會較小(比如你把把一些spark機器部署在一些hadoop的機架上)。
為網路安全配置埠
spark會頻繁地使用網路,一些使用防火牆配置的場合對環境有嚴格的要求。可以通過安全配置連結檢視完整的埠配置列表。
高可靠性
預設情況下,獨立模式的Spark排程叢集對失敗的任務具有彈性(spark通過把丟失的工作移動到其他worker上來提供彈性)。
然而,只有master一個節點進行排程決策,這就會有單點故障。一旦master宕機,就不能建立新的程式。
我們有兩種高可靠策略來避免這種問題:
1.通過ZooKeeper提供備用Master
概述
利用ZooKeeper提供master選舉和狀態儲存功能。你可以在你的叢集裡啟動多個Master連線到同一個Zookeeper例項。其中的一臺機器會被選舉為master,剩下的將會執行在StandBy模式。如果master宕機,新的master會被選舉出來並恢復狀態,從而繼續提供排程服務。整個恢復過程(從第一個master宕機開始)需要1-2分鐘。注意,這隻會影響恢復過程中提交的新作業,而在宕機之前提交的作業不會受影響。
為了使能這種恢復模式,你可以在spark-env裡通過以下引數設定SPARK_DAEMON_JAVA_OPTS:
系統屬性 | 含義 |
---|---|
spark.deploy.recoveryMode |
設為ZOOKEEPER來使能standby Master恢復模式 (預設: NONE). |
spark.deploy.zookeeper.url |
ZooKeeper叢集的url (e.g., 192.168.1.100:2181,192.168.1.101:2181). |
spark.deploy.zookeeper.dir |
ZooKeeper儲存恢復狀態的路徑 (預設: /spark). |
可能存在的問題:如果你在叢集中有多個Master,但是沒有正確地配置如何使用Zookeeper。所有的Master將不能發現彼此,從而認為自己是leader。這將會導致不健康的狀態(每個Master獨立的進行排程)。
細節
安裝了Zookeeper集群后,使能高可靠性功能。只需簡單地在多臺機器上以相同配置(ZooKeeper URL和目錄)啟動多個Master程序。可以在任何時間新增或刪除Master。
為了在叢集中排程新的程式或者向叢集中新增新的worker,它們都需要知道當前leader的IP地址。你可以像只有單個節點時那樣,傳遞一個Master列表。比如,你可以啟動你的SparkContext指向spark://host1:port1,host2:port2
。Spark會依次嘗試向Master註冊,如果host1失敗,向host2註冊。
向Master註冊和普通操作之間是有區別的,只有在啟動的時候,應用程式和Workers才需要向叢集中的leader註冊。一旦註冊成功,這些狀態就會儲存在系統中(儲存在Zookeeper中)。當發生故障時,新的leader會向所有先前註冊的程式和workers通知leader的變化,所以在啟動時,它們甚至不需要知道新leader的存在。
因為這個特性,新的Master可以在任何時刻建立。你僅僅需要關心新的程式和Workers在新leader產生時,能夠找到並向它註冊。一旦註冊成功,你就不需要在關心了。
2.基於本地檔案系統的單節點恢復
概述
Zookeeper是生產環境中高可靠性的最佳方案。如果你僅僅是想在master宕機後重啟它,檔案系統模式也可以滿足你。當應用程式和Workers註冊時,將足夠的狀態寫到檔案中,這樣它們就能在Master重啟時恢復。
配置
為了使能這種恢復模式,你可以在spark-evn中用如下引數配置SPARK_DAEMON_JAVA_OPTS:
系統屬性 | 含義 |
---|---|
spark.deploy.recoveryMode |
設定為 FILESYSTEM 來使能單節點恢復模式 (預設: NONE). |
spark.deploy.recoveryDirectory |
儲存恢復資料的目錄, Master需要可以訪問. |
細節
這種模式可以與程序監控管理工具比如monitor配合使用,或者僅僅是為了能夠通過重啟來手工恢復。
儘管檔案系統模式看起來比什麼恢復也不做要好。但是對於開發或實現環境來說,也許並不好。通常情況下,通過stop-daemon.sh來殺死master程序並不會清除它的恢復狀態,所以不管你什麼時候再啟動一個新的Master,它都會進入恢復模式。這可能會將啟動時間延長1分鐘,因為它要等待所有先前註冊的Workers/Clients超時。
儘管沒有官方支援,你可以mount一個NFS目錄作為恢復路徑。如果原來的Master宕機的話,你可以在一個不同的節點上啟動一個Master,這樣它就能正確地恢復原來註冊的所有Workers和應用程式(和Zookeeper的恢復模式等價)。後面提交的應用程式為了能夠註冊,必須能夠找到新的Master。