Two Ways to Convert an RDD into a DataFrame in Spark (Implemented in Java and in Scala)
I. Preparing the data source

Create a file named student.txt in the project directory with the following content:

1,zhangsan,20
2,lisi,21
3,wanger,19
4,fangliu,18
II. Implementation

Java version:

1. First, create a Student bean class that implements Serializable and overrides toString(). The code is as follows:
package com.cxd.sql;

import java.io.Serializable;

@SuppressWarnings("serial")
public class Student implements Serializable {

    String sid;
    String sname;
    int sage;

    public String getSid() {
        return sid;
    }

    public void setSid(String sid) {
        this.sid = sid;
    }

    public String getSname() {
        return sname;
    }

    public void setSname(String sname) {
        this.sname = sname;
    }

    public int getSage() {
        return sage;
    }

    public void setSage(int sage) {
        this.sage = sage;
    }

    @Override
    public String toString() {
        return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
    }
}
2. Perform the conversion. The code is as follows:
package com.cxd.sql;

import java.util.ArrayList;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class TxtToParquetDemo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        reflectTransform(spark);  // conversion via Java reflection
        dynamicTransform(spark);  // conversion via a dynamically built schema
    }

    /**
     * Conversion via Java reflection.
     * @param spark
     */
    private static void reflectTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("student.txt").javaRDD();
        JavaRDD<Student> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            Student stu = new Student();
            stu.setSid(parts[0]);
            stu.setSname(parts[1]);
            stu.setSage(Integer.valueOf(parts[2]));
            return stu;
        });
        Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
        df.select("sid", "sname", "sage")
          .coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
    }

    /**
     * Conversion via a dynamically built schema.
     * @param spark
     */
    private static void dynamicTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("student.txt").javaRDD();
        JavaRDD<Row> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            String sid = parts[0];
            String sname = parts[1];
            int sage = Integer.parseInt(parts[2]);
            return RowFactory.create(sid, sname, sage);
        });

        ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("sid", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("sname", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("sage", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(fields);

        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
    }
}
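To check what the two methods wrote, you can read the Parquet output back and inspect it, for example from spark-shell. This is only a verification sketch; the relative paths parquet.res and parquet.res1 are the ones used in the code above.

// Read back the Parquet directories written by the two methods.
val reflectDf = spark.read.parquet("parquet.res")
reflectDf.printSchema()
reflectDf.show()

val dynamicDf = spark.read.parquet("parquet.res1")
dynamicDf.printSchema()
dynamicDf.show()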
Scala version:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType

object RDD2Dataset {

  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]) {
    val spark = SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
    import spark.implicits._
    reflectCreate(spark)
    dynamicCreate(spark)
  }

  /**
   * Conversion via reflection on the case class.
   * @param spark
   */
  private def reflectCreate(spark: SparkSession): Unit = {
    import spark.implicits._
    val stuRDD = spark.sparkContext.textFile("student.txt")
    // toDF() comes from the implicit conversions imported above
    val stuDf = stuRDD.map(_.split(",")).map(parts => Student(parts(0).trim.toInt, parts(1), parts(2).trim.toInt)).toDF()
    //stuDf.select("id","name","age").write.text("result") // write out with explicit column names
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age<20")
    //nameDf.write.text("result") // write the query result to a file
    nameDf.show()
  }

  /**
   * Conversion via a dynamically built schema.
   * @param spark
   */
  private def dynamicCreate(spark: SparkSession): Unit = {
    val stuRDD = spark.sparkContext.textFile("student.txt")
    import spark.implicits._
    val schemaString = "id,name,age"
    val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val rowRDD = stuRDD.map(_.split(",")).map(parts => Row(parts(0), parts(1), parts(2)))
    val stuDf = spark.createDataFrame(rowRDD, schema)
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age<20")
    //nameDf.write.text("result") // write the query result to a file
    nameDf.show()
  }
}
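Note that dynamicCreate builds every column as StringType, so the filter age<20 relies on Spark casting the string column at query time. If you want age to be a real integer column, declare the type in the schema and parse the value when building each Row. A minimal sketch meant to slot into dynamicCreate (it reuses the spark session and stuRDD from the code above):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Typed schema: id and name stay strings, age becomes an integer column.
val typedSchema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

// Parse age to Int so each Row matches the declared schema.
val typedRowRDD = stuRDD.map(_.split(",")).map(parts => Row(parts(0), parts(1), parts(2).trim.toInt))

val typedDf = spark.createDataFrame(typedRowRDD, typedSchema)
typedDf.printSchema() // age now prints as integer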
Notes:
1. All of the code above has been tested; the test environment was Spark 2.1.0 with JDK 1.8.
2. This code does not work on Spark versions earlier than 2.0.
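For reference only: before Spark 2.0 there was no SparkSession, and the entry point for DataFrames was SQLContext. A minimal sketch of the reflection-based conversion under that older API (assuming a Spark 1.x dependency; not tested against the environment above) would look roughly like this:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object RDD2DataFrameSpark1 {
  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD2DataFrameSpark1").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // brings toDF() into scope

    val stuDf = sc.textFile("student.txt")
      .map(_.split(","))
      .map(p => Student(p(0).trim.toInt, p(1), p(2).trim.toInt))
      .toDF()

    stuDf.registerTempTable("student") // pre-2.0 counterpart of createOrReplaceTempView
    sqlContext.sql("select name from student where age < 20").show()

    sc.stop()
  }
}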
This article comes from the CSDN blog of 黑白調92; the original post is at https://blog.csdn.net/u010592112/article/details/73730796