1. 程式人生 > >spark1.6.1讀取csv檔案,轉為為DataFrame和使用SQL

spark1.6.1讀取csv檔案,轉為為DataFrame和使用SQL

一、讀取csv

spark2.0才開始原始碼支援CSV,所以1.6版本需要藉助第三方包來實現讀取CSV檔案,有好幾種方法,

1.如果有maven的,到https://spark-packages.org/package/databricks/spark-csv下載對應scala版本的第三方jar包然後再maven的pom裡面新增denpency,然後根據官網的用法用--packages傳入。這樣它就會自動去maven裡面尋找了。

2.如果是Python開發的,用Python自帶的庫,比如pandas、csv等,可以參考這個部落格

3.如果沒有maven可以通過textfile讀入,然後通過opencsv來轉化。到這裡下載https://sourceforge.net/projects/opencsv/files/latest/download第三方jar包,放到spark安裝目錄的lib目錄和Hadoop的share目錄裡面/usr/local/src/hadoop-2.6.1/share/hadoop/common/lib

import java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
 val pmix=sc.textFile("file:///mnt/hgfs/vm/20170101.csv")
val pmixrdd=pmix.map{line =>val reader=new CSVReader(new StringReader(line)); reader.readNext()}
pmixrdd.count

我放進去一個一千萬行的資料,用了大概兩分鐘

二、轉化為DataFrame

RDD有兩種方式轉化df:根據反射推斷方式和程式設計方式。

根據反射推斷的方式,scala2.10只能支援22列,但是我的表有30多列,所以我選擇程式設計的方式,雖然程式設計的方式比較麻煩點。

import sqlContext.implicits._
import org.apache.spark.sql.types.{StructType,StructField,StringType};
import org.apache.spark.sql.Row;
val schemaString = "region,store_type,sitename,storeid,check_no,employee,dob,dob_full,daypart,hour,minute,qcid,qc_name,qc,category,category_name,item,bohname,longname,tender_name,check_type,tot_amt,tot_amt_ala,price_tot,tot_amt_disc,disc_name,quantity,food_cost,paper_cost,burger_count,dimension_product_mix,dimension_channel_mix"
val schema =
  StructType(
    schemaString.split(",").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD= pmixrdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11), p(12), p(13), p(14), p(15), p(16), p(17), p(18), p(19), p(20), p(21), p(22), p(23), p(24), p(25), p(26), p(27), p(28), p(29), p(30), p(31)))
val pmixdf= sqlContext.createDataFrame(rowRDD, schema)

三、使用SQL

接下來註冊一個臨時表

pmixdf.registerTempTable("pmix")
val results = sqlContext.sql("SELECT * FROM pmix limit 10").show()
pmixdf.write.format("parquet").mode("overwrite").save("file:///mnt/hgfs/vm/pmix.parquet")