1. 程式人生 > >Spark中載入本地(或者hdfs)檔案以及 spark使用SparkContext例項的textFile讀取多個資料夾(巢狀)下的多個數據檔案

Spark中載入本地(或者hdfs)檔案以及 spark使用SparkContext例項的textFile讀取多個資料夾(巢狀)下的多個數據檔案

Spark中載入本地(或者hdfs)檔案以及 spark使用SparkContext例項的textFile讀取多個資料夾(巢狀)下的多個數據檔案

在正常呼叫過程中,難免需要對多個資料夾下的多個檔案進行讀取,然而之前只是明確了spark具備讀取多個檔案的能力。
針對多個資料夾下的多個檔案,以前的做法是先進行資料夾的遍歷,然後再進行各個資料夾目錄的讀取,其實不必那麼麻煩,因為spark原生就支援這樣的能力。
原理也非常簡單,就是textFile功能。編寫這樣的程式碼,讀取上次輸出的多個結果,由於RDD儲存結果都是儲存為一個資料夾。而多個相關聯RDD的結果就是多個資料夾。

(1)通過如下程式碼:

  //## read all files(files in different directorys)
          val alldata = sc.textFile("data/Flag/*/part-*")
          println(alldata.count())   
經過測試,可以實現對多個相關聯RDD儲存結果的一次性讀取

(2)textFile檔案路徑傳入格式

預設是從hdfs讀取檔案,也可以指定sc.textFile("路徑").在路徑前面加上hdfs://表示從hdfs檔案系統上讀

本地檔案讀取 sc.textFile("路徑").在路徑前面加上file:// 表示從本地檔案系統讀,如file:///home/user/spark/README.md

具體格式如下:

sc = SparkContext(conf=conf)
 '''# hdfs目錄格式如下'''
input_data_path = "hdfs://localhost:9002/input/2017-11*"
 '''# 本地檔案目錄'''
input_data_path="file:///Users/a6/Downloads/input_local/2017-09*"
print input_data_path
result = sc.textFile(input_data_path)

textFile的引數是一個path,這個path可以是:
1. 一個檔案路徑,這時候只裝載指定的檔案
2. 一個目錄路徑,這時候只裝載指定目錄下面的所有檔案(不包括子目錄下面的檔案)  (ps,這個沒有測試通過,應該是錯誤的)
3. 通過萬用字元的形式載入多個檔案或者載入多個目錄下面的所有檔案

1)hdfs://localhost:9002/input/下目錄結構如下圖:

localhost:userid_hbsid_map_new a6$ hadoop dfs -ls hdfs://localhost:9002/input/
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

17/11/08 16:30:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r--   1 a6 supergroup        100 2017-11-07 16:41 hdfs://localhost:9002/input/2017-11-01.txt
-rw-r--r--   1 a6 supergroup        100 2017-11-07 16:41 hdfs://localhost:9002/input/2017-11-10.txt
drwxr-xr-x   - a6 supergroup          0 2017-11-08 15:17 hdfs://localhost:9002/input/test_input
localhost:userid_hbsid_map_new a6$ hadoop dfs -ls hdfs://localhost:9002/input/*
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

17/11/08 16:30:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-rw-r--r--   1 a6 supergroup        100 2017-11-07 16:41 hdfs://localhost:9002/input/2017-11-01.txt
-rw-r--r--   1 a6 supergroup        100 2017-11-07 16:41 hdfs://localhost:9002/input/2017-11-10.txt
Found 1 items
-rw-r--r--   1 a6 supergroup        100 2017-11-08 15:17 hdfs://localhost:9002/input/test_input/2017-11-20.txt
localhost:userid_hbsid_map_new a6$

2)下面是在python編寫的spark程式中測試的結果,判斷是否達到預期。 

'''# hdfs目錄'''
(1)input_data_path = "hdfs://localhost:9002/input/2017-11-01.txt"  #一個檔案路徑,這時候只裝載指定的檔案
(2)input_data_path = "hdfs://localhost:9002/input"  #這個報錯,沒有測試通過  —— 一個目錄路徑,這時候只裝載指定目錄下面的所有檔案(不包括子目錄下面的檔案)
(3)input_data_path = "hdfs://localhost:9002/input/*"   #通過萬用字元的形式載入多個檔案或者載入多個目錄下面的所有檔案

其中形如(2)格式的hdfs輸入會報如下錯誤:
py4j.protocol.Py4JJavaError: An error occurred while calling o18.partitions.
: java.io.IOException: Not a file: hdfs://localhost:9002/input/test_input
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:320)

3)此外,第三點是一個使用小技巧,現在假設我的資料結構為先按天分割槽,再按小時分割槽的,在hdfs上的目錄結構類似於:

/user/hdfs/input/dt=20130728/hr=00/
/user/hdfs/input/dt=20130728/hr=01/
...
/user/hdfs/input/dt=20130728/hr=23/
具體的資料都在hr等於某個時間的目錄下面,現在我們要分析20130728這一天的資料,我們就必須把這個目錄下面的所有hr=*的子目錄下面的資料全部裝載進RDD,於是我們可以這樣寫:sc.textFile("hdfs://n1:8020/user/hdfs/input/dt=20130728/hr=*/"),注意到hr=*,是一個模糊匹配的方式。

4)利用Transformations 操作函式union來合併多個檔案的輸入

'''合併輸入多個數據檔案,並集操作,將源資料集與union中的輸入資料集取並集,
    預設保留重複元素'''
    input_data_path_1 = "hdfs://localhost:9002/input/2017-11-01.txt"
    result1 = sc.textFile(input_data_path_1)
    input_data_path_2= "hdfs://localhost:9002/input/2017-11-10.txt"   
    result2 = sc.textFile(input_data_path_2)
    result = result1.union(result2)

union(otherDataset) 

並集操作,將源資料集與union中的輸入資料集取並集,預設保留重複元素(如果不保留重複元素,可以利用distinct操作去除,下邊介紹distinct時會介紹)。

>>> data1 = sc.parallelize(range(10))
>>> data2 = sc.parallelize(range(6,15))
>>> data1.union(data2).collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 6, 7, 8, 9, 10, 11, 12, 13, 14]
參考網址:http://blog.csdn.net/zy_zhengyang/article/details/46853441

相關推薦

Spark載入本地或者hdfs檔案以及 spark使用SparkContext例項textFile讀取資料()個數檔案

Spark中載入本地(或者hdfs)檔案以及 spark使用SparkContext例項的textFile讀取多個資料夾(巢狀)下的多個數據檔案 在正常呼叫過程中,難免需要對多個資料夾下的多個檔案進行讀取,然而之前只是明確了spark具備讀取多個檔案的能力。針對多個資料夾下

spark讀取資料()檔案

在正常呼叫過程中,難免需要對多個資料夾下的多個檔案進行讀取,然而之前只是明確了Spark具備讀取多個檔案的能力。 針對多個資料夾下的多個檔案,以前的做法是先進行資料夾的遍歷,然後再進行各個資料夾目錄的讀取。 今天在做測試的時候,居然發現spark原生就支援這樣的能力。

使用CMD模式批量刪除指定目錄支援目錄資料指定格式檔案

直入主題 win鍵+r鍵,在執行裡面輸入CMD,開啟dos模式, 直接輸入del /? 出現del的幫助,可以根據需要選擇, 這裡用的是 /s命令 然後輸入就可以了 del /s G:\test\test\*.doc 最後的*.doc即為你要刪除的檔案

SparkMapValues運算元可以將value的值加起來,相當於reducebykey;也可以將value的個數加起來,相當於countbykey

package com.bjsxt; import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.J

PHP使用memcache或者redis儲存session

問題:將session儲存在memcache中的好處? 優點:用 memcache 來儲存 session 在讀寫速度上會比 files 時快很多,而且在多個伺服器需要共用 session 時會比較方

計算資料,總檔案個數python

# -*- coding: utf-8 -*- # Time:2017.03.28 # Author:coplin # Function:Count the number of image file.

在IDEA編寫Spark的WordCount程式傻瓜版

通常會在IDE中編制程式,然後打成jar包,然後提交到叢集,最常用的是建立一個Maven專案,利用Maven來管理jar包的依賴。 一、生成WordCount的jar包 1. 開啟IDEA,File→New→Project→Maven→Next→填寫Groupld和Art

解決問題win10“.dll或者,ocx控制元件已載入,但對DllregisterServer的呼叫失敗,錯誤程式碼為0x80070005”

重構機房的過程需要參考用VB生成的“機房收費系統”,安裝後需要執行的步驟: 第一:需要配置檔案DSN: 檔名稱:charge 伺服器:(local) 使用者名稱:sa 密碼:123456 資料庫:charge_sys 第二:附加資料庫 第三:需要註冊檔案“機房收費系統所需素材”中的3個控

一、Cocos2dx在visualStudio或者vc++環境搭建入門篇

0、概述  Cocos2dx-win32的專案能夠被嚮導生成    嚮導支援vs2008,vs2010(0.8.0以及之後),visualc++2008 Express(0.8.0以及之後),visualc++ 2010 Express(0.8.0以及之後) 1、

spark-shell啟動bug除錯bug除錯

報錯資訊: Exception in thread "main" java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Prot

element-uitable表頭新增元素或者圖示

實現功能:element-ui table表頭開始單元格 新增元素。 直接上程式碼: html <el-table-column type="expand" width="64" label-class-name="use" > //...這裡

載入的動畫網路請求

public class Util { private static ProgressDialog processDia; /** * 顯示載入中對話方塊 * * @param context */

剖析Picasso載入壓縮本地圖片流程解決Android 5.0部分機型無法載入本地圖片的問題

public int read() throws IOException {     if (!this.allowExpire && this.offset + 1L > this.limit) {         this .setLimit(this.limit + (

Fragment載入網頁WebView監聽Back鍵 ,實現返回上一頁的功能

首先建立一個抽象類BackHandledFragment,該類有一個抽象方法onBackPressed(),所有BackHandledFragment的子類在onBackPressed方法中處理各自對Back事件的消費邏輯。onBackPressed返回布林值,宿主Acti

HDFS的checkpoint 檢查點 的問題

1、問題的描述 由於某種原因,需要在原來已經部署了Cloudera CDH叢集上重新部署,重新部署之後,啟動叢集,由於Cloudera Manager 會預設設定dfs.namenode.checkpo

Spark部分:Spark取交集intersection 和取差集subtract 【Java版純程式碼】

package com.bjsxt.spark; import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.Spar

redis 在 php 的應用string篇

否則 發現 版本 com 偏移量 .html incrby his num 本文為我閱讀了 redis參考手冊 之後結合 博友的博客 編寫,註意 php_redis 和 redis-cli 的區別(主要是返回值類型和參數用法) 上一篇:redis 在 php 中

redis 在 php 的應用List篇

color .com 博客 長度 多個 列表 conn ref ron 本文為我閱讀了 redis參考手冊 之後結合 博友的博客 編寫,註意 php_redis 和 redis-cli 的區別(主要是返回值類型和參數用法) 目錄: 一、List(列表) 1、LPUS

redis 在 php 的應用Set篇

之間 進行 group center 集合運算 返回 world cut 數據 上一篇:redis 在 php 中的應用(List篇) 本文為我閱讀了 redis參考手冊 之後編寫,註意 php_redis 和 redis-cli 的區別(主要是返回值類型和參數用法) Re

Rigidbody Angular Drag 角阻力

rigidbody 物體 如果 行鎖 發生 strong 凍結 free 設置 Rigidbody中 Angular Drag (角阻力):同樣指的是空氣阻力,只不過是用來阻礙物體旋轉的。如果設置成無限的話,物體會立即停止旋轉。如果設置成0,物體在上升過程中,會發生側翻旋