Spark SQL之RDD轉換DataFrame的方法
阿新 • • 發佈:2021-12-02
RDD轉換DataFrame之Reflection方法
第一種方式是使用反射的方式,用反射去推導出RDD裡面的schema。這個方式簡單,但是不建議使用,因為在工作當中,使用這種方式是有限制的。
對於以前的版本來說,case class最多支援22個欄位;如果超過了22個欄位,我們就必須要自己開發一個類,實現Product接口才行。因此這種方式雖然簡單,但是不通用;因為生產中的欄位是非常非常多的,是不可能只有20來個欄位的。
//Java import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import javax.jnlp.PersistenceService; import javax.xml.crypto.Data; public class rddtoDFreflectionJava { public static void main(String[] args) { SparkSession spark = SparkSession .builder() .appName("program") .master("local").config("spark.sql.warehouse.dir", "file:/Users/zhangjingyu/Desktop/Spark架構/spark-warehouse") .getOrCreate(); String Path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt"; JavaRDD<PersonJava> personRDD = Spark.read().textFile(Path).javaRDD().map(line -> { String name = line.split(",")[0]; Long age = Long.valueOf(line.split(",")[1].trim()); PersonJava person = new PersonJava(); person.setName(name); person.setAge(age); return person; }); /** * JavaRDD<PersonJava> personRdd = Spark.read().textFile(Path).javaRDD().map(new Function<String, PersonJava>() { * @Override * public PersonJava call(String line) throws Exception { * String name = line.split(",")[0]; * Long age = Long.valueOf(line.split(",")[1].trim()); * PersonJava person = new PersonJava(); * person.setName(name); * person.setAge(age); * return person; * } * }); */ Dataset<Row> personDF = Spark.createDataFrame(personRDD,PersonJava.class); personDF.createOrReplaceTempView("test"); Dataset<Row> ResultDF = Spark.sql("select * from test a where a.age < 30"); ResultDF.show(); JavaRDD<PersonJava> ResultRDD = ResultDF.javaRDD().map(line -> { PersonJava person = new PersonJava(); person.setName(line.getAs("name")); person.setAge(line.getAs("age")); return person; }); for (PersonJava personJava : ResultRDD.collect()) { System.out.println(personJava.getName()+":"+personJava.getAge()); } /** * JavaRDD<PersonJava> resultRdd = ResultDF.javaRDD().map(new Function<Row, PersonJava>() { * @Override * 
public PersonJava call(Row row) throws Exception { * PersonJava person = new PersonJava(); * String name = row.getAs("name"); * long age = row.getAs("age"); * person.setName(name); * person.setAge(age); * return person; * } * }); * resultRdd.foreach(new VoidFunction<PersonJava>() { * @Override * public void call(PersonJava personJava) throws Exception { * System.out.println(personJava.getName()+":"+personJava.getAge()); * } * }); */ } } //Scala object rddtoDFreflectionScala { case class Person(name : String , age : Long) def main(args: Array[String]): Unit = { val spark = CommSparkSessionScala.getSparkSession() val path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt" import spark.implicits._; val personDF = spark.sparkContext.textFile(path).map(row => row.split(",")).map(line => { Person(line(0),line(1).trim.toLong) }).toDF personDF.createOrReplaceTempView("test") val resultDF = spark.sql("select * from test a where a.age > 20") val resultrdd = resultDF.rdd.map(x =>{ val name = x.getAs[String]("name") val age = x.getAs[Long]("age") Person(name,age) }) for (elem <- resultrdd.collect()) { System.out.println(elem.name+" : "+ elem.age) } } }
RDD轉換DataFrame之程式設計(Programmatic)方式
建立一個DataFrame,使用程式設計的方式,這個方式用的非常多。通過程式設計方式指定schema ,對於第一種方式的schema其實定義在了case class裡面了。
//Java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * RDD -> DataFrame programmatically: build a StructType schema by hand and
 * pair it with an RDD of generic Row objects.
 *
 * FIX: the original declared the RDD with the raw type `JavaRDD`, producing
 * an unchecked call to createDataFrame(JavaRDD&lt;Row&gt;, StructType); it is
 * now typed as JavaRDD&lt;Row&gt;. Unused imports (JavaPairRDD, Function,
 * PairFunction, VoidFunction, Tuple2) were removed.
 */
public class rddtoDFprogrammJava {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("program")
                .master("local")
                .config("spark.sql.warehouse.dir",
                        "file:/Users/zhangjingyu/Desktop/Spark架構/spark-warehouse")
                .getOrCreate();
        String path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt";

        // Build the column definitions (schema) programmatically:
        // two nullable columns, name:String and age:Long.
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.LongType, true));
        StructType scheme = DataTypes.createStructType(fields);

        // Parse each line into a generic Row that matches the schema above.
        JavaRDD<Row> personRdd = spark.read().textFile(path).javaRDD().map(x -> {
            String[] lines = x.split(",");
            return RowFactory.create(lines[0], Long.valueOf(lines[1].trim()));
        });

        Dataset<Row> personDF = spark.createDataFrame(personRdd, scheme);
        personDF.createOrReplaceTempView("program");
        Dataset<Row> resultDF = spark.sql("select * from program ");
        resultDF.show();
        for (Row row : resultDF.javaRDD().collect()) {
            System.out.println(row);
        }
    }
}

//Scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object rddtoDFprogrammScala {
  def main(args: Array[String]): Unit = {
    val spark = CommSparkSessionScala.getSparkSession()
    val path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt"

    // Hand-built schema: two nullable columns, name:String and age:Long.
    val scheme = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", LongType, true)
    ))

    // Parse each line into a generic Row matching the schema above.
    val rdd = spark.sparkContext.textFile(path)
      .map(_.split(","))
      .map(x => Row(x(0), x(1).trim.toLong))

    val PersonDF = spark.createDataFrame(rdd, scheme)
    PersonDF.createOrReplaceTempView("person")

    val resultDF = spark.sql("select * from person a where a.age < 30")
    for (elem <- resultDF.collect()) {
      System.out.println(elem.get(0) + ":" + elem.get(1))
    }
  }
}
原創作者:張景宇