spark學習記錄(十、SparkSQL)
阿新 • • 發佈:2019-01-13
一、介紹
- SparkSQL支援查詢原生的RDD。 RDD是Spark平臺的核心概念,是Spark能夠高效的處理大資料的各種場景的基礎。
- 能夠在Scala中寫SQL語句。支援簡單的SQL語法檢查,能夠在Scala中寫Hive語句訪問Hive資料,並將結果取回作為RDD使用。
DataFrame也是一個分散式資料容器。與RDD類似,然而DataFrame更像傳統資料庫的二維表格,除了資料以外,還掌握資料的結構資訊,即schema。同時,與Hive類似,DataFrame也支援巢狀資料型別(struct、array和map)。從API易用性的角度上 看, DataFrame API提供的是一套高層的關係操作,比函式式的RDD API要更加友好,門檻更低。
DataFrame的底層封裝的是RDD,只不過RDD的泛型是Row型別。
二、載入DataFrame方法
新增依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>2.4.0</version>
</dependency>
public class JavaExample { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("JavaExample"); JavaSparkContext sc = new JavaSparkContext (conf); SQLContext sqlContext = new SQLContext(sc); // 載入json檔案 Dataset<Row> json = sqlContext.read().format("json").load("C://json"); Dataset<Row> json1 = sqlContext.read().json("C://json"); /** * sqlContext讀取json檔案載入成DataFrame時,DataFrame的列會按照ASCII碼排序 * 寫sql查詢出的DataFrame會按照指定欄位顯示列 * show()預設顯示前20行資料,show(100)顯示100行 */ //查詢表內容 // json.show(); //查詢表結構 // json.printSchema(); //select name,age from xxx where age >18 // json.select("name","age").where(json.col("age").gt(18)).show(); /** * 將DataFrame註冊成臨時表 * 注意:t1表這張表既不在記憶體中也不在磁碟中,相當於一個指標指向原始檔,底層操作解析Spark job讀取原始檔 */ json.registerTempTable("t1"); sqlContext.sql("select name,age from t1 where age>18").show(); //DataFrame轉換成RDD,並獲取第一列資料 JavaRDD<Row> rdd = json.javaRDD(); rdd.foreach(new VoidFunction<Row>() { public void call(Row row) throws Exception { System.out.println(row.get(0)); } }); sc.stop(); } }
讀取json格式的檔案建立DataFrame:
- json檔案中的json資料不能巢狀json格式資料。
- DataFrame是一個一個Row型別的RDD,df.rdd()/df.javaRdd()。
- 可以兩種方式讀取json格式的檔案。
- df.show()預設顯示前20行資料。
- DataFrame原生API可以操作DataFrame(不方便)。
- 註冊成臨時表時,表中的列預設按ascii順序顯示列。
普通RDD轉換為DataFrame
public class Person implements Serializable{ private String id; private String name; private Integer age; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } @Override public String toString() { return "Person{" + "id='" + id + '\'' + ", name='" + name + '\'' + ", age=" + age + '}'; } }
//通過反射的方式將非json格式的RDD轉換成DataFrame
public class JavaExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("JavaExample");
JavaSparkContext sc = new JavaSparkContext (conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lineRDD = sc.textFile("C:\\person.txt");
/**
* 注意:
* 1.自定義類Person必須為public
* 2.RDD轉化為DataFrame會把自定義類欄位名稱按ASCII排序
* 3.自定義類要實現序列化介面
*/
JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {
public Person call(String s) throws Exception {
Person p = new Person();
p.setId(s.split(",")[0]);
p.setName(s.split(",")[1]);
p.setAge(Integer.valueOf(s.split(",")[2]));
return p;
}
});
Dataset<Row> dataFrame = sqlContext.createDataFrame(personRDD, Person.class);
dataFrame.show();
//將DataFrame轉換為JavaRDD
JavaRDD<Row> javaRDD = dataFrame.javaRDD();
JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() {
public Person call(Row row) throws Exception {
Person p = new Person();
p.setId((String) row.getAs("id"));
p.setName((String) row.getAs("name"));
p.setAge((Integer) row.getAs("age"));
return p;
}
});
map.foreach(new VoidFunction<Person>() {
public void call(Person person) throws Exception {
System.out.println(person);
}
});
sc.stop();
}
}
//動態建立Schema將非json格式的RDD轉換成DataFrame
public class JavaExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("JavaExample");
JavaSparkContext sc = new JavaSparkContext (conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lineRDD = sc.textFile("C:\\person.txt");
JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
public Row call(String s) throws Exception {
return RowFactory.create(
s.split(",")[0],
s.split(",")[1],
Integer.valueOf(s.split(",")[2])
);
}
});
/**
* 動態構建DataFrame中的元資料
*/
List<StructField> asList = Arrays.asList(
DataTypes.createStructField("id",DataTypes.StringType,true),
DataTypes.createStructField("name",DataTypes.StringType,true),
DataTypes.createStructField("age",DataTypes.IntegerType,true)
);
StructType schema = DataTypes.createStructType(asList);
Dataset<Row> dataFrame = sqlContext.createDataFrame(rowRDD,schema);
dataFrame.show();
sc.stop();
}
}
讀取parquet檔案建立DataFrame
public class JavaExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("JavaExample");
JavaSparkContext sc = new JavaSparkContext (conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> jsonRDD = sc.textFile("C:\\json");
Dataset<Row> df = sqlContext.read().json(jsonRDD);
/**
* 將DataFormat儲存成parquet檔案,
* SaveMode指定儲存檔案時的儲存模式:
* Overwrite:覆蓋
* Append:追加
* ErrorIfExists:如果存在就報錯
* Ignore:如果存在就忽略
*/
df.write().mode(SaveMode.Overwrite).parquet("C:\\parquet");
/**
* 載入parquet檔案成DataFrame檔案
*/
Dataset<Row> parquet = sqlContext.read().parquet("C:\\parquet");
parquet.show();
sc.stop();
}
}
讀取JDBC中的資料建立DataFrame(MySql為例)
public class JavaExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("JavaExample");
//配置join或聚合操作shuffle資料時的分割槽數量
conf.set("spark.sql.shuffle.partitions","1");
JavaSparkContext sc = new JavaSparkContext (conf);
SQLContext sqlContext = new SQLContext(sc);
/**
* 第一種方法
*/
Map<String, String> options = new HashMap<String, String>();
options.put("url","jdbc:mysql://192.168.2.125:3306/mysql");
options.put("driver","com.mysql.jdbc.Driver");
options.put("user","root");
options.put("password","123456");
options.put("dbtable","t_waybill");
Dataset<Row> load = sqlContext.read().format("jdbc").options(options).load();
load.show();
/**
* 第二種方法
*/
DataFrameReader reader = sqlContext.read().format("jdbc");
reader.option("url","jdbc:mysql://192.168.2.125:3306/mysql");
reader.option("driver","com.mysql.jdbc.Driver");
reader.option("user","root");
reader.option("password","123456");
reader.option("dbtable","t_waybill");
Dataset<Row> load1 = reader.load();
load1.show();
/**
* 將DataFrame結果儲存到mysql中
*/
Properties properties = new Properties();
properties.setProperty("user","root");
properties.setProperty("password","123456");
/**
* SaveMode:
* Overwrite:覆蓋
* Append:追加
* ErrorIfExists:如果存在就報錯
* Ignore:如果存在就忽略
*/
load.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.2.125:3306/mysql","t_waybill",properties);
sc.stop();
}
}