Spark Pipe使用方法(外部程式呼叫方法)
寫在前面:
1、我們使用的是Hadoop2.2.0,Spark 1.0。
2、這裡使用的樣例是經典的求pai程式來演示這個開發過程。
3、我們暫時使用java程式來開發,按照需要後面改用scala來開發。
4、我們使用的IDE是IntelliJ IDEA,採用maven來做專案管理。
一、專案建立
1.1 執行IDE,通過下面命令 ~/idea-IC-133.696/idea.sh
1.2 建立一個maven專案。
1.2 新建的專案新增庫檔案。
1) scala中lib的安裝路徑,如我們的路徑在/usr/share/scala/lib
2) spark的lib檔案,比如我們的檔案在~/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop2.2.0.jar
我們需要在IDE中新增這兩個庫檔案。
1)按ctrl+alt+shift+s快捷鍵,選中global libraries,出現如下視窗,把上面兩個目錄新增進去,最後如下。
二、程式碼編寫
1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership.5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 import org.apache.spark.SparkConf; 18 import org.apache.spark.api.java.JavaRDD; 19 import org.apache.spark.api.java.JavaSparkContext; 20 import org.apache.spark.api.java.function.Function; 21 import org.apache.spark.api.java.function.Function2; 22 import org.apache.spark.util.FloatVector; 23 24 import java.util.ArrayList; 25 import java.util.List; 26 27 public final class GPUPi { 28 29 30 public static void main(String[] args) throws Exception { 31 SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi"); 32 JavaSparkContext jsc = new JavaSparkContext(sparkConf); 33 int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2; 34 int n = slices; 35 int t = 100000000; 36 List<Integer> l = new ArrayList<Integer>(n); 37 for (int i = 0; i < n; i++) { 38 l.add(t); 39 } 40 String s = "./pi " + new Integer(n / slices).toString(); 41 int count = jsc.parallelize(l, slices) 42 .pipe(s) 43 .map( 44 new Function<String, Integer>() { 45 @Override 46 public Integer call(String line) { 47 return Integer.parseInt(line); 48 } 49 } 50 ).reduce(new Function2<Integer, Integer, Integer>() { 51 @Override 52 public Integer call(Integer integer, Integer integer2) { 53 return integer + integer2; 54 } 55 }); 56 System.out.println("Pi is roughly " + 4.0 * count / n / t); 57 } 58 }
這段程式碼通過RDDPipe,呼叫一個外部程式來計算,最後通過reduce+操作,獲得幾個外部程式的計算結果,這樣一個介面,可以使得外部程式完全獨立,和spark不會有太大的關係,甚至可以在外部程式中使用cuda等來加速。
這裡需要說明一下pipe介面,這是因為在spark1.0中,我們依然沒有在example樣例中看到演示這個介面的任何程式碼。pipe接受一個cmd指令,然後在外部執行它,如“./pi"就是執行一個叫pi的可執行檔案,所不同的是,這個外部程式所有的輸入流都是由spark中的RDD傳送給他的,同時,外部程式的輸出,會形成一個新的RDD。
我們對應的c語言程式碼如下:
#include <stdio.h> #include <stdlib.h> #include <time.h> int main(int argc, char *argv[]) { int num = 0, count = 0,t; double z = RAND_MAX; z = z * z; t = atoi(argv[1]); for(int i = 0; i < t; i++){ scanf("%d",&num); for(int j = 0; j < num; j++){ double x = rand(); double y = rand(); if(x * x + y * y <= z){ count++; } } } printf("%d\n",count); return 0; }
三、編譯
由於專案已經採用maven來管理了,這裡也就使用maven來打包了。命令是mvn package,這樣就會在target目錄下生成gpu-1.0-SNAPSHOT.jar檔案。
四、作業提交。
mvn package spark-submit \ --class GPUPi \ --master yarn-cluster \ --executor-memory 2G \ --num-executors 4 \ --files /home/yarn/cuda-workspace/pi/Release/pi \ target/gpu-1.0-SNAPSHOT.jar 4
--files把可執行檔案pi傳送到每一臺機器上面。
--master指定執行的模式,一般都是選yarn-cluster模式,讓spark跑在yarn上面,其他可以參考文件說明。
轉自:http://www.cnblogs.com/zhxfl/p/3792949.html?utm_source=tuicool