Spark PairRDD 行動與資料分割槽
package com.fei.simple_project; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.apache.spark.HashPartitioner; import org.apache.spark.Partitioner; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.storage.StorageLevel; import com.google.common.base.Optional; import scala.Tuple2; /** * Hello world! * */ public class App { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("Simple Application"); JavaSparkContext sc = new JavaSparkContext(conf); // convert from other RDD JavaRDD<String> line1 = sc.parallelize(Arrays.asList("1 aa", "2 bb", "4 cc", "3 dd")); JavaPairRDD<String, String> prdd = line1.mapToPair(new PairFunction<String, String, String>() { public Tuple2<String, String> call(String x) throws Exception { return new Tuple2(x.split(" ")[0], x); } }); System.out.println("111111111111mapToPair:"); prdd.foreach(new VoidFunction<Tuple2<String, String>>() { public void call(Tuple2<String, String> x) throws Exception { System.out.println(x); } }); // parallelizePairs Tuple2 t1 = new Tuple2(1, 2); Tuple2 t2 = new Tuple2(3, 4); Tuple2 t3 = new Tuple2(3, 6); List list1 = new ArrayList<Tuple2>(); list1.add(t1); list1.add(t2); list1.add(t3); JavaPairRDD<Integer, Integer> line2 = sc.parallelizePairs(list1); line2.persist(StorageLevel.MEMORY_ONLY()); Tuple2 t4 = new Tuple2(3, 9); List list2 = new ArrayList<Tuple2>(); list2.add(t4); JavaPairRDD<Integer, Integer> line3 = sc.parallelizePairs(list2); line3.persist(StorageLevel.MEMORY_ONLY()); // countByKey System.out.println("222222222222222countByKey:"); Map<Integer, Object> ma = line2.countByKey(); for(Entry<Integer, Object> e:ma.entrySet()){ System.out.println(e.getKey()+" "+e.getValue()+" "); } // collectAsMap,如果key已存在,後面覆蓋前面 System.out.println("3333333333333collectAsMap:"); Map<Integer, Integer> ca = line2.collectAsMap(); for(Entry<Integer, Integer> e:ca.entrySet()){ System.out.println(e.getKey()+" "+e.getValue()+" "); } // lookup System.out.println("4444444444444lookup:"); List<Integer> la = line2.lookup(3); for(Integer i:la){ System.out.println(i+" "); } // partitionBy, partitioner, join+filter Tuple2 ta = new Tuple2(1,"sina"); Tuple2 tb = new Tuple2(2, "taobao"); Tuple2 td = new Tuple2(2, "126"); List lista = new ArrayList<Tuple2>(); lista.add(ta); lista.add(tb); lista.add(td); //自帶hash分割槽,此外還有range分割槽 JavaPairRDD<Integer, String> linea = sc.parallelizePairs(lista).partitionBy(new HashPartitioner(2)); linea.persist(StorageLevel.MEMORY_ONLY()); Optional<Partitioner> op = linea.partitioner(); System.out.println("5555555555partitioner: "+ op); System.out.println("66666666666present: "+ op.isPresent()); if (op.isPresent()) { System.out.println("77777777value:"+ op.get().numPartitions()); } Tuple2 tc = new Tuple2(2, "126"); Tuple2 te = new Tuple2(2, "baidu"); Tuple2 tf = new Tuple2(1, "dangdang"); List listc = new ArrayList<Tuple2>(); listc.add(tc); listc.add(te); listc.add(tf); //自定義分割槽 JavaPairRDD<Integer, String> linec = sc.parallelizePairs(listc).partitionBy(new MyPartitioner(2)); linec.persist(StorageLevel.MEMORY_ONLY()); System.out.println("888888888888partitioner: "+ linec.partitioner()); System.out.println("9999999999join:"); JavaPairRDD<Integer, Tuple2<String, String>> js = linea.join(linec); js.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String,String>>>(){ public void call(Tuple2<Integer, Tuple2<String, String>> x) throws Exception { System.out.println(x); } }); JavaPairRDD<Integer, Tuple2<String, String>> fs = js.filter(new Function<Tuple2<Integer, Tuple2<String, String>>, Boolean>(){ public Boolean call(Tuple2<Integer, Tuple2<String, String>> y) throws Exception { return !y._2._1.equals(y._2._2); } }); System.out.println("aaaaaaaaaaafilter:"); fs.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String,String>>>(){ public void call(Tuple2<Integer, Tuple2<String, String>> x) throws Exception { System.out.println(x); } }); } }
package com.fei.simple_project; import org.apache.spark.Partitioner; public class MyPartitioner extends Partitioner { public int num; public MyPartitioner(int N) { num = N; } @Override public int getPartition(Object x) { int ret = x.hashCode()%num; if(ret<0) ret=(-1)*ret; return ret; } @Override public int numPartitions() { return num; } }
111111111111mapToPair:
[Stage 0:> (0 + 0) / 4]16/02/04 20:15:45 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
(2,2 bb)
(4,4 cc)
(3,3 dd)
(1,1 aa)
222222222222222countByKey:
1 1
3 2
3333333333333collectAsMap:
1 2
3 6
4444444444444lookup:
4
6
5555555555partitioner: Optional.of( [email protected])
66666666666present: true
77777777value:2
888888888888partitioner: Optional.of([email protected])
9999999999join:
(1,(sina,dangdang))
(2,(taobao,126))
(2,(taobao,baidu))
(2,(126,126))
(2,(126,baidu))
aaaaaaaaaaafilter:
(1,(sina,dangdang))
(2,(taobao,126))
(2,(taobao,baidu))
(2,(126,baidu))
相關推薦
Spark PairRDD 行動與資料分割槽
package com.fei.simple_project; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import
一文搞定hive之insert into 和 insert overwrite與資料分割槽
資料分割槽 資料庫分割槽的主要目的是為了在特定的SQL操作中減少資料讀寫的總量以縮減響應時間,主要包括兩種分割槽形式:水平分割槽與垂直分割槽。水平分割槽是對錶進行行分割槽。而垂直分割槽是對列進行分割槽,一般是通過對錶的垂直劃分來減少目標表的寬度
大資料(二十):hive分割槽表、修改表語句與資料的匯入匯出
一、分割槽表 分割槽表實際上就是對應一個HDFS檔案系統上的一個獨立的資料夾,該資料夾下是該分割槽所有的資料檔案,hive中的分割槽就是分目錄,把一個大的資料集更具業務需求分割成小的資料集。在查詢時通過where子句中的
Spark自學之路(七)——資料分割槽
資料分割槽 對資料集在節點間的分割槽控制。在分散式程式中,網路的通訊代價是很大的,因此控制資料分佈以獲得最少的網路傳輸可以極大地提升整體效能,Spark可以控制RDD分割槽來減少網路通訊開銷。分割槽並不是對所有的應用都有好處,如果RDD只被掃
Spark-Streaming獲取kafka資料的兩種方式:Receiver與Direct的方
簡單理解為:Receiver方式是通過zookeeper來連線kafka佇列,Direct方式是直接連線到kafka的節點上獲取資料 回到頂部 使用Kafka的高層次Consumer API來實現。receiver從Kafka中獲取的資料都儲存在Spark Exec
spark 讀取 hdfs 資料分割槽規則
下文以讀取 parquet 檔案 / parquet hive table 為例: hive metastore 和 parquet 轉化的方式通過 spark.sql.hive.convertMetastoreParquet 控制,預設為 true。 如果設定為 true ,會
Spark效能優化之資料傾斜調優與shuffle調優
一、資料傾斜發生的原理 原理:在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,比如按照key進行聚合或join等操作。此時如果某個key對應的資料量特別大的話,就會發生資料傾斜。資料傾斜只會發生在shuffle過程中。常用的並且可能會觸
學習筆記 --- Kafka Spark Streaming獲取Kafka資料 Receiver與Direct的區別
Receiver 使用Kafka的高層次Consumer API來實現 receiver從Kafka中獲取的資料都儲存在Spark Executor的記憶體中,然後Spark Streaming啟動的job會去處理那些資料 要啟用高可靠機制,讓資料零丟失,就必須啟用Spark
spark streaming 接收kafka資料寫入Hive分割槽表
直接上程式碼 object KafkaToHive{ def main(args: Array[String]){ val sparkConf = new SparkConf().setAppName("KafkaToHive") val sc = new SparkConte
Linux下基於Hadoop的大資料環境搭建步驟詳解(Hadoop,Hive,Zookeeper,Kafka,Flume,Hbase,Spark等安裝與配置)
Linux下基於Hadoop的大資料環境搭建步驟詳解(Hadoop,Hive,Zookeeper,Kafka,Flume,Hbase,Spark等安裝與配置) 系統說明 搭建步驟詳述 一、節點基礎配置 二、H
Spark(五)資料讀取與儲存
目錄: 5、資料讀取與儲存 5.1、檔案格式 5.1.1、文字檔案 5.1.2、JSON 5.1.3、逗號分隔值與製表符分隔值 5.1.4、SequenceFile 5.1.5、物件檔案 5.2、檔案系統 5.2.1、本地/“常規”檔案系統 5.2.3、HDF
Spark商業案例與效能調優實戰100課》第3課:商業案例之通過RDD分析大資料電影點評系各種型別的最喜愛電影TopN及效能優化技巧
Spark商業案例與效能調優實戰100課》第3課:商業案例之通過RDD分析大資料電影點評系各種型別的最喜愛電影TopN及效能優化技 原始碼 package com.dt.spark.core
spark三種清理資料的方式:UDF,自定義函式,spark.sql;Python中的zip()與*zip()函式詳解//及python中的*args和**kwargs
(1)UDF的方式清理資料 import sys reload(sys) sys.setdefaultencoding('utf8') import re import json from pyspark.sql import SparkSession
基於Kafka與Spark的實時大資料質量監控平臺
微軟的ASG (應用與服務集團)包含Bing,、Office,、Skype。每天產生多達5 PB以上資料,如何構建一個高擴充套件性的data audit服務來保證這樣量級的資料完整性和實時性非常具有挑戰性。本文將介紹微軟ASG大資料團隊如何利用Kafka、Spark以及Elasticsear
跟我一起學Spark之——資料分割槽
前言 控制資料分佈以獲得最少的網路傳輸可以極大地提升整體效能。 如果給定RDD只需要被掃描一次(例如大小表join中的小表),我們完全沒有必要對其預先進行分割槽處理,只有當資料
元資料與資料治理|Spark SQL結構化資料分析(第六篇)
資料科學家們早已熟悉的R和Pandas等傳統資料分析框架 雖然提供了直觀易用的API,卻侷限於單機,無法覆蓋分散式大資料場景。在Spark1.3.0以Spark SQL原有的SchemaRDD為藍本,引入了Spark DataFrameAPI,不僅為Scala、Python、Jav
spark叢集搭建與mysql元資料管理
找個spark叢集搭建是針對於上一篇hadoop的基礎上搭建的。 所以spark的版本也是要按照著hadoop版本進行下載。 1.解壓spark,修改spark的/etc/profile的home目錄。 2.安裝SCALA,並配置SCALA_HOME。 3.修改spar
TOP100summit:【分享實錄-Microsoft】基於Kafka與Spark的實時大資料質量監控平臺
本篇文章內容來自2016年TOP100summit Microsoft資深產品經理邢國冬的案例分享。 編輯:Cynthia 邢國冬(Tony Xing):Microsoft資深產品經理、負責微軟應用與服務集團的大資料平臺構建,資料產品與服務. 導讀:微軟的
大資料Spark “蘑菇雲”行動補充內容第70課: Spark SQL程式碼實戰和效能調優 4個spark sql調優技巧有用!!!!
大資料Spark “蘑菇雲”行動補充內容第70課: Spark SQL程式碼實戰和效能調優 dataframe: Row是沒有型別的,因為Row中的所有成員都被看著Object型別!!!untype
利用Spark sql操作Hdfs資料與Mysql資料,sql視窗函式的使用
需求說明: 對熱門商品進行統計 根據商品的點選資料,統計出各個區域的銷量排行TOPK 產品 輸入:開始時間與結束時間