1. 程式人生 > >Spark RDD 按Key儲存到不同檔案

Spark RDD 按Key儲存到不同檔案

基本需求

將Keyed RDD[(Key,Value)]按Key儲存到不同檔案。

測試資料

資料格式:id,studentId,language,math,english,classId,departmentId

1,111,68,69,90,Class1,Economy
2,112,73,80,96,Class1,Economy
3,113,90,74,75,Class1,Economy
4,114,89,94,93,Class1,Economy
5,115,99,93,89,Class1,Economy
6,121,96,74,79,Class2,Economy
7,122,89,86,85,Class2,Economy
8,123,70,78,61,Class2,Economy
9,124,76,70,76,Class2,Economy
10,211,89,93,60,Class1,English
11,212,76,83,75,Class1,English
12,213,71,94,90,Class1,English
13,214,94,94,66,Class1,English
14,215,84,82,73,Class1,English
15,216,85,74,93,Class1,English
16,221,77,99,61,Class2,English
17,222,80,78,96,Class2,English
18,223,79,74,96,Class2,English
19,224,75,80,78,Class2,English
20,225,82,85,63,Class2,English

用Spark RDD實現

package com.bigData.spark

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.log4j.{Level, Logger}
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

/**
  * Author: Wang Pei
  * License: Copyright(c) Pei.Wang
  * Summary:
  *   RDD 按Key儲存到不同檔案
  */
object OutputToMultiFile { def main(args: Array[String]): Unit = { /**設定日誌等級*/ Logger.getLogger("org").setLevel(Level.WARN) /**spark環境*/ val spark = SparkSession.builder().master("local[3]").appName(this.getClass.getSimpleName.replace("$","")).getOrCreate() /**Keyed RDD*/ val data =
spark.sparkContext.textFile("data/scores.csv") //Keyed RDD .map(item=>(item.split(",").takeRight(2).reverse.mkString("_"),item)) //按Key Hash分割槽,4個Key分到4個Partition中 .partitionBy(new HashPartitioner(4)) /**按Key儲存到不同檔案*/ data.saveAsHadoopFile("data/multiKeyedDir", classOf[String], classOf[String], classOf[PairRDDMultipleTextOutputFormat] ) spark.stop() } } /**繼承類重寫方法*/ class PairRDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { //1)檔名:根據key和value自定義輸出檔名。 override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String ={ val fileNamePrefix=key.asInstanceOf[String] val fileName=fileNamePrefix+"-"+name fileName } //2)檔案內容:默認同時輸出key和value。這裡指定不輸出key。 override def generateActualKey(key: Any, value: Any): String = { null } }

outputToMultiFile.png

相關推薦

Spark RDD Key儲存不同檔案

基本需求 將Keyed RDD[(Key,Value)]按Key儲存到不同檔案。 測試資料 資料格式:id,studentId,language,math,english,classId,depart

spark rdd根據key儲存不同的資料夾

public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {      //靜態內部類,LineRecordWriter,實現了RecordWriter。這個就是我們想要的   protected stati

Spark 兩個RDDkey合併(join運算元和cogroup運算元)

在工作中經常遇到需要合併RDD的情況,記錄下處理情況。join和cogroup運算元都能達到要求,按key合併,只是當rdd存在多個相同的key時候,最終的輸出結果不一樣。網上找到了處理情況,自己也測試了,程式碼如下: object Test { def main(ar

MongoDB配置儲存日誌檔案

mongodb預設不提供直接按天來輸出日誌檔案的配置,但是提供一個日誌清理的命令:logRotate。如果日誌不及時清理會導致mongo訪問越來越慢,甚至卡死。 要使用logRotate命令需要進入到mongo shell,然後執行: use admin #db.

Spark RDD/DataFrame map儲存資料的兩種方式

使用Spark RDD或DataFrame,有時需要在foreachPartition或foreachWith裡面儲存資料到本地或HDFS。 直接儲存資料 當然如果不需要在map裡面儲存資料,那麼針對RDD可以有如下方式 val rdd = // targ

Spark實現根據key值來分目錄儲存檔案檔案輸出(MultipleOutputFormat)

假設我們有這樣的(key,value)資料: sc.parallelize(List((20180701, "aaa"), (20180702, "bbb"), (20180701, "ccc")))我們想把它們存到路徑“output/”下面,而且key值相同的儲存在同一檔案

Spark RDD 操作實戰之檔案讀取

/1、本地檔案讀取 val local_file_1 = sc.textFile("/home/hadoop/sp.txt") val local_file_2 = sc.textFile("file://home/hadoop/sp.txt") //2、當前目錄下的檔案 val file1 = sc

maven外掛 配置載入不同環境配置檔案進行打包(maven-war-plugin)

1.配置多種不同環境 如(本地local,開發dev,測試test 環境等) 1 <profiles> 2 <profile> 3 <id>local</id> 4 <p

Spark核心程式設計:建立RDD(集合、本地檔案、HDFS檔案

1,建立RDD 1.進行Spark核心程式設計時,首先要做的第一件事,就是建立一個初始的RDD。該RDD中,通常就代表和包含了Spark應用程式的輸入源資料。然後在建立了初始的RDD之後,才可以通過Spark Core提供的transformation運算元,

logback 配置 不同level日誌輸出到不同檔案 天 大小拆分

logback.xml <!-- Logback configuration. See http://logback.qos.ch/manual/index.html --> <configuration scan="true" scanPeriod="1

spark RDD運算元(十一)之RDD Action 儲存操作saveAsTextFile,saveAsSequenceFile,saveAsObjectFile,saveAsHadoopFile 等

關鍵字:Spark運算元、Spark函式、Spark RDD行動Action、Spark RDD儲存操作、saveAsTextFile、saveAsSequenceFile、saveAsObjectFile,saveAsHadoopFile、saveAsHa

Python中陣列儲存,與Matlab不同

陣列因為是線性結構,因此在記憶體中都是順序儲存的,但按行儲存還是按列儲存,不同語言有不同的規定。matlab是按列儲存的,而C/C++和python是按行儲存的 。 以二維陣列為例: #python import numpy as np >>

MapReduce讀取txt檔案儲存至HBase,以檔名作Key,整個檔案內容作Value

把已抓取好的網路輿情資訊(以txt形式存放),儲存到HBase中,再進行資訊分析。 要求: 以檔名作Key,整個檔案內容作Value 思路: txt檔案先上傳到HDFS中,再使用HBase MapReduce將檔案寫入HBase中。(很簡單的思路) 問題分析:

spark rdd讀取檔案

rdd讀取一個檔案 val rdd = sc.textFile("hdfs://172.20.20.17:9000/tmp/wht/account/accounts.txt").map(_.split(",")) rdd讀取多個文字檔案 val rdd = sc.text

log4j使用心得之一 -- 自定義日誌級別並分不同檔案儲存

我們組由.NET切換到JAVA,現有程式碼都需要重構,遇到的第一個問題就是缺少一個公共基礎類庫,網上找的或是其他找來的程式碼,往往不能很好的適應公司的需求,迫切的需要一個JAVA版本的公共基礎類庫,適應大夥現有的開發習慣,開發風格. 而開發這樣的基礎類庫,是我最喜歡乾的事情

Spark學習筆記 --- spark RDD載入檔案

wechat:812716131 ------------------------------------------------------ 技術交流群請聯絡上面wechat ----------------------------------------------

使用properties配置log4j2.生成種類日誌並儲存檔案

log4j2.properties.直接放在resources下面就行了,程式自己會匹配status = error dest = err name = PropertiesConfig #公共變數 #檔案路徑 property.filePath=logs property

Spark key聚合求平均值與佔比

1.求key的平均值 k,v結構的資料中,求每個key對應的平均值,在spark中怎麼應該怎麼求? 例如有如下的資料: ("a",10) ("b",4) ("a",10) ("b",20) 想求a,b對應的平均值。 直接上程式碼 sc.par

完美spring boot 使用log4j2級別輸出到不同檔案

1. pom.xml引入需要的jar <dependency> <groupId>org.springframework.boot</grou

HadoopConsumer——消費kafka中若干topic的訊息,追加儲存至hdfs的不同檔案

在kafka原始碼提供的hadoopconsumer的基礎上進行開發,該程式可消費多個topic的訊息,追加至hdfs檔案中。 本程式的輸入輸出檔案有: 配置檔案:topics.properties,指定要消費的topic列表,broker列表,以及程式被呼叫的時