1. 程式人生 > >Spark--資料讀取與儲存

Spark--資料讀取與儲存

1、動機
有時候資料量會大到本機可能無法儲存,這時就需要探索別的讀取和儲存方法了。
Spark支援很多種輸入源和輸出源。一部分原因是Spark本身是基於Hadoop生態圈二構建的,so spark可以通過Hadoop MapReduce 所使用的InputFormat 和 OutPutFormat 介面訪問資料,而大部分常見的檔案格式與儲存系統(S3,HDFS,Cassandra,HBase等)都支援這種介面。
Spark所支援的三種常見資料來源:

  • 文字格式與檔案系統
  • SparkSQL
  • 資料庫與鍵值儲存

2、檔案格式:
結構化
- 文字檔案:不是結構化,普通的文字檔案,每一行一條記錄
- JSON: 半結構化,常見的基於文字的格式,半結構化;大多數庫都要求每行一條記錄。
- CSV:結構化,非常常見的基於文字的格式,通常在電子表格應用中使用。
- SequenceFile:結構化,一種用於鍵值對資料的常見Hadoop檔案格式。
- Protocol buffers:結構化,一種快速的,節約空間的跨語言格式。
- 物件檔案:用來將Spark作業中的資料儲存下來以讓共享的程式碼讀取。改變類的時候它會失效,因為他依賴於java序列化。

(1) 文字檔案
當我們將一個文字檔案讀取為RDD時,輸入的每一行都會成為RDD的每一個元素。也可以將多個文字檔案讀取為一個pair RDD,其中鍵是檔名,值是檔案內容。

scala> val input = sc.textFile("/Users/mac/Documents/javascala/VNCluster/500points.txt ")
input: org.apache.spark.rdd.RDD[String] = /Users/mac/Documents/javascala/VNCluster/500points.txt  MapPartitionsRDD[1] at textFile at <console>:24

同樣可以指定分割槽數

scala> val input = sc.textFile("/Users/mac/Documents/javascala/VNCluster/500points.txt ",4)
input: org.apache.spark.rdd.RDD[String] = /Users/mac/Documents/javascala/VNCluster/500points.txt  MapPartitionsRDD[3] at textFile at <console>:24

我是讀入一個數據點文字檔案,然後對資料做一些操作儲存在源目錄中:

scala> val data = input.map
(x => (x.split(" ")(0),x.split(" ")(1))) data: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[6] at map at <console>:26 scala> data.first res3: (String, String) = (0.2489,0.7091) scala> data.filter{case(x,y) => y.toDouble > 0.5} res5: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[10] at filter at <console>:29 scala> val afterDeal = data.filter{case(x,y) => y.toDouble > 0.5} afterDeal: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[11] at filter at <console>:28 scala> afterDeal.first res7: (String, String) = (0.2489,0.7091) scala> afterDeal.saveAsTextFile("/Users/mac/Documents/javascala/VNCluster/rdd.txt")

(2)JSON

  • 讀取JSON檔案:將資料作為文字檔案,然後對JSON資料進行解析,這樣的方法可以在所有支援的程式語言中使用。
  • 儲存JSON檔案:不需要考慮格式錯誤的資料,並且也知道要寫出的資料型別。

(3)逗號分隔值與製表符分隔值
逗號分隔值(CSV)檔案每行都有固定數目的欄位,欄位間用逗號隔開。記錄通常是一行一條,有時也可以跨行。

  • 讀取CSV:
 val conf = new SparkConf().setMaster("local").setAppName("My app")
    val sc = new SparkContext(conf)

    val input = sc.textFile("inputFile")
    val result = input.map{ line => 
      val reader = new CSVReader(new StringBuilder(line))
      reader.readNext()
  • 儲存CSV:和儲存儲存文字檔案基本一樣
scala> val input = sc.textFile("/Users/mac/Desktop/500points.csv")
input: org.apache.spark.rdd.RDD[String] = /Users/mac/Desktop/500points.csv MapPartitionsRDD[1] at textFile at <console>:24

scala> input.first
res0: String = 237.09,712.1

scala> val data = input.map(x => (x.split(",")(0),x.split(",")(1)))
data: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:26

scala> data.first
res1: (String, String) = (237.09,712.1)

scala> val deal = data.mapValues(x => x.toDouble / x.max.toDouble)
deal: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[3] at mapValues at <console>:28

scala> deal.saveAsTextFile("/Users/mac/Desktop/500pointsresult.csv" )

相關推薦

Spark--資料讀取儲存

1、動機 有時候資料量會大到本機可能無法儲存,這時就需要探索別的讀取和儲存方法了。 Spark支援很多種輸入源和輸出源。一部分原因是Spark本身是基於Hadoop生態圈二構建的,so spark可以通過Hadoop MapReduce 所使用的InputF

Spark(五)資料讀取儲存

目錄: 5、資料讀取與儲存 5.1、檔案格式 5.1.1、文字檔案 5.1.2、JSON 5.1.3、逗號分隔值與製表符分隔值 5.1.4、SequenceFile 5.1.5、物件檔案 5.2、檔案系統 5.2.1、本地/“常規”檔案系統 5.2.3、HDF

R 語言資料讀取儲存

一、R語言讀取文字檔案: 1、檔案目錄操作:getwd() : 返回當前工作目錄setwd(“d:/data”) 更改工作目錄 2、常用的讀取指令readread.table() : 讀取文字檔案read.csv(): 讀取csv檔案如果出現缺失值,read.

easyui-tree資料讀取儲存

easyui版本: jQuery EasyUI 1.4.3 工具:eclipse+mysql 注:程式碼有刪除,因為是公司專案,主要是easyui-tree的獲取及儲存 表結構為:商品型別表(s

Go語言基礎(十五)—— Go語言實現json資料檔案讀取儲存

案例: package main import ( "os" "fmt" "encoding/json" "time" ) type Person2 struct { Name string Age int Sex string Hobby []string } fun

spark streaming小實戰之kafka讀取儲存

本次小實戰主要介紹一下spark streaming如何讀取kafka資料涉及理論部分在這就不多說了,自己也剛入門先說下需求待處理日誌格式為ouMrq2r_aU1mtKRTmQclGo1UzY,3251210381,2018/11/29 13:46,上海,上海,210.2.2

[spark streaming] ReceiverTracker 資料產生儲存

前言 在Spark Streaming裡,總體負責任務的動態排程是JobScheduler,而JobScheduler有兩個很重要的成員:JobGenerator 和 ReceiverTracker。JobGenerator 負責將每個 batch 生成具體的

Spark SQL 的資料載入儲存(load , )

Spark SQL主要是操作DataFrame,DataFrame本身提供了save和load的操作. Load:可以建立DataFrame; Save:把DataFrame中的資料儲存到檔案或者說與具體的格式來指明我們要讀取的檔案的型別以及與具體的格式來指出我

python包-numpy資料讀取儲存(二)

目錄 0.為什麼要使用numpy儲存資料 1.儲存為二進位制檔案(.npy/.npz)並讀取 numpy.save和numpy.load numpy.savez numpy.savez_compressed 2.儲存到文字檔案 numpy.savetxt nump

Python 檔案讀取儲存

file1=open('pima-indians-diabetes.txt','r') file2=open('out.txt','w+') #data=file1.read() i=0 while True: line=file1.readline() tt='"'+line[

自定義XML格式讀取儲存

背景         本人頭一回寫部落格,請大家多多關照。通過讀取XML檔案獲取使用者管理許可權,其中涉及三部分: 1.XML檔案的生成; 2.XML檔案的讀取; 3.XML檔案的儲存; 如何做 第一步:自己先將XML檔案格式列出來。  XML格

iOS開發技巧之:相簿中的GIF圖片的讀取儲存

大家都知道iOS的系統相簿是不支援gif圖片預覽的。但是,這並不代表系統相簿不能儲存和讀取gif圖片。通過Safari長按gif圖片,選擇儲存到相簿,這時儲存到相簿裡的圖片就是gif的,雖然它不會動。 下面將介紹如何對系統相簿進行gif的讀取與儲存。 什麼是 UTI iOS系統相

24位bmp影象的資料讀取&儲存

24位bmp檔案的讀取&儲存 我採用的方法是將影象檔案讀取,儲存到一維陣列中,以便後期的操作。    void checkFileExist(FILE * fpbmp) {     //開啟圖片檔案 按照二進位制讀取  

c++的基本資料型別儲存結構(學生筆記)

資料型別: 1.基本型別:整型(int,bool,enum),浮點型(float,double),字元型(char) 2.結構型別:陣列([ ]),結構(struct)聯合(union),類(class) 3.指標型別:(*) 4.空型別:(void) 整形根據示數範圍分為:短整形(sh

OpenCV中影象顯示、讀取儲存

眾所周知,opencv中的cv2.imread函式返回的影象資料,通道是BGR,而不是一般意義上的RGB;但是,這時如果用cv2.imshow進行顯示,看到的卻是正常的樣子;而如果用其他庫的顯示函式,如matplotlib的plt.imshow來顯示,則是異常的顯示,一般都是

opencv學習筆記一:影象讀取儲存

影象讀取函式:cv2.imread(影象路徑,標誌符) 影象路勁可以是絕對路徑和相對路徑; 識別符號有三種: cv2.IMREAD_COLOR (忽視透明度); cv2.IMREAD_GRAYSCALE(轉換成灰度影象讀取); cv2.IMREAD_UNCHANGE

Tensorflow基礎0:檔案的讀取儲存

檔案讀取流程 學習目標 目標 說明TensorFlow檔案讀取的流程 應用 無 有四種獲取資料到TensorFlow程式的方法: tf.dataAPI:輕鬆構建複雜的輸入管道。(優

32、陣列的讀取儲存

32、讀取與儲存 import numpy as np s1 = np.array(range(10)).reshape((2,5)) print(s1) np.save('./陣列',s1) 將s1 儲

Unity3d+Json多物件資料讀取寫入+JsonUtility實現

        這幾天做自己的培訓班的畢業專案,涉及到Json的讀取與寫入,本來想用LitJson的,後來發現5.3以後的版本有自帶的實現,而且很方便,配合System.IO就可以方便的實現,網上這方面資料也不少,但這裡給出更具體的實現,例如Json檔案中不只有一個物件,涉及

Flume+hbase 日誌資料採集儲存

瞭解過flume的人,差不多都看過這張或則類似的圖片,本文即實現上圖部分內容。(由於條件有限,目前是單機上實現) flume-agent配置檔案 #flume agent conf source_agent.sources = server source_agent.si