spark1.6使用:讀取本地外部資料,把RDD轉化成DataFrame,儲存為parquet格式,讀取csv格式
一、先開啟Hadoop和spark
略
二、啟動spark-shell
spark-shell --master local[2] --jars /usr/local/src/spark-1.6.1-bin-hadoop2.6/libext/com.mysql.jdbc.Driver.jar
1.讀取spark目錄下面的logs日誌作為測試:
val alllog=sc.textFile("file:///usr/local/src/spark-1.6.1-bin-hadoop2.6/logs/*out*")
alllog.count 看看一共有347記錄
2.轉為為DataFrame
現在讀取進來的是RDD格式,用map函式把每條記錄轉成一行
import org.apache.spark.sql.Row val alllogRDD=alllog.map(x =>Row(x)) import org.apache.spark.sql.types._ val schemaString="line" val schema=StructType(schemaString.split(" ").map(fieldName =>StructField(fieldName,StringType,true))) val alllogDataFrame = sqlContext.createDataFrame(alllogRDD, schema) alllogDataFrame.printSchema #列印schema alllogDataFrame.show(false) #這裡的false表示不省略,否則跟下午一樣,會三點省略
到此為止,已經把RDD轉化為DataFrame了。
三、把DataFrame轉為為表用SQL查詢
alllogDataFrame.registerTempTable("log")
sqlContext.sql("SELECT * FROM log").show()
到此就可以使用SQL了。
四、讀取和儲存外部資料來源
1.讀取json檔案
val df = sqlContext.read.format("json").load("file:///mnt/hgfs/vm/china.json")
df.printSchema
df.select("*").write.format("parquet").mode("overwrite").save("file:///mnt/hgfs/vm/china.parquet") #儲存為parquet格式
這裡的mode可以有overwrite,append,ignore等模式,也可以不用。
這樣就直接生產DataFrame資料,不用新增schema資訊了。
對於parquet檔案,還有更高階的使用方法,直接讀取檔案就行了
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
對於json裡面有巢狀陣列,想要展開成多行,可以在SQL中使用explode函