1. 程式人生 > >Spark PairRDD 行動與資料分割槽

Spark PairRDD 行動與資料分割槽

package com.fei.simple_project;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;

import com.google.common.base.Optional;

import scala.Tuple2;

/**
 * Hello world!
 *
 */
public class App {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("Simple Application");
		JavaSparkContext sc = new JavaSparkContext(conf);

		// convert from other RDD
		JavaRDD<String> line1 = sc.parallelize(Arrays.asList("1 aa", "2 bb", "4 cc", "3 dd"));
		JavaPairRDD<String, String> prdd = line1.mapToPair(new PairFunction<String, String, String>() {
			public Tuple2<String, String> call(String x) throws Exception {
				return new Tuple2(x.split(" ")[0], x);
			}
		});
		System.out.println("111111111111mapToPair:");
		prdd.foreach(new VoidFunction<Tuple2<String, String>>() {
			public void call(Tuple2<String, String> x) throws Exception {
				System.out.println(x);
			}
		});

		// parallelizePairs
		Tuple2 t1 = new Tuple2(1, 2);
		Tuple2 t2 = new Tuple2(3, 4);
		Tuple2 t3 = new Tuple2(3, 6);
		List list1 = new ArrayList<Tuple2>();
		list1.add(t1);
		list1.add(t2);
		list1.add(t3);
		JavaPairRDD<Integer, Integer> line2 = sc.parallelizePairs(list1);
		line2.persist(StorageLevel.MEMORY_ONLY());

		Tuple2 t4 = new Tuple2(3, 9);
		List list2 = new ArrayList<Tuple2>();
		list2.add(t4);
		JavaPairRDD<Integer, Integer> line3 = sc.parallelizePairs(list2);
		line3.persist(StorageLevel.MEMORY_ONLY());
		
		// countByKey
		System.out.println("222222222222222countByKey:");
		Map<Integer, Object> ma = line2.countByKey();
		for(Entry<Integer, Object> e:ma.entrySet()){
			System.out.println(e.getKey()+" "+e.getValue()+" ");
		}
		
		// collectAsMap,如果key已存在,後面覆蓋前面
		System.out.println("3333333333333collectAsMap:");
		Map<Integer, Integer> ca = line2.collectAsMap();
		for(Entry<Integer, Integer> e:ca.entrySet()){
			System.out.println(e.getKey()+" "+e.getValue()+" ");
		}
		
		// lookup
		System.out.println("4444444444444lookup:");
		List<Integer> la = line2.lookup(3);
		for(Integer i:la){
			System.out.println(i+" ");
		}
		
		// partitionBy,  partitioner, join+filter
		Tuple2 ta = new Tuple2(1,"sina");
		Tuple2 tb = new Tuple2(2, "taobao");
		Tuple2 td = new Tuple2(2, "126");
		List lista = new ArrayList<Tuple2>();
		lista.add(ta);
		lista.add(tb);
		lista.add(td);
		//自帶hash分割槽,此外還有range分割槽
		JavaPairRDD<Integer, String> linea = sc.parallelizePairs(lista).partitionBy(new HashPartitioner(2));
		linea.persist(StorageLevel.MEMORY_ONLY());
		Optional<Partitioner> op = linea.partitioner();
		System.out.println("5555555555partitioner: "+ op);
		System.out.println("66666666666present: "+ op.isPresent());
		if (op.isPresent()) {
			System.out.println("77777777value:"+ op.get().numPartitions());
		}
		
		Tuple2 tc = new Tuple2(2, "126");
		Tuple2 te = new Tuple2(2, "baidu");
		Tuple2 tf = new Tuple2(1, "dangdang");
		List listc = new ArrayList<Tuple2>();
		listc.add(tc);
		listc.add(te);
		listc.add(tf);
		//自定義分割槽
		JavaPairRDD<Integer, String> linec = sc.parallelizePairs(listc).partitionBy(new MyPartitioner(2));
		linec.persist(StorageLevel.MEMORY_ONLY());
		System.out.println("888888888888partitioner: "+ linec.partitioner());
		
		System.out.println("9999999999join:");
		JavaPairRDD<Integer, Tuple2<String, String>> js = linea.join(linec);
		js.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String,String>>>(){
			public void call(Tuple2<Integer, Tuple2<String, String>> x) throws Exception {
				System.out.println(x);
			}
		});
		JavaPairRDD<Integer, Tuple2<String, String>> fs = js.filter(new Function<Tuple2<Integer, Tuple2<String, String>>, Boolean>(){
			public Boolean call(Tuple2<Integer, Tuple2<String, String>> y) throws Exception {
				return !y._2._1.equals(y._2._2);
			}
		});
		System.out.println("aaaaaaaaaaafilter:");
		fs.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String,String>>>(){
			public void call(Tuple2<Integer, Tuple2<String, String>> x) throws Exception {
				System.out.println(x);
			}
		});
	}
}


package com.fei.simple_project;

import org.apache.spark.Partitioner;

public class MyPartitioner extends Partitioner {
	public int num;
	public MyPartitioner(int N) {
		num = N;
	}
	@Override
	public int getPartition(Object x) {
		int ret = x.hashCode()%num;
		if(ret<0)
			ret=(-1)*ret;
		return ret;
	}
	@Override
	public int numPartitions() {
		return num;
	}
}

111111111111mapToPair:
[Stage 0:>                                                          (0 + 0) / 4]16/02/04 20:15:45 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
(2,2 bb)
(4,4 cc)
(3,3 dd)
(1,1 aa)
222222222222222countByKey:                                                      
1 1 
3 2 
3333333333333collectAsMap:
1 2 
3 6 
4444444444444lookup:
4 
6 
5555555555partitioner: Optional.of(
[email protected]
) 66666666666present: true 77777777value:2 888888888888partitioner: Optional.of([email protected]) 9999999999join: (1,(sina,dangdang)) (2,(taobao,126)) (2,(taobao,baidu)) (2,(126,126)) (2,(126,baidu)) aaaaaaaaaaafilter: (1,(sina,dangdang)) (2,(taobao,126)) (2,(taobao,baidu)) (2,(126,baidu))


相關推薦

Spark PairRDD 行動資料分割槽

package com.fei.simple_project; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import

一文搞定hive之insert into 和 insert overwrite資料分割槽

資料分割槽         資料庫分割槽的主要目的是為了在特定的SQL操作中減少資料讀寫的總量以縮減響應時間,主要包括兩種分割槽形式:水平分割槽與垂直分割槽。水平分割槽是對錶進行行分割槽。而垂直分割槽是對列進行分割槽,一般是通過對錶的垂直劃分來減少目標表的寬度

資料(二十):hive分割槽表、修改表語句資料的匯入匯出

一、分割槽表         分割槽表實際上就是對應一個HDFS檔案系統上的一個獨立的資料夾,該資料夾下是該分割槽所有的資料檔案,hive中的分割槽就是分目錄,把一個大的資料集更具業務需求分割成小的資料集。在查詢時通過where子句中的

Spark自學之路(七)——資料分割槽

資料分割槽        對資料集在節點間的分割槽控制。在分散式程式中,網路的通訊代價是很大的,因此控制資料分佈以獲得最少的網路傳輸可以極大地提升整體效能,Spark可以控制RDD分割槽來減少網路通訊開銷。分割槽並不是對所有的應用都有好處,如果RDD只被掃

Spark-Streaming獲取kafka資料的兩種方式:ReceiverDirect的方

 簡單理解為:Receiver方式是通過zookeeper來連線kafka佇列,Direct方式是直接連線到kafka的節點上獲取資料 回到頂部 使用Kafka的高層次Consumer API來實現。receiver從Kafka中獲取的資料都儲存在Spark Exec

spark 讀取 hdfs 資料分割槽規則

下文以讀取 parquet 檔案 / parquet hive table 為例: hive metastore 和 parquet 轉化的方式通過 spark.sql.hive.convertMetastoreParquet 控制,預設為 true。 如果設定為 true ,會

Spark效能優化之資料傾斜調優shuffle調優

一、資料傾斜發生的原理 原理:在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,比如按照key進行聚合或join等操作。此時如果某個key對應的資料量特別大的話,就會發生資料傾斜。資料傾斜只會發生在shuffle過程中。常用的並且可能會觸

學習筆記 --- Kafka Spark Streaming獲取Kafka資料 ReceiverDirect的區別

Receiver 使用Kafka的高層次Consumer API來實現 receiver從Kafka中獲取的資料都儲存在Spark Executor的記憶體中,然後Spark Streaming啟動的job會去處理那些資料 要啟用高可靠機制,讓資料零丟失,就必須啟用Spark

spark streaming 接收kafka資料寫入Hive分割槽

直接上程式碼 object KafkaToHive{ def main(args: Array[String]){ val sparkConf = new SparkConf().setAppName("KafkaToHive") val sc = new SparkConte

Linux下基於Hadoop的大資料環境搭建步驟詳解(Hadoop,Hive,Zookeeper,Kafka,Flume,Hbase,Spark等安裝配置)

Linux下基於Hadoop的大資料環境搭建步驟詳解(Hadoop,Hive,Zookeeper,Kafka,Flume,Hbase,Spark等安裝與配置) 系統說明 搭建步驟詳述 一、節點基礎配置 二、H

Spark(五)資料讀取儲存

目錄: 5、資料讀取與儲存 5.1、檔案格式 5.1.1、文字檔案 5.1.2、JSON 5.1.3、逗號分隔值與製表符分隔值 5.1.4、SequenceFile 5.1.5、物件檔案 5.2、檔案系統 5.2.1、本地/“常規”檔案系統 5.2.3、HDF

Spark商業案例效能調優實戰100課》第3課:商業案例之通過RDD分析大資料電影點評系各種型別的最喜愛電影TopN及效能優化技巧

Spark商業案例與效能調優實戰100課》第3課:商業案例之通過RDD分析大資料電影點評系各種型別的最喜愛電影TopN及效能優化技 原始碼 package com.dt.spark.core

spark三種清理資料的方式:UDF,自定義函式,spark.sql;Python中的zip()*zip()函式詳解//及python中的*args和**kwargs

(1)UDF的方式清理資料 import sys reload(sys) sys.setdefaultencoding('utf8') import re import json from pyspark.sql import SparkSession

基於KafkaSpark的實時大資料質量監控平臺

微軟的ASG (應用與服務集團)包含Bing,、Office,、Skype。每天產生多達5 PB以上資料,如何構建一個高擴充套件性的data audit服務來保證這樣量級的資料完整性和實時性非常具有挑戰性。本文將介紹微軟ASG大資料團隊如何利用Kafka、Spark以及Elasticsear

跟我一起學Spark之——資料分割槽

前言         控制資料分佈以獲得最少的網路傳輸可以極大地提升整體效能。         如果給定RDD只需要被掃描一次(例如大小表join中的小表),我們完全沒有必要對其預先進行分割槽處理,只有當資料

資料資料治理|Spark SQL結構化資料分析(第六篇)

  資料科學家們早已熟悉的R和Pandas等傳統資料分析框架 雖然提供了直觀易用的API,卻侷限於單機,無法覆蓋分散式大資料場景。在Spark1.3.0以Spark SQL原有的SchemaRDD為藍本,引入了Spark DataFrameAPI,不僅為Scala、Python、Jav

spark叢集搭建mysql元資料管理

找個spark叢集搭建是針對於上一篇hadoop的基礎上搭建的。 所以spark的版本也是要按照著hadoop版本進行下載。 1.解壓spark,修改spark的/etc/profile的home目錄。 2.安裝SCALA,並配置SCALA_HOME。 3.修改spar

TOP100summit:【分享實錄-Microsoft】基於KafkaSpark的實時大資料質量監控平臺

本篇文章內容來自2016年TOP100summit Microsoft資深產品經理邢國冬的案例分享。 編輯:Cynthia 邢國冬(Tony Xing):Microsoft資深產品經理、負責微軟應用與服務集團的大資料平臺構建,資料產品與服務. 導讀:微軟的

資料Spark “蘑菇雲”行動補充內容第70課: Spark SQL程式碼實戰和效能調優 4個spark sql調優技巧有用!!!!

大資料Spark “蘑菇雲”行動補充內容第70課: Spark SQL程式碼實戰和效能調優 dataframe: Row是沒有型別的,因為Row中的所有成員都被看著Object型別!!!untype

利用Spark sql操作Hdfs資料Mysql資料,sql視窗函式的使用

需求說明:                                                                  對熱門商品進行統計        根據商品的點選資料,統計出各個區域的銷量排行TOPK 產品        輸入:開始時間與結束時間