Spark RDD 按Key儲存到不同檔案
基本需求
將Keyed RDD[(Key,Value)]按Key儲存到不同檔案。
測試資料
資料格式:id,studentId,language,math,english,classId,departmentId 1,111,68,69,90,Class1,Economy 2,112,73,80,96,Class1,Economy 3,113,90,74,75,Class1,Economy 4,114,89,94,93,Class1,Economy 5,115,99,93,89,Class1,Economy 6,121,96,74,79,Class2,Economy 7,122,89,86,85,Class2,Economy 8,123,70,78,61,Class2,Economy 9,124,76,70,76,Class2,Economy 10,211,89,93,60,Class1,English 11,212,76,83,75,Class1,English 12,213,71,94,90,Class1,English 13,214,94,94,66,Class1,English 14,215,84,82,73,Class1,English 15,216,85,74,93,Class1,English 16,221,77,99,61,Class2,English 17,222,80,78,96,Class2,English 18,223,79,74,96,Class2,English 19,224,75,80,78,Class2,English 20,225,82,85,63,Class2,English
用Spark RDD實現
package com.bigData.spark
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.log4j.{Level, Logger}
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession
/**
* Author: Wang Pei
* License: Copyright(c) Pei.Wang
* Summary:
* RDD 按Key儲存到不同檔案
*/
object OutputToMultiFile {
def main(args: Array[String]): Unit = {
/**設定日誌等級*/
Logger.getLogger("org").setLevel(Level.WARN)
/**spark環境*/
val spark = SparkSession.builder().master("local[3]").appName(this.getClass.getSimpleName.replace("$","")).getOrCreate()
/**Keyed RDD*/
val data = spark.sparkContext.textFile("data/scores.csv")
//Keyed RDD
.map(item=>(item.split(",").takeRight(2).reverse.mkString("_"),item))
//按Key Hash分割槽,4個Key分到4個Partition中
.partitionBy(new HashPartitioner(4))
/**按Key儲存到不同檔案*/
data.saveAsHadoopFile("data/multiKeyedDir",
classOf[String],
classOf[String],
classOf[PairRDDMultipleTextOutputFormat]
)
spark.stop()
}
}
/**繼承類重寫方法*/
class PairRDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
//1)檔名:根據key和value自定義輸出檔名。
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String ={
val fileNamePrefix=key.asInstanceOf[String]
val fileName=fileNamePrefix+"-"+name
fileName
}
//2)檔案內容:默認同時輸出key和value。這裡指定不輸出key。
override def generateActualKey(key: Any, value: Any): String = {
null
}
}
相關推薦
Spark RDD 按Key儲存到不同檔案
基本需求 將Keyed RDD[(Key,Value)]按Key儲存到不同檔案。 測試資料 資料格式:id,studentId,language,math,english,classId,depart
spark rdd根據key儲存進不同的資料夾
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> { //靜態內部類,LineRecordWriter,實現了RecordWriter。這個就是我們想要的 protected stati
Spark 兩個RDD按key合併(join運算元和cogroup運算元)
在工作中經常遇到需要合併RDD的情況,記錄下處理情況。join和cogroup運算元都能達到要求,按key合併,只是當rdd存在多個相同的key時候,最終的輸出結果不一樣。網上找到了處理情況,自己也測試了,程式碼如下: object Test { def main(ar
MongoDB配置按天儲存日誌檔案
mongodb預設不提供直接按天來輸出日誌檔案的配置,但是提供一個日誌清理的命令:logRotate。如果日誌不及時清理會導致mongo訪問越來越慢,甚至卡死。 要使用logRotate命令需要進入到mongo shell,然後執行: use admin #db.
Spark RDD/DataFrame map儲存資料的兩種方式
使用Spark RDD或DataFrame,有時需要在foreachPartition或foreachWith裡面儲存資料到本地或HDFS。 直接儲存資料 當然如果不需要在map裡面儲存資料,那麼針對RDD可以有如下方式 val rdd = // targ
Spark實現根據key值來分目錄儲存檔案 多檔案輸出(MultipleOutputFormat)
假設我們有這樣的(key,value)資料: sc.parallelize(List((20180701, "aaa"), (20180702, "bbb"), (20180701, "ccc")))我們想把它們存到路徑“output/”下面,而且key值相同的儲存在同一檔案
Spark RDD 操作實戰之檔案讀取
/1、本地檔案讀取 val local_file_1 = sc.textFile("/home/hadoop/sp.txt") val local_file_2 = sc.textFile("file://home/hadoop/sp.txt") //2、當前目錄下的檔案 val file1 = sc
maven外掛 按配置載入不同環境配置檔案進行打包(maven-war-plugin)
1.配置多種不同環境 如(本地local,開發dev,測試test 環境等) 1 <profiles> 2 <profile> 3 <id>local</id> 4 <p
Spark核心程式設計:建立RDD(集合、本地檔案、HDFS檔案)
1,建立RDD 1.進行Spark核心程式設計時,首先要做的第一件事,就是建立一個初始的RDD。該RDD中,通常就代表和包含了Spark應用程式的輸入源資料。然後在建立了初始的RDD之後,才可以通過Spark Core提供的transformation運算元,
logback 配置 不同level日誌輸出到不同檔案 按天 大小拆分
logback.xml <!-- Logback configuration. See http://logback.qos.ch/manual/index.html --> <configuration scan="true" scanPeriod="1
spark RDD運算元(十一)之RDD Action 儲存操作saveAsTextFile,saveAsSequenceFile,saveAsObjectFile,saveAsHadoopFile 等
關鍵字:Spark運算元、Spark函式、Spark RDD行動Action、Spark RDD儲存操作、saveAsTextFile、saveAsSequenceFile、saveAsObjectFile,saveAsHadoopFile、saveAsHa
Python中陣列按行儲存,與Matlab不同
陣列因為是線性結構,因此在記憶體中都是順序儲存的,但按行儲存還是按列儲存,不同語言有不同的規定。matlab是按列儲存的,而C/C++和python是按行儲存的 。 以二維陣列為例: #python import numpy as np >>
MapReduce讀取txt檔案儲存至HBase,以檔名作Key,整個檔案內容作Value
把已抓取好的網路輿情資訊(以txt形式存放),儲存到HBase中,再進行資訊分析。 要求: 以檔名作Key,整個檔案內容作Value 思路: txt檔案先上傳到HDFS中,再使用HBase MapReduce將檔案寫入HBase中。(很簡單的思路) 問題分析:
spark rdd讀取檔案
rdd讀取一個檔案 val rdd = sc.textFile("hdfs://172.20.20.17:9000/tmp/wht/account/accounts.txt").map(_.split(",")) rdd讀取多個文字檔案 val rdd = sc.text
log4j使用心得之一 -- 自定義日誌級別並分不同檔案儲存
我們組由.NET切換到JAVA,現有程式碼都需要重構,遇到的第一個問題就是缺少一個公共基礎類庫,網上找的或是其他找來的程式碼,往往不能很好的適應公司的需求,迫切的需要一個JAVA版本的公共基礎類庫,適應大夥現有的開發習慣,開發風格. 而開發這樣的基礎類庫,是我最喜歡乾的事情
Spark學習筆記 --- spark RDD載入檔案
wechat:812716131 ------------------------------------------------------ 技術交流群請聯絡上面wechat ----------------------------------------------
使用properties配置log4j2.生成種類日誌並按天儲存檔案
log4j2.properties.直接放在resources下面就行了,程式自己會匹配status = error dest = err name = PropertiesConfig #公共變數 #檔案路徑 property.filePath=logs property
Spark 按key聚合求平均值與佔比
1.求key的平均值 k,v結構的資料中,求每個key對應的平均值,在spark中怎麼應該怎麼求? 例如有如下的資料: ("a",10) ("b",4) ("a",10) ("b",20) 想求a,b對應的平均值。 直接上程式碼 sc.par
完美spring boot 使用log4j2按級別輸出到不同檔案
1. pom.xml引入需要的jar <dependency> <groupId>org.springframework.boot</grou
HadoopConsumer——消費kafka中若干topic的訊息,追加儲存至hdfs的不同檔案內
在kafka原始碼提供的hadoopconsumer的基礎上進行開發,該程式可消費多個topic的訊息,追加至hdfs檔案中。 本程式的輸入輸出檔案有: 配置檔案:topics.properties,指定要消費的topic列表,broker列表,以及程式被呼叫的時