Spark Java程式案例入門
local(本地模式):常用於本地開發測試,本地還分為local單執行緒和local-cluster多執行緒
standalone(叢集模式):典型的Mater/slave模式,不過也能看出Master是有單點故障的;Spark支援ZooKeeper來實現 HA
on yarn(叢集模式): 執行在 yarn 資源管理器框架之上,由 yarn 負責資源管理,Spark 負責任務排程和計算
on mesos(叢集模式): 執行在 mesos 資源管理器框架之上,由 mesos 負責資源管理,Spark 負責任務排程和計算
on cloud(叢集模式):比如 AWS 的 EC2,使用這個模式能很方便的訪問 Amazon的 S3;Spark 支援多種分散式儲存系統:HDFS 和 S3
安裝:
1.spark standalone模式 需要hadoop 的HDFS作為持久層
jdk1.6以上
安裝hadoop叢集請參考:http://blog.csdn.net/m0_37739193/article/details/71222673
2.安裝scala(三臺都要安裝)
[[email protected] ~]$ tar -zxvf scala-2.10.6.tgz
[[email protected] ~]$ tar -zxvf scala-2.10.6.tgz
[[email protected] ~]$ tar -zxvf scala-2.10.6.tgz
3.安裝spark
[
[[email protected] ~]$ vi .bash_profile
[[email protected] ~]$ source .bash_profile[[email protected] ~]$ vi .bash_profile export SPARK_HOME=/home/hadoop/spark-1.3.1-bin-hadoop2.6 export SCALA_HOME=/home/hadoop/scala-2.10.6 export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin:$SCALA_HOME/bin
4.配置spark的 configuration檔案
[[email protected] ~]$ cd spark-1.1.0/conf
[[email protected] conf]$ cp spark-env.sh.template spark-env.sh
[[email protected] conf]$ vi spark-env.sh
新增:
export JAVA_HOME=/usr/jdk1.7.0_25/
export SPARK_MASTER_IP=h40
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
#在spark-1.6.0-bin-hadoop2.6和spark-1.5.0-cdh5.5.2版本中為export SPARK_EXECUTOR_INSTANCES=1
export SPARK_WORKER_MEMORY=1g
5.配置slaves
[[email protected] conf]$ vi slaves
h41
h42
6.同步到其他節點
[[email protected] ~]$ scp -r spark-1.1.0 h41:/home/hadoop
[[email protected] ~]$ scp -r spark-1.1.0 h42:/home/hadoop
7.啟動spark
[[email protected] spark-1.3.1-bin-hadoop2.6]$ sbin/start-all.sh
8.驗證
[[email protected] ~]$ jps
主節點有 master程序
8861 Master
[[email protected] ~]$ jps
[[email protected] ~]$ jps
從節點有 Worker程序
8993 Worker
案例一:(spark-1.3.1-bin-hadoop2.6版本自帶的WordCount例子)
[[email protected] examples]$ pwd
/home/hadoop/spark-1.3.1-bin-hadoop2.6/examples/src/main/java/org/apache/spark/examples
[[email protected] examples]$ ls
JavaHdfsLR.java JavaLogQuery.java JavaPageRank.java JavaSparkPi.java JavaStatusTrackerDemo.java JavaTC.java JavaWordCount.java ml mllib sql streaming
[[email protected] examples]$ cat JavaWordCount.java
package org.apache.spark.examples;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
public final class JavaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.err.println("Usage: JavaWordCount <file>");
System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile(args[0], 1);
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) {
return Arrays.asList(SPACE.split(s));
}
});
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());
}
ctx.stop();
}
}
在hdfs中建立相應檔案:
[[email protected] ~]$ vi hehe.txt
hello world
hello hadoop
hello hive
[[email protected] ~]$ hadoop fs -mkdir /spark
[[email protected] ~]$ hadoop fs -put hehe.txt /spark
然後進入spark的家目錄執行命令:
[[email protected] spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h40:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.JavaWordCount --executor-memory 500m --total-executor-cores 2 lib/spark-examples-1.3.1-hadoop2.6.0.jar hdfs://h40:9000/spark/hehe.txt
。。。(輸出內容太多省略)
hive: 1
hadoop: 1
hello: 3
world: 1
。。。
(這個案例感覺和執行hadoop的mapreduce似的,這個也是離線處理)
案例二(spark streaming):
本來spark也自帶了例子,但我沒有成功,按官方步驟先開啟nc -lk 9999端,再在spark的家目錄下執行bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount h40 9999無法達到想要的結果,一直顯示如下:
17/06/20 21:23:58 INFO dstream.SocketReceiver: Connected to h40:9999
17/06/20 21:23:59 INFO scheduler.JobScheduler: Added jobs for time 1497965039000 ms
17/06/20 21:24:00 INFO scheduler.JobScheduler: Added jobs for time 1497965040000 ms
17/06/20 21:24:01 INFO scheduler.JobScheduler: Added jobs for time 1497965041000 ms
17/06/20 21:24:02 INFO scheduler.JobScheduler: Added jobs for time 1497965042000 ms
17/06/20 21:24:03 INFO scheduler.JobScheduler: Added jobs for time 1497965043000 ms
17/06/20 21:24:04 INFO scheduler.JobScheduler: Added jobs for time 1497965044000 ms
17/06/20 21:24:05 INFO scheduler.JobScheduler: Added jobs for time 1497965045000 ms
17/06/20 21:24:06 INFO scheduler.JobScheduler: Added jobs for time 1497965046000 ms
17/06/20 21:24:07 INFO scheduler.JobScheduler: Added jobs for time 1497965047000 ms
[[email protected] streaming]$ ls
JavaCustomReceiver.java JavaFlumeEventCount.java JavaNetworkWordCount.java JavaQueueStream.java JavaRecoverableNetworkWordCount.java JavaStatefulNetworkWordCount.java
[[email protected] streaming]$ pwd
/home/hadoop/spark-1.3.1-bin-hadoop2.6/examples/src/main/java/org/apache/spark/examples/streaming
[[email protected] streaming]$ cat JavaNetworkWordCount.java
package org.apache.spark.examples.streaming;
import scala.Tuple2;
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.util.regex.Pattern;
/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
*
* Usage: JavaNetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999`
*/
public final class JavaNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) {
if (args.length < 2) {
System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
System.exit(1);
}
StreamingExamples.setStreamingLogLevels();
// Create the context with a 1 second batch size
SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
// Create a JavaReceiverInputDStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Lists.newArrayList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
ssc.start();
ssc.awaitTermination();
}
}
然後我將該程式碼修改了下才成功:
package org.apache.spark.examples.streaming;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import com.google.common.collect.Lists;
public final class JavaNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) {
StreamingExamples.setStreamingLogLevels();
SparkConf sparkConf = new SparkConf().setAppName("wordcount").setMaster("local[2]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
JavaReceiverInputDStream<String> lines = ssc.socketTextStream("h40", 9999);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Lists.newArrayList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
ssc.start();
ssc.awaitTermination();
}
}
開啟一個終端,輸入 命令 nc -lk 9999,然後在myeclipse中將所需的jar包匯入,直接執行該程式:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/huiqiang/Desktop/%e6%96%b0%e5%bb%ba%e6%96%87%e4%bb%b6%e5%a4%b9/spark-1.3.1-bin-hadoop2.6/spark-assembly-1.3.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/huiqiang/Desktop/%e6%96%b0%e5%bb%ba%e6%96%87%e4%bb%b6%e5%a4%b9/spark-1.3.1-bin-hadoop2.6/spark-examples-1.3.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/07/10 09:18:56 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath.
-------------------------------------------
Time: 1499649550000 ms
-------------------------------------------
-------------------------------------------
Time: 1499649560000 ms
-------------------------------------------
-------------------------------------------
Time: 1499649570000 ms
-------------------------------------------
。。。。。。。。。。。。。。。
然後你在9999埠輸入資料則myeclipse的控制檯中會打印出統計結果。
注意:
1.如果你想在Linux本地中執行該程式的話,需要將程式碼中的StreamingExamples.setStreamingLogLevels();這行程式碼刪除掉,再用myeclipse打成streaming.jar包上傳到Linux本地中去,再執行[[email protected] spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --class org.apache.spark.examples.streaming.JavaNetworkWordCount streaming.jar
2.import com.google.common.collect.Lists;這個需要google-collections-1.0.jar這個jar包,可是解壓的spark-1.3.1-bin-hadoop2.6.tgz並沒有這個jar包,我從網上找了一個,如果你需要的話,可以去這裡下載:http://download.csdn.net/detail/m0_37739193/9893632
這裡有個怪現象我也不知道是什麼原因造成的:當在Linux本地執行上面的程式的時候,雖然spark的lib目錄下沒有google-collections-1.0.jar,但在spark-1.3.1-bin-hadoop2.6
中可以正常執行,在spark-1.6.3-bin-hadoop2.6中卻報Caused by: java.lang.NoClassDefFoundError: com/google/common/collect/Lists和Caused by: java.lang.ClassNotFoundException: com.google.common.collect.Lists,即使我把google-collections-1.0.jar上傳到spark-1.6.3-bin-hadoop2.6的lib目錄下並在.bash_profile的export CLASSPATH中添加了這個lib目錄,結果還是報上面的錯誤。我以為將google-collections-1.0.jar上傳到spark的lib目錄下並將該目錄新增到.bash_profil的export CLASSPATH中就不會報錯了哈,但在spark-1.3.1-bin-hadoop2.6和spark-1.6.3-bin-hadoop2.6所表現的狀態完全不是我以為的啊,我目前來說不是很明白。
參考:
http://blog.csdn.net/bluejoe2000/article/details/41556979
http://blog.csdn.net/huyangshu87/article/details/52288662
相關推薦
Spark Java程式案例入門
spark 安裝模式: local(本地模式):常用於本地開發測試,本地還分為local單執行緒和local-cluster多執行緒 standalone(叢集模式):典型的Mater/slave模式,不過也能看出Master是有單點故障的;Spark支援ZooKeeper
java程式設計師菜鳥進階(十七)linux基礎入門(五)linux檔案/目錄的許可權和歸屬管理
在linux中的每一個檔案或目錄都包含有訪問許可權,這些訪問許可權決定了誰能訪問和如何訪問這些檔案和目錄。相應的每一個檔案和目錄都有所屬的屬主和屬組,合理的設定檔案和目錄的屬組和屬主在檔案/目錄管理中佔據著很重要的地位,所以,今天我就和大家一起來看一下有關檔案/目錄的許可權和歸屬的相關設定
java程式設計師菜鳥進階(十六)linux基礎入門(四)linux下VIM文字編輯器使用
linux下編寫配置檔案最好的編輯工具莫過於vim了。Vim的功能實在太多太全,Vim的很多功能也許我們很少用得到,真正為大家常用的功能可能只佔到所有功能的冰山一角。Vim終歸只是一個編寫程式碼或編輯文件的工具,所以只要掌握一些足夠我們使用的功能即可。 做個廣告
java程式設計師菜鳥進階(十五)linux基礎入門(三)linux使用者和組管理
我們大家都知道,要登入linux作業系統,我們必須要有一個使用者名稱和密碼。每一個使用者都由一個惟一的身份來標識,這個標識叫做使用者ID.系統中的每一個使用者也至少需要屬於一個"使用者分組".同樣,使用者分組也是由一個惟一的身份來標識的,該標識叫做使用者分組ID(GID).每位使用者的許可
java程式設計師菜鳥進階(十四)linux基礎入門(二)linux檔案及目錄命令管理
大家都知道,熟悉命令操作的開發人員,Linux作業系統命令操作效率要高於圖形介面的操作,所以瞭解和學習linux基本命令操作是學習linux作業系統的首要任務,本文主要介紹以下四個知識點: 1. She
java程式設計師菜鳥進階(十三)linux基礎入門(一)vmvare下安裝linux RedHat圖解(超詳細篇)
對於linux,我從大二就想學習一下,但一直苦於無從下手,所以一直拖到現在,鑑於筆者瞭解很多人在linux入門的困難在何處,所以我認為本套入門基礎文章還是挺適合想學習linux的朋友,本系列文章大約十篇文章左右,近期會不斷更新下來,沒有linux基礎但又想學習linux的朋友可以關注一下本系列
MyCat入門+JAVA程式連線
本文章主要對mycat的配置檔案註釋說明,詳細的安裝使用請戳這個地址:mycat安裝使用 連線mycat時,將原先連線mysql的埠和資料庫改為mycat的埠8066,和mycat的邏輯資料庫TESTDB,使用者名稱和密碼為server.xml表裡設定的user。 jdbc.url
Java程式設計師的職業生涯學習建議——基礎入門篇
這部分主要適用於尚未做過Java工作的同學,包括一些在校生以及剛準備轉行Java的同學。 一、Java基礎 首先去找一個Java的基礎教程學一下(教材或者網路視訊)。 學習Java基礎的時候,應該儘量多動手,很多時候,你想當然的事情,等你寫出來執行一下,你就會發現不是這麼回事兒,不
java+selenium的入門 案例 selenium包 谷歌驅動包 火狐驅動包 IE驅動包 (一)
目錄 前言 selenium是什麼? Selenium的下載使用 Selenium下載 Selenium下載地址 Selenium之谷歌驅動包 chromedriver包下載地址 Selenium使用 前言 在學習selenium之前,要了解selen
MyBatis入門程式案例
mybatis下載 mybaits的程式碼由github.com管理,地址:https://github.com/mybatis/mybatis-3/releases。 可從該地址下載mybatis最新框架。 下載之後解壓縮: 案例需求 1 根據使用者id查詢y一個使
微信小程式從入門第一講 java&&jFinal 小程式登陸功能實現以及獲取唯一標識openid和unionid
在我以前的一篇部落格中講到了獲取openid的方法,這裡就不做過多的講解java獲取微信小程式openid。這裡主要講解微信小程式登陸的實現和獲取unionid。需要提醒的是我後端框架使用的是jFinal,傳值方式跟spring的那套有些許的差別。 1、首先要知道微信小程式的開發本身就是基於
一個程式完全入門Java多執行緒
程式碼只供學習使用,實際開發中建議遵守Java開發規範,合理分包 程式碼所涉及知識點: 什麼是執行緒、Thread方法和Runnable介面的介紹及建立執行緒、執行緒的狀態和生命週期、sleep方法和join方法的使用、執行緒的優先順序、執行緒同步、執行緒間通訊(見另一篇文章
switch語句與三種迴圈語句,JAVA程式設計師程式設計新手入門基礎學習筆記
Java是一種可以撰寫跨平臺應用軟體的面向物件的程式設計語言。Java 技術具有卓越的通用性、高效性、平臺移植性和安全性,廣泛應用於PC、資料中心、遊戲控制檯、科學超級計算機、行動電話和網際網路,同時擁有全球最大的開發者專業社群。 自己整理了-份201 8最全面前端學習資料,從最基礎的HTML+
IDEA執行spark相關程式報陣列越界異常java.lang.ArrayIndexOutOfBoundsException: 10582
筆者執行環境: Win10 + IDEA + spark2.4 + JDK8 程式執行到 sc.textFile("E:/tmp/test.txt"); 報了陣列越界異常,經檢查是paranamer造成的(網上有同行說JDK8得使用paraname
Java程式設計師的Scala入門教程
Java 8擁有了一些初步的函數語言程式設計能力:閉包等,還有新的併發程式設計模型及Stream這個帶高階函式和延遲計算的資料集合。在嘗試了Java 8以後,也許會覺得意猶未盡。是的,你會發現Scala能滿足你在初步嘗試函數語言程式設計後那求知的慾望。 安裝Scala
Spark WordCount簡單案例(java,scala版)
Spark 是什麼?官方文件解釋:Apache Spark™ is a fast and general engine for large-scale data processing.通俗的理解:Spark是基於記憶體計算的大資料平行計算框架。Spark基於記憶體計算,提高了
Java程式設計師的Scala的入門教程
本文是《A Scala Tutorial for Java programmers》英文的翻譯,英文版地址A Scala Tutorial for Java programmers。是Michel Schinz和Philipp Haller編寫,由Bearice成
java程式設計師菜鳥入門之一javaweb專案開發環境
一、java開發環境 1、jdk的下載安裝 JDK:JDK是java語言的軟體開發包,是整個java開發的核心,包含了java的執行環境(jvm+java系統類庫)和java工具,執行java程式的最小環境為jre,開發java程式的最小環境為JDK。 JDK的下載:ora
matlab安裝軟體 Matlab視訊教程李大勇 MATLAB程式開發入門課程 MATLAB神經網路30個案例分析及源程式
數學建模10大演算法詳解_程式原始碼打包 matlab安裝軟體 Matlab視訊教程李大勇 MATLAB程式開發入門課程 MATLAB神經網路30個案例分析及源程式 百度雲連結 http://download.csdn.net/download/wocao1226/10
JProfiler入門教程-簡單的java程式效能調優
推薦文章:JProfiler 入門教程 一、安裝JProfiler 從http://www.ej-technologies.com/下載5.1.2並申請試用序列號 二、主要功能簡介 1.記憶體剖析 Memory profiler JPr