第59課:使用Java和Scala在IDE中實戰RDD和DataFrame轉換操作
內容:
1.RDD與DataFrame轉換的重大意義 2.使用Java實戰RDD與DataFrame轉換 3.使用Scala實戰RDD與dataFrame轉換
一. RDD與DataFrame轉換的重大意義
1.在Spark中RDD可以直接轉換成DataFrame。SparkCore的核心是RDD,所有的排程都是基於RDD完成的,對RDD的操作都可以轉換成基於DataFrame使用SparkSQL來操作。RDD可能接上資料庫,接上NoSQL,其他檔案系統等各種資料來源,然後將資料轉換為DataFrame,極大簡化了大資料的開發,原來寫Scala\Java,現在只需要寫SparkSQL。 2.同時對DataFrame的操作又可以轉換成RDD,基於DataFrame對資料進行SQL或機器學習等操作後又可以轉換為RDD,這對於儲存資料、格式化非常方便。 3.RDD變DataFrame有兩種方式: a)通過反射,推斷RDD元素中的元資料。 RDD中的資料本身是沒有元資料的,例如一個Person的資訊裡有id/name/age,RDD的Record不知道id/name/age這些資訊,但如果變成DataFrame的話,DataFrame必須知道這些資訊。最簡單的就是通過反射知道在RDD和DataFrame轉換時擁有這些元資料資訊。 在Scala中就是Case Class對映。寫一個Case Class,描述RDD中不同列的元資料是什麼。Scala:case class對映。 在Java中就是通過JavaBean。Java:Bean(但不能支援巢狀的JavaBean,也不能有List/Map等複雜的資料結構。只能用簡單的資料型別:String/Int等。Scala就沒有這些限制) 注意:使用反射的前提,已經知道元資料資訊了(靜態的)。但有些場景下只有在執行時才能知道元資料資訊(動態的) b)建立DataFrame時事先不知道元資料資訊,只能在執行時動態構建元資料。然後再把這些元資料資訊應用於RDD上。這種情況是比較常見的情況,即動態獲取Schema。
二、使用Java實戰RDD與DataFrame轉換
1.準備資料:person.txt
1,Spark,7
2,Hadoop,11
3,Flink,5
2.編寫Person.class
package SparkSQL; import java.io.Serializable; /** * FileName: Person * Author: hadoop * Email: [email protected] * Date: 18-10-28 下午4:27 * Description: */ public class Person implements Serializable { private int id; private String name; private int age; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } @Override public String toString() { return "Person{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}'; } }
3.編寫RDDToDataFrameByReflection.class
package SparkSQL;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import java.util.List;
/**
* FileName: RDDToDataFrameByReflection
* Author: hadoop
* Email: [email protected]
* Date: 18-10-28 下午3:27
* Description:使用反射的方式將RDD轉換成為DataFrame
*/
public class RDDToDataFrameByReflection {
public static void main(String[] args){
//建立SparkConf用於讀取系統資訊並設定運用程式的名稱
SparkConf conf = new SparkConf().setAppName("RDDToDataFrameByReflection").setMaster("spark://Master:7077");
//建立JavaSparkContext物件例項作為整個Driver的核心基石
JavaSparkContext sc = new JavaSparkContext(conf);
//設定輸出log的等級,可以設定INFO,WARN,ERROR
sc.setLogLevel("ERROR");
//建立SQLContext上下文物件,用於SqL的分析
SQLContext sqlContext = new SQLContext(sc);
//建立RDD,讀取textFile
JavaRDD<String> lines = sc.textFile("/hadoop/dataframe/input");
/**
* 將讀入的RDD資料轉化為Person型別的DataFrame
*/
JavaRDD<Person> person = lines.map(new Function<String, Person>() {
@Override
public Person call(String line) throws Exception {
String[] splited = line.split(",");
Person p = new Person();
p.setId(Integer.valueOf(splited[0].trim()));
p.setName(splited[1]);
p.setAge(Integer.valueOf(splited[2].trim()));
return p;
}
});
/**
* reateDataFrame方法來自於sqlContext,有兩個引數,第一個是RDD,這裡就是lines.map之後的person
* 這個RDD裡的型別是Person,即每條記錄都是Person,Person其實是有id,name,age的,
* JavaRDD本身並不知道id,name,age資訊,所以要建立DataFrame,DataFrame需要知道id,name,age資訊,
* DataFrame怎麼知道的呢?這裡用createDataFrame時傳入兩個引數,第一個的RDD本身,第二個引數是
* 對RDD中每條資料的元資料的描述,這裡就是java bean class,即person.class
* 實際上工作原理是:person.class傳入時本身會用反射的方式建立DataFrame,
* 在底層通過反射的方式獲得Person的所有fields,結合RDD本身,就生成了DataFrame
*/
Dataset ds = sqlContext.createDataFrame(person,Person.class);
//將DataFrame變成一個TempTable
ds.registerTempTable("person");
//在記憶體中就會生成一個persons的表,在這張臨時表上就可以寫SQL語句了
Dataset bigDatas = sqlContext.sql("select * from person where age >= 6");
//轉過來就可以把查詢後的結果變成 RDD,返回的是JavaRDD<Row>
JavaRDD<Row> bigdataRDD = bigDatas.javaRDD();
//再對RDD進行map操作。元素是一行一行的資料(SQL的Row),結果是Person,再次還原成Person。
//這裡返回的是具體的每條RDD的元素。
JavaRDD<Person> result = bigdataRDD.map(new Function<Row, Person>() {
@Override
public Person call(Row row) throws Exception {
Person p = new Person();
/**
* 轉化為DataFrame後,dataFrame對資料欄位進行了結構優化,
* 對欄位進行了排序,所以使用下面的方式是不能按正確資料順序訪問資料的
* p.setId(row.getInt(0));
* p.setName(row.getString(1));
* p.setAge(row.getInt(2));
*/
p.setId(row.getInt(1));
p.setName(row.getString(2));
p.setAge(row.getInt(0));
return p;
}
});
List<Person> personList = result.collect();
for (Person p : personList){
System.out.println(p);
}
}
}
4.執行結果: