
Converting an RDD to a DataFrame in Spark SQL

Method 1: Converting an RDD to a DataFrame via Reflection

The first approach uses reflection: Spark inspects the RDD's element type and infers the schema from it. It is simple, but it is not the recommended choice, because in real work it comes with restrictions.

In older Scala versions, a case class supported at most 22 fields; beyond that limit you had to write your own class implementing the Product trait. (Scala 2.11 lifted the 22-field restriction.) So although this approach is simple, it is not general purpose: production schemas routinely contain far more than 20 fields.
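For reference, here is a minimal sketch of the Product workaround mentioned above; the class name and fields are illustrative only, not from the original article:

//Scala (illustrative sketch)
// Hypothetical example: a record type that sidesteps the old 22-field case
// class limit by implementing Product by hand (only two fields are shown
// to keep the sketch short).
class WidePerson(val name: String, val age: Long)
  extends Product with Serializable {
  override def productArity: Int = 2
  override def productElement(n: Int): Any = n match {
    case 0 => name
    case 1 => age
    case _ => throw new IndexOutOfBoundsException(n.toString)
  }
  override def canEqual(that: Any): Boolean = that.isInstanceOf[WidePerson]
}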

//Java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;


public class rddtoDFreflectionJava {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("program")
                .master("local").config("spark.sql.warehouse.dir", "file:/Users/zhangjingyu/Desktop/Spark架構/spark-warehouse")
                .getOrCreate();
        String path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt";

        // Parse each "name, age" line of people.txt into a PersonJava bean
        // (see the sketch of the bean after this example).
        JavaRDD<PersonJava> personRDD = spark.read().textFile(path).javaRDD().map(line -> {
            String name = line.split(",")[0];
            Long age = Long.valueOf(line.split(",")[1].trim());
            PersonJava person = new PersonJava();
            person.setName(name);
            person.setAge(age);
            return person;
        });
        /* Equivalent pre-lambda version using an anonymous inner class:

        JavaRDD<PersonJava> personRdd = spark.read().textFile(path).javaRDD().map(new Function<String, PersonJava>() {
            @Override
            public PersonJava call(String line) throws Exception {
                String name = line.split(",")[0];
                Long age = Long.valueOf(line.split(",")[1].trim());
                PersonJava person = new PersonJava();
                person.setName(name);
                person.setAge(age);
                return person;
            }
        });
        */
        // Infer the schema from the PersonJava bean via reflection.
        Dataset<Row> personDF = spark.createDataFrame(personRDD, PersonJava.class);
        personDF.createOrReplaceTempView("test");
        Dataset<Row> resultDF = spark.sql("select * from test a where a.age < 30");
        resultDF.show();

        // Map the result rows back into PersonJava beans.
        JavaRDD<PersonJava> resultRDD = resultDF.javaRDD().map(line -> {
            PersonJava person = new PersonJava();
            person.setName(line.getAs("name"));
            person.setAge(line.getAs("age"));
            return person;
        });


        for (PersonJava personJava : resultRDD.collect()) {
            System.out.println(personJava.getName() + ":" + personJava.getAge());
        }


        /* Equivalent version without lambdas:

        JavaRDD<PersonJava> resultRdd = resultDF.javaRDD().map(new Function<Row, PersonJava>() {
            @Override
            public PersonJava call(Row row) throws Exception {
                PersonJava person = new PersonJava();
                String name = row.getAs("name");
                long age = row.getAs("age");
                person.setName(name);
                person.setAge(age);
                return person;
            }
        });
        resultRdd.foreach(new VoidFunction<PersonJava>() {
            @Override
            public void call(PersonJava personJava) throws Exception {
                System.out.println(personJava.getName() + ":" + personJava.getAge());
            }
        });
        */
    }
}
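The Java examples above assume a PersonJava bean that the original article does not show. A minimal sketch, matching the getters and setters used above: Spark's reflective createDataFrame derives the schema from a public JavaBean, which should also be Serializable.

//Java (assumed helper, not shown in the original)
import java.io.Serializable;

public class PersonJava implements Serializable {
    private String name;
    private Long age;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Long getAge() { return age; }
    public void setAge(Long age) { this.age = age; }
}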


//Scala
object rddtoDFreflectionScala {
  case class Person(name : String , age : Long)


  def main(args: Array[String]): Unit = {
    val spark = CommSparkSessionScala.getSparkSession()
    val path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt"
    import spark.implicits._
    // Each "name, age" line becomes a Person; toDF infers the schema from the case class.
    val personDF = spark.sparkContext.textFile(path).map(row => row.split(",")).map(line => {
      Person(line(0), line(1).trim.toLong)
    }).toDF
    personDF.createOrReplaceTempView("test")
    val resultDF = spark.sql("select * from test a where a.age > 20")
    val resultrdd = resultDF.rdd.map(x =>{
      val name = x.getAs[String]("name")
      val age = x.getAs[Long]("age")
      Person(name,age)
    })


    for (elem <- resultrdd.collect()) {
      println(elem.name + " : " + elem.age)
    }
  }
}
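The Scala examples call a project helper, CommSparkSessionScala, that the article does not include. A minimal stand-in, assuming the same local configuration as the Java example, might look like this:

//Scala (assumed helper, not shown in the original)
import org.apache.spark.sql.SparkSession

object CommSparkSessionScala {
  def getSparkSession(): SparkSession = {
    SparkSession.builder()
      .appName("program")
      .master("local")
      .getOrCreate()
  }
}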

Method 2: Converting an RDD to a DataFrame Programmatically

The second approach builds the DataFrame by specifying the schema programmatically, and it is the one used most often in practice. Here the schema is constructed explicitly in code, whereas with the reflection approach it was effectively defined inside the case class.

//Java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;


public class rddtoDFprogrammJava {
    public static void main(String[] args) {


        SparkSession spark = SparkSession
                .builder()
                .appName("program")
                .master("local").config("spark.sql.warehouse.dir", "file:/Users/zhangjingyu/Desktop/Spark架構/spark-warehouse")
                .getOrCreate();
        String path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt";


        // Build the schema: two nullable columns, name (string) and age (long).
        List<StructField> fields = new ArrayList<>();
        StructField structFieldName = DataTypes.createStructField("name", DataTypes.StringType, true);
        StructField structFieldAge = DataTypes.createStructField("age", DataTypes.LongType, true);
        fields.add(structFieldName);
        fields.add(structFieldAge);
        StructType schema = DataTypes.createStructType(fields);


        // Convert each text line into a generic Row whose values match the schema.
        JavaRDD<Row> personRdd = spark.read().textFile(path).javaRDD().map(x -> {
            String[] lines = x.split(",");
            return RowFactory.create(lines[0], Long.valueOf(lines[1].trim()));
        });


        Dataset<Row> personDF = spark.createDataFrame(personRdd, schema);
        personDF.createOrReplaceTempView("program");
        Dataset<Row> resultDF = spark.sql("select * from program");
        resultDF.show();

        for (Row row : resultDF.javaRDD().collect()) {
            System.out.println(row);
        }
    }
}


//Scala


import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}


object rddtoDFprogrammScala {
  def main(args: Array[String]): Unit = {
    val spark = CommSparkSessionScala.getSparkSession()
    val path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt"
    // Define the schema explicitly instead of inferring it from a case class.
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", LongType, true)
    ))
    // Build an RDD[Row] whose values line up with the schema.
    val rdd = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
      Row(x(0), x(1).trim.toLong)
    })
    val personDF = spark.createDataFrame(rdd, schema)
    personDF.createOrReplaceTempView("person")
    val resultDF = spark.sql("select * from person a where a.age < 30")
    for (elem <- resultDF.collect()) {
      println(elem.get(0) + ":" + elem.get(1))
    }
  }
}
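As a closing note: when you only need column names and default nullability, Spark's implicits also let you convert an RDD of tuples directly with toDF, a common shorthand for the programmatic approach. A small sketch, reusing the same people.txt layout (object name is illustrative, not from the original article):

//Scala (illustrative sketch)
import org.apache.spark.sql.SparkSession

object rddtoDFtupleScala {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("program").master("local").getOrCreate()
    import spark.implicits._
    val path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt"
    // An RDD of (String, Long) tuples converts directly; toDF supplies the column names.
    val df = spark.sparkContext.textFile(path)
      .map(_.split(","))
      .map(x => (x(0), x(1).trim.toLong))
      .toDF("name", "age")
    df.show()
  }
}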

Original author: Zhang Jingyu