
SparkSQL Parquet Schema Merging

Parquet supports schema evolution: a dataset can start with a few columns and gain more over time, leaving files with different but mutually compatible schemas in the same directory. Spark SQL's parquet data source can detect this situation and merge the schemas of all the files, either through the mergeSchema read option or the spark.sql.parquet.mergeSchema configuration. The two examples below write DataFrames with different schemas (name/age and name/grade) into one directory, then read it back with mergeSchema enabled.


Java

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class ParquetMergeSchema {
    private static SparkConf conf = new SparkConf().setAppName("parquetmergeschema").setMaster("local");
    private static JavaSparkContext jsc = new JavaSparkContext(conf);
    private static SparkSession session = new SparkSession(jsc.sc());

    public static void main(String[] args) {
        JavaRDD<Tuple2<String, Object>> rdd1 = jsc.parallelize(
                Arrays.asList(new Tuple2<String, Object>("jack", 21), new Tuple2<String, Object>("lucy", 20)));

        JavaRDD<Row> row1 = rdd1.map(new Function<Tuple2<String, Object>, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Row call(Tuple2<String, Object> v1) throws Exception {
                return RowFactory.create(v1._1, v1._2);
            }
        });

        JavaRDD<Tuple2<String, Object>> rdd2 = jsc.parallelize(
                Arrays.asList(new Tuple2<String, Object>("jack", "A"), new Tuple2<String, Object>("yeye", "B")));

        JavaRDD<Row> row2 = rdd2.map(new Function<Tuple2<String, Object>, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Row call(Tuple2<String, Object> v1) throws Exception {
                return RowFactory.create(v1._1, v1._2);
            }
        });

        // two schemas that share the "name" column but differ in the second column
        StructType schema1 = DataTypes
                .createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, false),
                        DataTypes.createStructField("age", DataTypes.IntegerType, false)));

        StructType schema2 = DataTypes
                .createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, false),
                        DataTypes.createStructField("grade", DataTypes.StringType, false)));

        // convert the RDDs to Datasets
        Dataset<Row> ds1 = session.createDataFrame(row1, schema1);
        Dataset<Row> ds2 = session.createDataFrame(row2, schema2);

        // append both Datasets as parquet files into the same directory
        ds1.write().mode(SaveMode.Append).save("./src/main/java/cn/tele/spark_sql/parquet/mergetest");
        ds2.write().mode(SaveMode.Append).save("./src/main/java/cn/tele/spark_sql/parquet/mergetest");

        // read the whole directory with mergeSchema=true so the two schemas are merged
        Dataset<Row> dataset = session.read().option("mergeSchema", true)
                .load("./src/main/java/cn/tele/spark_sql/parquet/mergetest");

        dataset.printSchema();
        dataset.show();

        session.stop();
        jsc.close();
    }
}
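For reference, after both writes the merged read should report all three columns, with null filled in for the column a given file does not contain. A rough sketch of the expected printSchema() output (field order can vary between Spark versions, and columns read from parquet are reported as nullable):

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- grade: string (nullable = true)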

Scala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{DataTypes, StructField}

object ParquetMergeSchema {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("parquetmergeschema").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val rdd1 = sc.parallelize(Array(("jack", 18), ("tele", 20)), 2).map(tuple => Row(tuple._1, tuple._2))
    val rdd2 = sc.parallelize(Array(("tele", "A"), ("wyc", "A"), ("yeye", "C")), 2).map(tuple => Row(tuple._1, tuple._2))

    // two schemas that share the "name" column but differ in the second column
    val schema1 = DataTypes.createStructType(Array(
      StructField("name", DataTypes.StringType, false),
      StructField("age", DataTypes.IntegerType, false)))

    val schema2 = DataTypes.createStructType(Array(
      StructField("name", DataTypes.StringType, false),
      StructField("grade", DataTypes.StringType, false)))

    // convert the RDDs to DataFrames
    val df1 = sqlContext.createDataFrame(rdd1, schema1)
    val df2 = sqlContext.createDataFrame(rdd2, schema2)

    // append both DataFrames as parquet files into the same directory
    df1.write.mode(SaveMode.Append).save("./src/main/scala/cn/tele/spark_sql/parquet/mergetest")
    df2.write.mode(SaveMode.Append).save("./src/main/scala/cn/tele/spark_sql/parquet/mergetest")

    // read the whole directory with mergeSchema=true so the two schemas are merged
    val df = sqlContext.read.option("mergeSchema", true).parquet("./src/main/scala/cn/tele/spark_sql/parquet/mergetest")
    df.printSchema()
    df.show()
  }
}
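Passing option("mergeSchema", true) enables merging for a single read. Schema merging is off by default because it is relatively expensive (Spark has to read the footers of all the data files), so it is also exposed as the spark.sql.parquet.mergeSchema configuration key. A minimal sketch of enabling it globally on the same sqlContext, so every subsequent parquet read merges schemas without the per-read option:

// enable schema merging for every parquet read on this context
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
val merged = sqlContext.read.parquet("./src/main/scala/cn/tele/spark_sql/parquet/mergetest")
merged.printSchema()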
