1. 程式人生 > 實用技巧 >3. Spark常見資料來源

3. Spark常見資料來源

*以下內容由《Spark快速大資料分析》整理所得。

讀書筆記的第三部分是講的是Spark有哪些常見資料來源?怎麼讀取它們的資料並儲存。

Spark有三類常見的資料來源:

  • 檔案格式與檔案系統:它們是儲存在本地檔案系統或分散式檔案系統(比如 NFS、HDFS、Amazon S3 等)中的 資料,例如:文字檔案、JSON、SequenceFile, 以及 protocol buffer
  • Spark SQL中的結構化資料來源:它針對包括JSON和Apache Hive在內的結構化資料來源。
  • 資料庫與鍵值儲存:Spark 自帶的庫和一些第三方庫,它們可以用來連線Cassandra、HBase、Elasticsearch以及JDBC
    源。

一、檔案格式與檔案系統

1. 文字檔案

2. JSON

3. CSV

4. SequenceFile

二、Spark SQL中的結構化資料來源

三、資料庫與鍵值儲存


一、檔案格式與檔案系統

1. 文字檔案
文字檔案讀取:

# 方法1:文字檔案讀取
input = sc.textFile("file://home/holden/repos/sparks/README.md")
# 方法2:如果檔案足夠小,同時讀取整個檔案,從而返回一個pair RDD,其中鍵時輸入檔案的檔名
input = sc.wholeTextFiles("file://home/holden/salesFiles")

文字檔案儲存:

result.saveAsTextFile(outputFile)

2. JSON
JSON讀取:

# JSON讀取
import json
data = input.map(lambda x: json.loads(x))

JSON儲存:

# JSON儲存 - 舉例選出喜愛熊貓的人
(data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile))
# 儲存文字檔案
result.SaveAsTextFile(outputFilePath)

3. CSV


CSV讀取:

import csv
import StringIO

# CSV讀取 - 如果資料欄位均沒有包括換行符,只能一行行讀取 def loadRecord(line):
"""解析一行CSV記錄""" input = StringIO.StringIO(line) reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"]) return reader.next()
input
= sc.textFile(inputFile).map(loadRecord)
# CSV讀取
- 如果資料欄位嵌有換行符,需要完整讀入每個檔案 def loadRecords(fileNameContents): """讀取給定檔案中的所有記錄""" input = StringIO.StringIO(fileNameContents[1]) reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"]) return reader
fullFileData
= sc.wholeTextFiles(inputFile).flatMap(loadRecords)

CSV儲存:

# CSV儲存
def writeRecords(records):
    """寫出一些CSV記錄"""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["names", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]

pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)

4. SequenceFile

SequenceFile讀取:

# sc.sequenceFile(path, keyClass, valueClass)
data = sc.sequenceFile(inFile, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")

SequenceFile儲存(用Scala):

val data = sc.parallelize(List(("Pandas", 3), ("Kay", 6), ("Snail", 2)))
data.saveAsSequenceFile(outputFile)

二、Spark SQL中的結構化資料來源


三、資料庫與鍵值儲存

關於Cassandra、HBase、Elasticsearch以及JDBC源的資料庫連線,詳情請參考書本81-86頁內容。