Spark 讀取外部檔案的幾種方式
阿新 • • 發佈:2019-02-03
textFile函式
- /**
- * Read a text file from HDFS, a local file system (available on all nodes), or any
- * Hadoop-supported file system URI, and return it as an RDD of Strings.
- */
- def textFile(
- path: String,
- minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
- assertNotStopped()
- hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
- minPartitions).map(pair => pair._2.toString).setName(path)
- }
分析引數:path: String 是一個URI,這個URI可以是HDFS、本地檔案(全部的節點都可以),或者其他Hadoop支援的檔案系統URI返回的是一個字串型別的RDD,也就是是RDD的內部形式是Iterator[(String)]
minPartitions= math.min(defaultParallelism, 2) 是指定資料的分割槽,如果不指定分割槽,當你的核數大於2的時候,不指定分割槽數那麼就是 2
當你的資料大於128M時候,Spark是為每一個快(block)建立一個分片(Hadoop-2.X之後為128m一個block)
1、從當前目錄讀取一個檔案
- val path = "Current.txt" //Current fold file
- val rdd1 = sc.textFile(path,2)
從當前目錄讀取一個Current.txt的檔案
2、從當前目錄讀取多個檔案
- val path = "Current1.txt,Current2.txt," //Current fold file
- val rdd1 = sc.textFile(path,2)
從當前讀取兩個檔案,分別是Cuttent1.txt和Current2.txt
3、從本地系統讀取一個檔案
- val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/README.md" //local file
- val rdd1 = sc.textFile(path,2)
從本地系統讀取一個檔案,名字是README.md
4、從本地系統讀取整個資料夾
- val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/" //local file
- val rdd1 = sc.textFile(path,2)
從本地系統中讀取licenses這個資料夾下的所有檔案
這裡特別注意的是,比如這個資料夾下有35個檔案,上面分割槽數設定是2,那麼整個RDD的分割槽數是35*2?
這是錯誤的,這個RDD的分割槽數不管你的partition數設定為多少時,只要license這個資料夾下的這個檔案a.txt
(比如有a.txt)沒有超過128m,那麼a.txt就只有一個partition。那麼就是說只要這35個檔案其中沒有一個超過
128m,那麼分割槽數就是 35個
5、從本地系統讀取多個檔案
- val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/LICENSE-scala.txt,file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/LICENSE-spire.txt" //local file
- val rdd1 = sc.textFile(path,2)
從本地系統中讀取file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/下的LICENSE-spire.txt和
LICENSE-scala.txt兩個檔案。上面分割槽設定是2,那個RDD的整個分割槽數是2*2
6、從本地系統讀取多個資料夾下的檔案(把如下檔案全部讀取進來)
- val path = "/usr/local/spark/spark-1.6.0-bin-hadoop2.6/data/*/*" //local file
- val rdd1 = sc.textFile(path,2)
採用萬用字元的形式來代替檔案,來對資料資料夾進行整體讀取。但是後面設定的分割槽數2也是可以去除的。因為一個檔案沒有達到128m,所以上面的一個檔案一個partition,一共是20個。
7、採用萬用字元,來讀取多個檔名類似的檔案
比如讀取如下檔案的people1.txt和people2.txt,但google.txt不讀取
- for (i <- 1 to 2){
- val rdd1 = sc.textFile(s"/root/application/temp/people$i*",2)
- }
8、採用萬用字元讀取相同字尾的檔案
- val path = "/usr/local/spark/spark-1.6.0-bin-hadoop2.6/data/*/*.txt" //local file
- val rdd1 = sc.textFile(path,2)
9、從HDFS讀取一個檔案
- val path = "hdfs://master:9000/examples/examples/src/main/resources/people.txt"
- val rdd1 = sc.textFile(path,2)
從HDFS中讀取檔案的形式和本地上一樣,只是前面的路徑要表明是HDFS中的 </div>
</div>
</article>