
Installing and Using Spark (Getting Started)

1. Install a Java environment on Linux (install the JDK yourself)

2. Install Scala 2.9.3

$ tar -zxf scala-2.9.3.tgz
$ sudo mv scala-2.9.3 /usr/lib
$ sudo vim /etc/profile
# add the following lines at the end
export SCALA_HOME=/usr/lib/scala-2.9.3
export PATH=$PATH:$SCALA_HOME/bin
# save and exit vim
# make the profile take effect immediately
$ source /etc/profile
# test
$ scala -version

3. Install Spark

Download the latest version of Spark from the official site; as of this writing that is 1.5.1. Download page: http://spark.apache.org/downloads.html

Make sure to download a pre-built package: choose "Pre-built for Hadoop 2.6 and later". The downloaded file is spark-1.5.1-bin-hadoop2.6.tgz.

Unpack it:

$ tar -zxf spark-1.5.1-bin-hadoop2.6.tgz

Set the SPARK_EXAMPLES_JAR environment variable

$ vim ~/.bash_profile
# add the following lines at the end
export SPARK_EXAMPLES_JAR=$HOME/spark-0.7.2/examples/target/scala-2.9.3/spark-examples_2.9.3-0.7.2.jar
# save and exit vim
# make the profile take effect immediately
$ source ~/.bash_profile

This step is actually the most critical one, and unfortunately neither the official documentation nor the blog posts I found mention it. I only stumbled across it in two threads, "Running SparkPi" and "Null pointer exception when running ./run spark.examples.SparkPi local"; before adding it I simply could not get SparkPi to run. (Note that the path above follows the old Spark 0.7.2 layout; point SPARK_EXAMPLES_JAR at the examples jar shipped with the version you actually installed.)
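
As a sanity check for this step, note that SparkPi simply estimates pi by random sampling. A rough Scala sketch of the same computation, which can be pasted into spark-shell (covered below; sc is the SparkContext the shell provides), looks like this:

// Monte Carlo estimate of pi, in the spirit of the bundled SparkPi example
val n = 100000
val inside = sc.parallelize(1 to n).map { _ =>
  val x = math.random * 2 - 1
  val y = math.random * 2 - 1
  if (x * x + y * y <= 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * inside / n)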

(Optional) Set the SPARK_HOME environment variable and add $SPARK_HOME/bin to PATH

$ vim ~/.bash_profile
# add the following lines at the end
export SPARK_HOME=$HOME/spark-0.7.2
export PATH=$PATH:$SPARK_HOME/bin
# save and exit vim
# make the profile take effect immediately
$ source ~/.bash_profile

4. Configure Spark

Set up the Spark environment variables

cd $SPARK_HOME/conf
cp spark-env.sh.template spark-env.sh

vi spark-env.sh and add the following:

export JAVA_HOME=/usr/local/java-1.7.0
export HADOOP_HOME=/opt/hadoop-2.3.0-cdh5.0.0
export HADOOP_CONF_DIR=/etc/hadoop/conf
export SCALA_HOME=/usr/local/scala-2.11.4
export SPARK_HOME=/home/lxw1234/spark-1.3.1-bin-hadoop2.3
export SPARK_MASTER_IP=127.0.0.1
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8099
 
export SPARK_WORKER_CORES=3              # number of CPU cores each Worker uses
export SPARK_WORKER_INSTANCES=1          # number of Worker instances started per slave
export SPARK_WORKER_MEMORY=10G           # amount of memory each Worker uses
export SPARK_WORKER_WEBUI_PORT=8081      # Worker WebUI port
export SPARK_EXECUTOR_CORES=1            # number of cores each Executor uses
export SPARK_EXECUTOR_MEMORY=1G          # amount of memory each Executor uses

export SPARK_CLASSPATH=/opt/hadoop-lzo/current/hadoop-lzo.jar    # needed here because LZO compression is used
export SPARK_CLASSPATH=$SPARK_CLASSPATH:$CLASSPATH
export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$HADOOP_HOME/lib/native

Configure the slaves:

cp slaves.template slaves
vi slaves and add the following:
localhost

5. Configure passwordless SSH login

Since the Master and the Slave are on the same machine here, configure passwordless SSH from this machine to itself. If there are additional slaves, passwordless SSH from the Master to each Slave must be configured as well.

cd ~/
ssh-keygen    (press Enter at every prompt)
cd .ssh/
cat id_rsa.pub >> authorized_keys
chmod 600 authorized_keys

6. Start the Spark Master

cd $SPARK_HOME/sbin/
./start-master.sh

The startup log is under the $SPARK_HOME/logs/ directory. A normal startup looks like this:

15/06/05 14:54:16 INFO server.AbstractConnector: Started SelectChannelConnector@localhost:6066
15/06/05 14:54:16 INFO util.Utils: Successfully started service on port 6066.
15/06/05 14:54:16 INFO rest.StandaloneRestServer: Started REST server for submitting applications on port 6066
15/06/05 14:54:16 INFO master.Master: Starting Spark master at spark://127.0.0.1:7077
15/06/05 14:54:16 INFO master.Master: Running Spark version 1.3.1
15/06/05 14:54:16 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/06/05 14:54:16 INFO server.AbstractConnector: Started [email protected]:8099
15/06/05 14:54:16 INFO util.Utils: Successfully started service 'MasterUI' on port 8099.
15/06/05 14:54:16 INFO ui.MasterWebUI: Started MasterWebUI at http://127.1.1.1:8099
15/06/05 14:54:16 INFO master.Master: I have been elected leader! New state: ALIVE

7. Start the Spark Slaves

cd $SPARK_HOME/sbin/
./start-slaves.sh


This will SSH to each host listed in the $SPARK_HOME/conf/slaves file and start a Spark Worker on it.

After a successful start, the WebUI shows that the Worker has registered (figure: Master WebUI with the registered Worker).

Open http://192.168.1.84:8080/ in a browser (use your Master's IP address; the port is whatever SPARK_MASTER_WEBUI_PORT was set to, 8099 in the configuration above).

8. A small example (the 50 most frequent words in a file)

Run ./spark-shell directly under the bin directory:
hadoop@Master:/usr/local/spark-1.5.1-bin-hadoop2.6/bin$ ./spark-shell
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_79)
Type in expressions to have them evaluated.
Type :help for more information.
15/10/13 19:12:16 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
Spark context available as sc.
15/10/13 19:12:18 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/10/13 19:12:19 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/10/13 19:12:35 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
15/10/13 19:12:35 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
15/10/13 19:12:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/10/13 19:12:39 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/10/13 19:12:39 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
SQL context available as sqlContext.


I didn't look into what all these WARNs are about. Once inside spark-shell, enter the following, one line at a time:

var srcFile = sc.textFile("/usr/local/kern.log")

var a = srcFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

a.map(word => (word._2, word._1)).sortByKey(false).map(word => (word._2, word._1)).take(50).foreach(println)
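
The same top-50 list can be produced a bit more readably with sortBy, which avoids the double key/value swap above. A minimal sketch (same /usr/local/kern.log input; swap in any text file that exists on your machine):

// split into words, pair each word with 1, and sum the counts per word
val counts = sc.textFile("/usr/local/kern.log").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
// sort by count descending and print the 50 most frequent words
counts.sortBy(_._2, false).take(50).foreach(println)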

The results are printed to the terminal.

Job status can be viewed on port 4040: http://192.168.1.84:4040/jobs/

9. Spark Java programming (Spark and Spark Streaming)

1. Spark batch processing: count the lines containing "a" and the lines containing "b" in a file: SimpleApp.java
package org.apache.eagle.spark_streaming_kafka;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
 
public class SimpleApp {

    public static void main(String[] args) {
        String logFile = "/var/log/boot.log"; // should be some file on your system
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> logData = sc.textFile(logFile).cache();

        // count the lines that contain "a"
        long numAs = logData.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(String s) { return s.contains("a"); }
        }).count();

        // count the lines that contain "b"
        long numBs = logData.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(String s) { return s.contains("b"); }
        }).count();

        System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
    }
}
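
For a quick check without packaging a jar, the same two counts can be computed interactively in spark-shell. A minimal Scala sketch, assuming the same /var/log/boot.log path:

// count the lines containing "a" and the lines containing "b"
val logData = sc.textFile("/var/log/boot.log").cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: " + numAs + ", lines with b: " + numBs)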

2. Spark Streaming: read data from Kafka and do a word count.

package org.apache.eagle.spark_streaming_kafka;
 
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
 
import com.google.common.collect.Lists;
 
import scala.Tuple2;
 
 
/**
 * spark-streaming-kafka
 *
 */
public class JavaKafkaWordCount {

    private static final Pattern SPACE = Pattern.compile(" ");

    private JavaKafkaWordCount() {
    }

    public static void main(String[] args) {

        String zkQuorum = "10.64.255.161";      // ZooKeeper address of the Kafka cluster
        String group = "test-consumer-group";   // Kafka consumer group
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        // Create the streaming context with a 2-second batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        // Consume the "noise" topic with one receiver thread
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put("noise", 1);
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);

        // Each Kafka record is a (key, message) pair; keep only the message text
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String x) {
                return Lists.newArrayList(SPACE.split(x));
            }
        });

        // Count the occurrences of each word within the batch
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });

        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
A few notes:

1. Environment: make sure Spark is correctly installed on the local machine, following the steps above. The ZooKeeper cluster and the Kafka cluster must be up, and the Kafka topic must already exist.

2. I previously ran into "jar not found" errors at runtime (KafkaUtils), because the dependency jars were not bundled into the final jar. Add the following to pom.xml:
<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/test/java</testSourceDirectory>
    <plugins>
      <!--
        Bind the maven-assembly-plugin to the package phase so that a single
        jar containing all dependencies is built, suitable for submitting to
        the cluster.
       -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>org.apache.eagle.spark_streaming_kafka.JavaKafkaWordCount</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
</build> 
This bundles all required jars together, so the generated file is quite large.

3. How do you submit the job? Spark and Spark Streaming jobs are submitted the same way, with the $SPARK_HOME/bin/spark-submit script. From the bin directory, the Spark Streaming job is submitted like this:
./spark-submit  --master local[8] /home/zqin/workspace/spark-streaming-kafka/target/spark-streaming-kafka-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Since the main class is specified in pom.xml, --class is not needed; if it were not specified there, the entry point would have to be given with --class on the command line. The batch Spark job is submitted like this:
./spark-submit  --class org.apache.eagle.spark_streaming_kafka.SimpleApp --master local[8] /home/zqin/workspace/spark-streaming-kafka/target/spark-streaming-kafka-0.0.1-SNAPSHOT-jar-with-dependencies.jar
Here the program's main entry point has to be given explicitly.

4. When running Spark Streaming, the console fills up with log output and the results are hard to see. In Spark's conf directory, rename log4j.properties.template to log4j.properties and change log4j.rootCategory=INFO, console to log4j.rootCategory=WARN, console; this stops Spark from printing INFO-level logs to the console. To see more detail, change INFO to DEBUG instead.

10. Shut down Spark

From the Spark directory, run: sbin/stop-all.sh