1. 程式人生 > >Spark中RDD轉換成DataFrame的兩種方式(分別用Java和scala實現)

Spark中RDD轉換成DataFrame的兩種方式(分別用Java和scala實現)

 一:準備資料來源

      在專案下新建一個student.txt檔案,裡面的內容為:

print?

  1. <code class="language-java">1,zhangsan,20  
  2. 2,lisi,21  
  3. 3,wanger,19  
  4. 4,fangliu,18</code>  
  1. 1,zhangsan,20

  2. 2,lisi,21

  3. 3,wanger,19

  4. 4,fangliu,18

      二:實現

     Java版:

    1.首先新建一個student的Bean物件,實現序列化和toString()方法,具體程式碼如下:

  1. package com.cxd.sql;

  2. import java.io.Serializable;

  3. @SuppressWarnings("serial")

  4. public class Student implements Serializable {

  5.     String sid;

  6.     String sname;

  7.     int sage;

  8.     public String getSid() {

  9.         return sid;

  10.     }

  11.     public void setSid(String sid) {

  12.         this.sid = sid;

  13.     }

  14.     public String getSname() {

  15.         return sname;

  16.     }

  17.     public void setSname(String sname) {

  18.         this.sname = sname;

  19.     }

  20.     public int getSage() {

  21.         return sage;

  22.     }

  23.     public void setSage(int sage) {

  24.         this.sage = sage;

  25.     }

  26.     @Override

  27.     public String toString() {

  28.         return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";

  29.     }

  30. }

         2.轉換,具體程式碼如下

  1. package com.cxd.sql;

  2. import java.util.ArrayList;

  3. import org.apache.spark.SparkConf;

  4. import org.apache.spark.api.java.JavaRDD;

  5. import org.apache.spark.sql.Dataset;

  6. import org.apache.spark.sql.Row;

  7. import org.apache.spark.sql.RowFactory;

  8. import org.apache.spark.sql.SaveMode;

  9. import org.apache.spark.sql.SparkSession;

  10. import org.apache.spark.sql.types.DataTypes;

  11. import org.apache.spark.sql.types.StructField;

  12. import org.apache.spark.sql.types.StructType;

  13. public class TxtToParquetDemo {

  14.     public static void main(String[] args) {

  15.         SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");

  16.         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

  17.         reflectTransform(spark);//Java反射

  18.         dynamicTransform(spark);//動態轉換

  19.     }

  20.     /**

  21.      * 通過Java反射轉換

  22.      * @param spark

  23.      */

  24.     private static void reflectTransform(SparkSession spark)

  25.     {

  26.         JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();

  27.         JavaRDD<Student> rowRDD = source.map(line -> {

  28.             String parts[] = line.split(",");

  29.             Student stu = new Student();

  30.             stu.setSid(parts[0]);

  31.             stu.setSname(parts[1]);

  32.             stu.setSage(Integer.valueOf(parts[2]));

  33.             return stu;

  34.         });

  35.         Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);

  36.         df.select("sid", "sname", "sage").

  37.         coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");

  38.     }

  39.     /**

  40.      * 動態轉換

  41.      * @param spark

  42.      */

  43.     private static void dynamicTransform(SparkSession spark)

  44.     {

  45.         JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();

  46.         JavaRDD<Row> rowRDD = source.map( line -> {

  47.             String[] parts = line.split(",");

  48.             String sid = parts[0];

  49.             String sname = parts[1];

  50.             int sage = Integer.parseInt(parts[2]);

  51.             return RowFactory.create(

  52.                     sid,

  53.                     sname,

  54.                     sage

  55.                     );

  56.         });

  57.         ArrayList<StructField> fields = new ArrayList<StructField>();

  58.         StructField field = null;

  59.         field = DataTypes.createStructField("sid", DataTypes.StringType, true);

  60.         fields.add(field);

  61.         field = DataTypes.createStructField("sname", DataTypes.StringType, true);

  62.         fields.add(field);

  63.         field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);

  64.         fields.add(field);

  65.         StructType schema = DataTypes.createStructType(fields);

  66.         Dataset<Row> df = spark.createDataFrame(rowRDD, schema);

  67.         df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");

  68.     }

  69. }

     scala版本:

  1. import org.apache.spark.sql.SparkSession

  2. import org.apache.spark.sql.types.StringType

  3. import org.apache.spark.sql.types.StructField

  4. import org.apache.spark.sql.types.StructType

  5. import org.apache.spark.sql.Row

  6. import org.apache.spark.sql.types.IntegerType

  7. object RDD2Dataset {

  8. case class Student(id:Int,name:String,age:Int)

  9. def main(args:Array[String])

  10. {

  11. val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()

  12. import spark.implicits._

  13. reflectCreate(spark)

  14. dynamicCreate(spark)

  15. }

  16. /**

  17. * 通過Java反射轉換

  18. * @param spark

  19. */

  20. private def reflectCreate(spark:SparkSession):Unit={

  21. import spark.implicits._

  22. val stuRDD=spark.sparkContext.textFile("student2.txt")

  23. //toDF()為隱式轉換

  24. val stuDf=stuRDD.map(_.split(",")).map(parts⇒Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF()

  25. //stuDf.select("id","name","age").write.text("result") //對寫入檔案指定列名

  26. stuDf.printSchema()

  27. stuDf.createOrReplaceTempView("student")

  28. val nameDf=spark.sql("select name from student where age<20")

  29. //nameDf.write.text("result") //將查詢結果寫入一個檔案

  30. nameDf.show()

  31. }

  32. /**

  33. * 動態轉換

  34. * @param spark

  35. */

  36. private def dynamicCreate(spark:SparkSession):Unit={

  37. val stuRDD=spark.sparkContext.textFile("student.txt")

  38. import spark.implicits._

  39. val schemaString="id,name,age"

  40. val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))

  41. val schema=StructType(fields)

  42. val rowRDD=stuRDD.map(_.split(",")).map(parts⇒Row(parts(0),parts(1),parts(2)))

  43. val stuDf=spark.createDataFrame(rowRDD, schema)

  44. stuDf.printSchema()

  45. val tmpView=stuDf.createOrReplaceTempView("student")

  46. val nameDf=spark.sql("select name from student where age<20")

  47. //nameDf.write.text("result") //將查詢結果寫入一個檔案

  48. nameDf.show()

  49. }

  50. }

     注:1.上面程式碼全都已經測試通過,測試的環境為spark2.1.0,jdk1.8。

             2.此程式碼不適用於spark2.0以前的版本。

--------------------- 本文來自 黑白調92 的CSDN 部落格 ,全文地址請點選:https://blog.csdn.net/u010592112/article/details/73730796?utm_source=copy