Spark中使用Java編程的常用方法
阿新 • • 發佈:2017-12-22
廣播 新的 json lambda表達式 aslist rom collect spl nal
二、使用parallelize方法
創建RDD最簡單的方式就是把程序中一個已有的集合傳給SparkContext的parallelize()方法
JavaRDD lines = context.parallelize(Arrays.asList("pandas", "i like pandas"));
System.out.println(lines.collect());
輸出:[pandas, i like pandas]
三、RDD操作(filter方法)
RDD支持兩種操作:轉化操作和行動操作。RDD的轉化操作是返回一個新的RDD的操作,比如map()和filter(),而行動操作則是想驅動器程序返回結果或把結果寫入外部系統的操作,會觸發實際的計算,比如count()和first()。
JavaRDD inputRDD = context.textFile("D:\\log\\521.txt");
JavaRDD errorsRDD = inputRDD.filter(
new Function(){
@Override
public Boolean call(String x) throws Exception {
// TODO Auto-generated method stub
return x.contains("error");
}
});
System.out.println("errors顯示為:" + errorsRDD.collect());
System.out.println("errors個數為:" + errorsRDD.count());
其中521.log為android的logcat文件,裏面包含很多錯誤信息。
四、使用lambda表達式
Java8 開始支持lambda表達式,可以簡潔地實現函數接口。
JavaRDD inputRDD = context.textFile("D:\\log\\521.txt");
JavaRDD errors = inputRDD.filter(s -> s.contains("error"));
System.out.println(errors.count());
輸出:23
五、使用map方法
將函數應用於RDD中的每個元素,將返回值構成新的RDD
JavaRDD rdd = context.parallelize(Arrays.asList(1, 3, 5, 7));
JavaRDD result = rdd.map(
new Function(){
@Override
public Integer call(Integer x) throws Exception {
// TODO Auto-generated method stub
return x * x;
}
});
System.out.println(StringUtils.join(result.collect(), ","));
輸出:1,9,25,49
六、使用flatMap方法
將函數應用於RDD中的每個元素,將返回的叠代器的所有內容構成新的RDD,通常用來切分單詞。與map的區別是:這個函數返回的值是list的一個,去除原有的格式
JavaRDD lines = context.parallelize(Arrays.asList("hello world", "hi"));
JavaRDD words = lines.flatMap(
new FlatMapFunction(){
@Override
public Iterable call(String lines) throws Exception {
// TODO Auto-generated method stub
return Arrays.asList(lines.split(" "));
}
});
System.out.println(words.collect());
System.out.println(words.first());
輸出:
[hello, world, hi]
hello
七、使用PairRDD方法
Spark為包含鍵值對類型的RDD提供了一些專有的操作,這些RDD稱為pair RDD。當需要把一個普通的RDD轉為pair RDD時,可以調用map()函數來實現。
JavaRDD lines = context.parallelize(Arrays.asList("hello world", "hangtian is from hangzhou", "hi", "hi"));
PairFunction keyData = new PairFunction(){
@Override
public Tuple2 call(String x) throws Exception {
// TODO Auto-generated method stub
return new Tuple2(x.split(" ")[0], x);
}
};
JavaPairRDD pairs = (JavaPairRDD) lines.mapToPair(keyData);
System.out.println(pairs.collect());
輸出:[(hello,hello world), (hangtian,hangtian is from hangzhou), (hi,hi), (hi,hi)]
八、計算單詞個數
JavaRDD input = context.textFile("D:\\test.txt");
JavaRDD words = input.flatMap(new FlatMapFunction(){
@Override
public Iterable call(String x) throws Exception {
// TODO Auto-generated method stub
return Arrays.asList(x.split(" "));
}
});
JavaPairRDD wordspair = words.mapToPair(new PairFunction(){
@Override
public Tuple2 call(String x) throws Exception {
// TODO Auto-generated method stub
return new Tuple2(x, 1);
}
});
JavaPairRDD result = wordspair.reduceByKey(new Function2(){
@Override
public Integer call(Integer x, Integer y) throws Exception {
// TODO Auto-generated method stub
return x + y;
}
});
System.out.println(result.sortByKey().collect());
輸出:[(,2), (are,1), (can,1), (go,1), (i,2), (love,1), (me,1), (much,1), (ok?,1), (should,1), (so,2), (with,1), (you,3)]
九、使用Accumulator方法
Spark有兩種共享變量:累加器 和 廣播變量。累加器用來對信息進行聚合,而廣播變量用來高效分發較大的對象。累加器提供了將工作節點中的值聚合到驅動器程序中國的簡單語法。
JavaRDD rdd = context.textFile("D:\\test.txt");
final Accumulator blankLines = context.accumulator(0);
JavaRDD callSigns = rdd.flatMap(new FlatMapFunction(){
@Override
public Iterable call(String line) throws Exception {
// TODO Auto-generated method stub
if(line.equals("")){
blankLines.add(1);
}
return Arrays.asList(line.split(" "));
}
});
System.out.println(callSigns.collect());
System.out.println("Blank lines: " + blankLines.value());
輸出:
[i, love, you, so, much, , so, i, should, you, can, go, with, me, , are, you, ok?]
Blank lines: 2
十、Spark SQL使用
Spark提供Spark SQL來操作結構化和半結構化數據。直白說就是,可以使用sql語句操作json和txt文件進行數據查詢等操作。
JavaRDD rdd = context.textFile("D:\\test.json");
SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
DataFrame dataFrame = sqlContext.read().json(rdd);
dataFrame.registerTempTable("person");
DataFrame resultDataFrame = sqlContext.sql("select * from person where lovesPandas=true");
resultDataFrame.show(false);
輸出:
+-----------+---------+
|lovesPandas|name |
+-----------+---------+
|true |nanchang |
|true |qier |
|true |kongshuai|
+-----------+---------+
十一、Spark Stream使用
用來實時計算數據,其構造函數接口用來指定多久時間處理一次新數據的批次間隔作為輸入。以下代碼在本地未能執行通過。設想是把netcat工具作為輸入源,在程序中打印輸入信息並進行處理
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));
JavaDStream lines = jssc.socketTextStream("localhost", 7778);
lines.print();
jssc.start();
jssc.awaitTermination();
以上代碼運行還需要刪除最上面的context初始化的代碼。
原文引自:http://blog.sina.com.cn/s/blog_628cc2b70102w9up.html
一、初始化SparkContext
System.setProperty("hadoop.home.dir", "D:\\spark-1.6.1-bin-hadoop2.6\\spark-1.6.1-bin-hadoop2.6"); SparkConf conf = new SparkConf().setAppName("spark test1").setMaster("local[2]"); JavaSparkContext context = new JavaSparkContext(conf);
Spark中使用Java編程的常用方法