1. 程式人生 > >spark根據key輸出到多個目錄

spark根據key輸出到多個目錄

專案中需要將spark的輸出按id輸出到不同的目錄中,即實現在spark中的多路輸出。我們可以呼叫saveAsHadoopFile函式並自定義一個OutputFormat類,就可以達到上述目的。

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by WangLei on 17-6-2.
  */
object genSpecifiedUserData { val inputPath = "xxx" def genData(sc:SparkContext) = { sc.textFile(inputPath) .filter(x => StringUtils.split(x, "\t").length == 7) .map(x => { val lines = StringUtils.split(x, "\t") val (id, aid, title, desc) = (lines(0
), lines(1), lines(5), lines(6)) (id, aid, title, desc) }) .filter(x => StringUtils.isNotBlank(x._1)) .map(x => { val (id, aid, title, desc) = (x._1, x._2, x._3, x._4) val pattern = Array(aid, title, desc).mkString("\t"
) (id, pattern) }) .partitionBy(new org.apache.spark.HashPartitioner(10)) } def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("gen_specified_user_data") sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val sc = new SparkContext(sparkConf) val output = "/data/xxx/specified/" val fileSystem = FileSystem.get(sc.hadoopConfiguration) fileSystem.delete(new Path(output), true) findSpecifiedUser(sc).saveAsHadoopFile( output, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat[_, _]]) sc.stop() } } class RDDMultipleTextOutputFormat[K, V]() extends MultipleTextOutputFormat[K, V]() { override def generateFileNameForKeyValue(key: K, value: V, name: String) : String = { (key + "/" + name) } }

其中,輸入的資料是以”\t”分隔一共七列,第一列為使用者id。我們希望將輸出的時候,相同的使用者id輸出到同一個目錄下面,不同的使用者id分開。

RDDMultipleTextOutputFormat類中的generateFileNameForKeyValue函式有三個引數,key和value就是我們RDD的Key和Value,而name引數是每個Reduce的編號。上面的程式碼中沒有使用該引數,而是直接將同一個Key的資料輸出到同一個檔案中。

最後生成的結果在HDFS上為:

/data/xxx/specified/idA/...
/data/xxx/specified/idB/...
/data/xxx/specified/idC/...

相關推薦

spark根據key輸出目錄

專案中需要將spark的輸出按id輸出到不同的目錄中,即實現在spark中的多路輸出。我們可以呼叫saveAsHadoopFile函式並自定義一個OutputFormat類,就可以達到上述目的。 import org.apache.commons.lang3.

Spark實現根據key值來分目錄儲存檔案 檔案輸出(MultipleOutputFormat)

假設我們有這樣的(key,value)資料: sc.parallelize(List((20180701, "aaa"), (20180702, "bbb"), (20180701, "ccc")))我們想把它們存到路徑“output/”下面,而且key值相同的儲存在同一檔案

spark 載入目錄; RDD輸出到hdfs檔案壓縮

(1)  spark textFile載入多個目錄:   其實很簡單,將多個目錄(對應多個字串),用,作為分隔符連線起來    val inputPath = List("hdfs://localhost:9000/test/hiveTest", "hdfs://local

用一個MapReduce輸出key的分割槽檔案

先看一下要處理的資料型別 19392963501,17816115082,2018-09-18 16:19:44,1431 14081946321,13094566759,2018-05-23 09:34:27,0610 13415701165,18939575060,2018-

MapReduce-MulitipleOutputs實現自定義輸出目錄

輸入源資料樣例: Source1-0001 Source2-0002 Source1-0003 Source2-0004 Source1-0005 Source2-0006 Source3-0007 Source3-0008描述: Source1開頭的資料屬於集合A;Sou

map集合根據value找key(一個keykey)

//根據value值獲取到對應的一個key值 public static String getKey(HashMap<String,String> map,String value){ String key = null

如何將Linux系統的目錄及文件備份並壓縮到一個文件,以方面保持和傳遞?

如何將linux系統的多個目錄及文件備份並壓縮到一個文件 以方面保持和傳遞? 1.備份Linux系統  window系統在運行狀態下,我們是無法將文件拷貝出來的,那麽在Linux下呢?她的文件結構式一種樹型結構。而且在系統運行的時候我們可以進行打包所有系統文件。特別要說的在Linux的root賬戶具備

大型工程目錄下的Makefile寫法

qt5 pan ron 指定 com exec bsp 可執行文件 不同 1、前言   目前從事於linux下程序開發,涉及到多個文件,多個目錄,這時候編譯文件的任務量比較大,需要寫Makefile。關於Makefile的詳細內容可以參考網上流傳非常廣泛的《跟我一起寫Ma

sersync 臺服務器、目錄的時時同步、備份

sersync sersync時時同步 同步軟件 一、為什麽要用Rsync+sersync架構?1、sersync是基於Inotify開發的,類似於Inotify-tools的工具2、sersync可以記錄下被監聽目錄中發生變化的(包括增加、刪除、修改)具體某一個文件或某一個目錄的名字,然後使用r

基於zabbix api根據hostname管理template

zabbix api基於zabbix api根據hostname添加多個template 之前寫了一個關聯模版的api但是考慮到每個添加一個template是有點復雜,而且最近有那麽一個需求,所以改了一下方法,使得可以根據hostname添加多個template。話不多說直接上腳本和效果:(en

Map之一個KeyValue的MultiValueMap(一個鍵值)

arrays set for get work article () buffer 很好 原鏈接:https://blog.csdn.net/yanzhenjie1003/article/details/51550264 MultiValueMap可以讓一個key對應多個v

用Jenkins集成ios項目設置scheme,同一代碼自動輸出環境包 實現便捷切換API環境

ios項目 bug 編譯打包 不同配置 online space 測試 jenkin spa Jenkins 安裝使用參考我的博客http://www.cnblogs.com/zhujin/p/9064820.html Xcode 配置:說明 一個schema 對應一套環境

mkdir一次創建目錄

size sha color 承載 ima 技術分享 系統 term ext 系統管理員必用的十大基礎之一也可以這樣連貫起來一次性創建.... 其中的知識點其實就是花括號{}{};可承載一個以逗號(,)分割的列表,並將其展開為多個列表。。。。有個知識點 展開命令行~USER

本地ssh key連線git賬號

在開發過程中,可能需要在本地同時連線到多個git賬戶,如公司內部git和github,但是一個使用者的ssh key只能連線到一個git賬戶,這就需要建立多個ssh key,分別連線到不同的賬戶。具體步驟如下: 1.生成ssh key ssh-keygen -t rsa -b 4096 -C

inotifywait監控目錄

inotifywait監控多個目錄 一.需求: 1.監控特定的服務配置檔案和目錄變更情況。 2.監控自定義檔案和目錄變更情況。 3.可以手動殺掉程序。 4.把所有變更資訊弄到日誌裡。 二.Inotify介紹 Inotify 是一個核心用於通知使用者空間程式檔案系統變化的機制,是基於inode級別

Qt for Android gradle編譯同時輸出渠道apk

前言 Gradle是Android目前主流的編譯工具,Gradle剛出來的時候確實有很多詬病,比如編譯速度,那真是一個慢啊,不過隨著版本的更新,現在的gradle可比當年牛逼多了,功能也越來越完善,畢竟是谷歌的親兒子。那麼在用Qt 做android開發時候,也是預設使用Gradle來

open_basedir設定目錄問題

在PHP內有一個安全選項是open_basedir,這個選項是限制PHP可以開啟的目錄,可以透過php.ini及httpd.conf設定,而我一直也有使用這個選項。 今天server要改一些設定,要將兩個目錄加入open_basedir內,發現用論用空格、逗號、分號來區隔兩個

zbb20181227 map,LinkedHashMap按順序存放key,MultiValueMap 一個keyvalue

MultiValueMap 一個key對多個value 需要引入pom <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-collections4</a

SVN 一次性提交目錄中檔案的方法

情況一:將專案中未加入版本控制的檔案提交到版本庫。  在使用WINDOW下的SVN客戶端工具時,在提交一個專案的檔案時,如果有未加入版本庫的檔案,這時可以先將未加入的檔案選中,然後一起提交。  但在LINUX命令列中,如果一個專案中新建立了一個檔案new.php,那麼我們可以使用如下命令來進行版本的提交。 

cp 快捷命令:複製檔案到目錄

導讀 在學習 Linux 的過程中,對於新手而言總是會使用幾個命令來完成一個簡單的任務。對正在熟悉使用終端的人這是很容易理解的行為。然而,如果你想要成為一個老手,學習我說的“快捷命令”會顯著減少時間浪費。在本篇中,我們會用一個簡單的方法在 Linux 中用一個命令來將目錄