第59課:使用Java和Scala在IDE中實戰RDD和DataFrame轉換操作’學習筆記
第59課:使用Java和Scala在IDE中實戰RDD和DataFrame轉換操作’學習筆記
本期內容:
1 RDD與DataFrame轉換的重大意義
2 使用Java實戰RDD與DaraFrame轉換
3 使用Scala實戰RDD與DataFrame轉換
一. RDD與DataFrame轉換的重大意義
在Spark中RDD可以直接轉換成DataFrame。SparkCore的核心是RDD,所有的排程都是基於RDD完成的,對RDD的操作都可以轉換成基於DataFrame使用SparkSQL來操作。RDD可能接上資料庫,接上NoSQL,其他檔案系統等各種資料來源,然後將資料轉換為DataFrame
極大簡化了大資料的開發,原來寫Scala\Java,現在只需要寫SparkSQL。
同時對DataFrame的操作又可以轉換成RDD,基於DataFrame對資料進行SQL或機器學習等操作後又可以轉換為RDD,這對於儲存資料、格式化非常方便。
RDD變DataFrame有兩種方式:
1.通過反射,推斷RDD元素中的元資料。
RDD中的資料本身是沒有元資料的,例如一個Person的資訊裡有id/name/age,RDD的Record不知道id/name/age這些資訊,但如果變成DataFrame的話,DataFrame必須知道這些資訊。如何在RDD和DataFrame轉換時擁有這些元資料資訊呢?最簡單的就是通過反射。
在Scala中就是Case Class對映。寫一個Case Class,描述RDD中不同列的元資料是什麼。
在Java中就是通過JavaBean。
Scala:case class對映。
Java:Bean(但不能支援巢狀的JavaBean,也不能有List/Map等複雜的資料結構。只能用簡單的資料型別:String/Int等。Scala就沒有這些限制)
使用反射的前提:已經知道元資料資訊了(靜態的)。但有些場景下只有在執行時才能知道元資料資訊(動態的)
2. 建立DataFrame時事先不知道元資料資訊,只能在執行時動態構建元資料。然後再把這些元資料資訊應用於RDD上。這種情況是比較常見的情況,即動態獲取
class Person{
private int id;
private String name;
private int age;
}
點選右鍵,選擇source -> Generate Getters and Setters
選擇age、id、name後點擊OK。
即可自動生成getter和setter程式碼:
class Person{
private int id;
private String name;
private int age;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
資料:
在D:\DT-IMF\testdata目錄下建立persons.txt檔案,內容如下:
1,Spark,7
2,Hadoop,11
3,Flink,5
下面是實戰程式碼:
package SparkSQLByJava;
//使用反射的方式將RDD轉換成為DataFrame
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
public class RDD2DataFrameByReflection {
public static void main(String[] args) {
//建立SparkConf物件
SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection");
//建立SparkContext物件
JavaSparkContext sc = new JavaSparkContext(conf);
//建立SQLContext上下文物件用於SQL分析
SQLContext sqlContext = new SQLContext(sc);
//建立RDD,讀取textFile
JavaRDD<String> lines = sc.textFile("D://DT-IMF//testdata//persons.txt");
JavaRDD<Person> persons = lines.map(new Function<String, Person>() {
@Override
public Person call(String line) throws Exception {
String[] splited = line.split(",");
Person p = new Person();
p.setId(Integer.valueOf(splited[0].trim()));
p.setName(splited[1]);
p.setAge(Integer.valueOf(splited[2].trim()));
return p;
}
});
/*
*reateDataFrame方法來自於sqlContext,有兩個引數,第一個是RDD,這裡就是lines.map之後的persons
*這個RDD裡的型別是person,即每條記錄都是person,person其實是有id,name,age的,
*JavaRDD本身並不知道id,name,age資訊,所以要建立DataFrame,DataFrame需要知道id,name,age資訊,
*DataFrame怎麼知道的呢?這裡用createDataFrame時傳入兩個引數,第一個的RDD本身,第二個引數是
*對RDD中每條資料的元資料的描述,這裡就是java bean class,即person.class
*實際上工作原理是:person.class傳入時本身會用反射的方式建立DataFrame,
*在底層通過反射的方式獲得Person的所有fields,結合RDD本身,就生成了DataFrame
*/
DataFrame df = sqlContext.createDataFrame(persons, Person.class);
//將DataFrame變成一個TempTable。
df.registerTempTable("persons");
//在記憶體中就會生成一個persons的表,在這張臨時表上就可以寫SQL語句了。
DataFrame bigDatas = sqlContext.sql("select * from persons where age >= 6");
//轉過來就可以把查詢後的結果變成 RDD。返回的是JavaRDD<Row>
//注意:這裡需要匯入org.apache.spark.sql.Row
JavaRDD<Row> bigDataRDD = bigDatas.javaRDD();
//再對RDD進行map操作。元素是一行一行的資料(SQL的Row),結果是Person,再次還原成Person。
//這裡返回的是具體的每條RDD的元素。
JavaRDD<Person> result = bigDataRDD.map(new Function<Row, Person>() {
@Override
public Person call(Row row) throws Exception {
Person p = new Person();
//p.setId(row.getInt(0));
//p.setName(row.getString(1));
//p.setAge(row.getInt(2));
//資料讀進來時第一列是id,第二列是name,第三列是age,生成的RDD也是這個順序,
//變成DataFrame後,DataFrame有自己的優化引擎,優化(資料結構優化等)之後再進行處理,
//處理後再變成RDD時就不能保證第一列是id,第二列是name,第三列是age了。
//原因是DataFrame對資料進行了排序。
p.setId(row.getInt(1));
p.setName(row.getString(2));
p.setAge(row.getInt(0));
return p;
}
});
List<Person> personList = result.collect();
for(Person p : personList){
System.out.println(p);
}
}
}
package SparkSQLByJava;
import java.io.Serializable;
public class Person implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
public int id;
public String name;
public int age;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "Person [id=" + id + ", name=" + name + ", age=" + age + "]";
}
}
下面是在eclipse中執行的console資訊:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/03/29 01:24:54 INFO SparkContext: Running Spark version 1.6.0
16/03/29 01:24:57 INFO SecurityManager: Changing view acls to: think
16/03/29 01:24:57 INFO SecurityManager: Changing modify acls to: think
16/03/29 01:24:57 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(think); users with modify permissions: Set(think)
16/03/29 01:25:00 INFO Utils: Successfully started service 'sparkDriver' on port 56818.
16/03/29 01:25:01 INFO Slf4jLogger: Slf4jLogger started
16/03/29 01:25:02 INFO Remoting: Starting remoting
16/03/29 01:25:02 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:56831]
16/03/29 01:25:02 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 56831.
16/03/29 01:25:02 INFO SparkEnv: Registering MapOutputTracker
16/03/29 01:25:03 INFO SparkEnv: Registering BlockManagerMaster
16/03/29 01:25:03 INFO DiskBlockManager: Created local directory at C:\Users\think\AppData\Local\Temp\blockmgr-2307f4ac-3fca-4e65-ad95-99802c35dffc
16/03/29 01:25:03 INFO MemoryStore: MemoryStore started with capacity 1773.8 MB
16/03/29 01:25:03 INFO SparkEnv: Registering OutputCommitCoordinator
16/03/29 01:25:04 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/03/29 01:25:04 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040
16/03/29 01:25:05 INFO Executor: Starting executor ID driver on host localhost
16/03/29 01:25:06 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56838.
16/03/29 01:25:06 INFO NettyBlockTransferService: Server created on 56838
16/03/29 01:25:06 INFO BlockManagerMaster: Trying to register BlockManager
16/03/29 01:25:06 INFO BlockManagerMasterEndpoint: Registering block manager localhost:56838 with 1773.8 MB RAM, BlockManagerId(driver, localhost, 56838)
16/03/29 01:25:06 INFO BlockManagerMaster: Registered BlockManager
16/03/29 01:25:10 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.4 KB, free 127.4 KB)
16/03/29 01:25:10 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 141.3 KB)
16/03/29 01:25:10 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:56838 (size: 13.9 KB, free: 1773.7 MB)
16/03/29 01:25:10 INFO SparkContext: Created broadcast 0 from textFile at RDD2DataFrameByReflection.java:24
16/03/29 01:25:17 WARN : Your hostname, think-PC resolves to a loopback/non-reachable address: fe80:0:0:0:d401:a5b5:2103:6d13%eth8, but we couldn't find any external IP address!
16/03/29 01:25:18 INFO FileInputFormat: Total input paths to process : 1
16/03/29 01:25:18 INFO SparkContext: Starting job: collect at RDD2DataFrameByReflection.java:77
16/03/29 01:25:19 INFO DAGScheduler: Got job 0 (collect at RDD2DataFrameByReflection.java:77) with 1 output partitions
16/03/29 01:25:19 INFO DAGScheduler: Final stage: ResultStage 0 (collect at RDD2DataFrameByReflection.java:77)
16/03/29 01:25:19 INFO DAGScheduler: Parents of final stage: List()
16/03/29 01:25:19 INFO DAGScheduler: Missing parents: List()
16/03/29 01:25:19 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[7] at map at RDD2DataFrameByReflection.java:58), which has no missing parents
16/03/29 01:25:19 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 8.8 KB, free 150.1 KB)
16/03/29 01:25:19 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.6 KB, free 154.6 KB)
16/03/29 01:25:19 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:56838 (size: 4.6 KB, free: 1773.7 MB)
16/03/29 01:25:19 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/03/29 01:25:19 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[7] at map at RDD2DataFrameByReflection.java:58)
16/03/29 01:25:19 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/03/29 01:25:19 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2138 bytes)
16/03/29 01:25:19 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/03/29 01:25:19 INFO HadoopRDD: Input split: file:/D:/DT-IMF/testdata/persons.txt:0+33
16/03/29 01:25:19 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/03/29 01:25:19 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/03/29 01:25:19 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/03/29 01:25:19 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/03/29 01:25:19 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/03/29 01:25:20 INFO GeneratePredicate: Code generated in 479.023467 ms
16/03/29 01:25:20 INFO GenerateUnsafeProjection: Code generated in 97.650998 ms
16/03/29 01:25:20 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2443 bytes result sent to driver
16/03/29 01:25:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1507 ms on localhost (1/1)
16/03/29 01:25:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/03/29 01:25:20 INFO DAGScheduler: ResultStage 0 (collect at RDD2DataFrameByReflection.java:77) finished in 1.573 s
16/03/29 01:25:20 INFO DAGScheduler: Job 0 finished: collect at RDD2DataFrameByReflection.java:77, took 1.890426 s
Person [id=1, name=Spark, age=7]
Person [id=2, name=Hadoop, age=11]
16/03/29 01:25:20 INFO SparkContext: Invoking stop() from shutdown hook
16/03/29 01:25:21 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
16/03/29 01:25:21 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/03/29 01:25:21 INFO MemoryStore: MemoryStore cleared
16/03/29 01:25:21 INFO BlockManager: BlockManager stopped
16/03/29 01:25:21 INFO BlockManagerMaster: BlockManagerMaster stopped
16/03/29 01:25:21 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/03/29 01:25:21 INFO SparkContext: Successfully stopped SparkContext
16/03/29 01:25:21 INFO ShutdownHookManager: Shutdown hook called
16/03/29 01:25:21 INFO ShutdownHookManager: Deleting directory C:\Users\think\AppData\Local\Temp\spark-481db032-91d6-4ced-a94c-b38dd0b9033c
16/03/29 01:25:21 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/03/29 01:25:21 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
執行時報錯解決1:
16/03/29 00:33:31 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalAccessException: Class org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1 can not access a member of class SparkSQLByJava.Person with modifiers "public"
這是許可權的問題,說明反射時需要類為public。
執行時報錯解決2:
16/03/29 00:53:43 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: SparkSQLByJava.Person
Serialization stack:
- object not serializable (class: SparkSQLByJava.Person, value: [email protected])
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 2)
原因是Person類沒有序列化,需要改成public class Person implements Serializable
以上內容是王家林老師DT大資料夢工廠《 IMF傳奇行動》第59課的學習筆記。
王家林老師是Spark、Flink、Docker、Android技術中國區佈道師。Spark亞太研究院院長和首席專家,DT大資料夢工廠創始人,Android軟硬整合原始碼級專家,英語發音魔術師,健身狂熱愛好者。
微信公眾賬號:DT_Spark
電話:18610086859
QQ:1740415547
微訊號:18610086859
新浪微博:ilovepains
相關推薦
第59課:使用Java和Scala在IDE中實戰RDD和DataFrame轉換操作’學習筆記
第59課:使用Java和Scala在IDE中實戰RDD和DataFrame轉換操作’學習筆記 本期內容: 1 RDD與DataFrame轉換的重大意義 2 使用Java實戰RDD與DaraFrame轉換 3 使用Scala實戰RDD與DataFrame轉換 一.
第59課:使用Java和Scala在IDE中實戰RDD和DataFrame轉換操作
內容: 1.RDD與DataFrame轉換的重大意義 2.使用Java實戰RDD與DataFrame轉換 3.使用Scala實戰RDD與dataFrame轉換 一. RDD與DataFrame轉換的重大意義 1.在Spark中RDD可以
第三課:java開發hdfs
node 執行 需要 public conf iss import lean logging (1)關於hdfs小結 hadoop由hdfs + yarn + map/reduce組成, hdfs是數據庫存儲模塊,主要由1臺namenode和n臺datanode組成的一個集
hadoop第五課:java開發Map/Reduce
pla tool @override val code 項目 ssp ava ram 配置系統環境變量HADOOP_HOME,指向hadoop安裝目錄(如果你不想招惹不必要的麻煩,不要在目錄中包含空格或者中文字符)把HADOOP_HOME/bin加到PATH環境變量(非必要
第6課:Java Spring Boot 2.0實戰MyBatis與優化(Java面試題)
《阿里巴巴Java Spring Boot 2.0開發實戰課程》06課本期分享專家:徐雷—阿里巴巴特邀Java講師,MongoDB講師 本期分享主題:Spring Boot2.0實戰MyBatis與優化 (Java面試題)Java Spring Boot 2.0是最新的開發平臺,Mybatis是高效能ORM
第7課:Java Spring Boot 2.0安全機制、漏洞與MVC身份驗證實戰
《阿里巴巴Java Spring Boot 2.0開發實戰課程》07課本期分享專家:徐雷—阿里巴巴特邀Java講師,MongoDB講師 本期分享主題:Java Spring Boot2.0實戰MyBatis與優化 (Java面試題)Java Spring Boot 2.0是最新的開發平臺,深入介紹Sprin
Android學習第四課:Java程式碼實現XML佈局
權重屬性layout_height 在佈局控制元件中(如LinearLayout),子控制元件可以根據權重值(預設為0)來分配所佔據的空間,這需要結合layout_width或layout_height的值進行分類: 1)如果子控制元件layout_weight都為“ma
第35節:Java面向物件中的多執行緒
Java面向物件中的多執行緒 多執行緒 在Java面向物件中的多執行緒中,要理解多執行緒的知識點,首先要掌握什麼是程序,什麼是執行緒?為什麼有多執行緒呢?多執行緒存在的意義有什麼什麼呢?執行緒的建立方式又有哪些?以及要理解多執行緒的特點等。
Scala實戰高手****第16課:Scala implicits程式設計徹底實戰及Spark原始碼鑑賞
隱式轉換:當某個類沒有具體的方法時,可以在該類的伴生物件或上下文中查詢是否存在隱式轉換,將其轉換為可以呼叫該方法的類,通過程式碼簡單的描述下 一:隱式轉換 1、定義類Man class Man(val name: String) 2、定義類SuperMan,並在類中定義一個方法 class Supe
第73課:Spark SQL Thrift Server實戰
內容: 1.SparkSQL Thrift解析與測試 2.SparkSQL Thrift Server JDBC程式設計 一、SparkSQL Thrift解析與測試 ThriftServer是一個JDBC/ODBC介面,使用者可以通過JDBC/
第16課:Scala implicits程式設計徹底實戰及Spark原始碼鑑賞
本節課主要講的內容: 1、函式隱式轉換 2、隱式引數 3、隱式類 4、隱式物件 本節課搜狐視訊地址:http://my.tv.sohu.com/us/299637343/84785657.shtml隱式轉換:當某個類沒有具體的方法時,可以在該類的伴生物件或上下文中查詢是否存在隱式轉換,將其轉換為可以呼叫該方法
第四課:通過配置文件獲取對象(Spring框架中的IOC和DI的底層就是基於這樣的機制)
ted const dex generate stat clas name 必須 nbsp 首先在D盤創建一個文件hero.txt,內容為:com.hero.Hero(此處必須是Hero的完整路徑) 接下來是Hero類 package com.hero; publi
C#程式設計基礎第三課:C#中的運算子和分支語句
知識點:運算子、if結構、if-else語句 一、運算子 運算子用於執行程式程式碼運算,會針對一個以上運算元專案來進行運算。例如:2+3,其運算元是2和3,而運算子則是“+”。在vb2005中運算子大致可以分為5種類型:算術運算子、連線運算子、關係運算符、賦值運算子和邏輯運算子。。
C#程式設計基礎第六課:C#中三元運算子的初級使用和巢狀
知識點:三元運算子的使用。 1、三元運算子 三元運算子的初級使用: 符號: ?: 舉例:int c=bool ? a : b 當bool=true,c=表示式a,當bool=false,c=表示式b。 三元運算子?:是 if~else 語句的簡寫形式 書寫格式
第67課:Spark SQL下采用Java和Scala實現Join的案例綜合實戰(鞏固前面學習的Spark SQL知識)
內容: 1.SparkSQL案例分析 2.SparkSQL下采用Java和Scala實現案例 一、SparkSQL下采用Java和Scala實現案例 學生成績: {"name":"Michael","score":98} {"name":"Andy"
第九章:Java中的final的使用和Java中的super的使用
final關鍵字 使用final關鍵字做標識有“最終的”含義 final可以修飾類、方法、屬性和變數 final修飾類的時候,則該類不允許被繼承 final修飾方法,則該方法不允許被覆蓋 final修飾,則該類的屬性不會進行隱式的初始化(類的初始化屬性必須有值)或在構造方法中賦值(但只能
第91課:SparkStreaming基於Kafka Direct案例實戰和內幕原始碼解密 java.lang.ClassNotFoundException 踩坑解決問題詳細內幕版本
第91課:SparkStreaming基於Kafka Direct案例實戰和內幕原始碼解密 /* * *王家林老師授課http://weibo.com/ilovepains */ 每天晚上20:00YY頻道現場授課頻道68917580 1、作業內容:SparkS
第四天:JAVA中的迴圈語句詳解,和常用例子
1:switch語句(掌握)(1)格式:switch(表示式) {case 值1:語句體1;break;case 值2:語句體2;break;...default:語句體n+1;break;}格式解釋說明:switch:說明這是switch語句。表示式:可以是byte,short,int,charJDK5以後
編寫高質量程式碼:改善Java程式的151個建議(第1章:Java開發中通用的方法和準則___建議11~15)
序列化Serializable是Java提供的通用資料儲存和讀取的介面。任何類只要實現了Serializable介面,就可以被儲存到檔案中,或者作為資料流通過網路傳送到別的地方。 package OSChina.Serializable; import java.io.
爬蟲第三課:互聯網中網頁的解析
iso 來看 指向 應該 pri tro conn 路徑 獲取 基本步驟 這節課們們的目的就是使用Requests模塊+BeautifulSoup模塊爬取網站上的信息 首先爬取一個網站主要分兩步 1、第一步我們要了解服務器與本地交換機制,選擇正確的辦法我們才能獲取正確的信息