SparkSQL建立RDD:建立DataFrame的方式,配置Spark on Hive【文字說明+關鍵程式碼】
阿新 • • 發佈:2019-01-28
建立DataFrame的方式
建立DataFrame的方式 1).讀取json格式的檔案 a).json檔案不能巢狀 b).讀取的兩種方式: DataFrame df = sqlContext.read().format("json").load("./sparksql/json"); DataFrame df2 = sqlContext.read().json("sparksql/json"); c).載入過來的DataFrame 列會按照Ascii碼排序 d).可以使用DataFrame的API操作DataFrame,也可以將DataFrame註冊成臨時表 df.registerTempTable("jtable"); 2).讀取json格式的RDD 3).讀取普通的RDD載入成DataFrame a).反射的方式(少) JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() { private static final long serialVersionUID = 1L; @Override public Person call(String line) throws Exception { Person p = new Person(); p.setId(line.split(",")[0]); p.setName(line.split(",")[1]); p.setAge(Integer.valueOf(line.split(",")[2])); return p; } }); DataFrame df = sqlContext.createDataFrame(personRDD, Person.class); i).自定類要實現序列化介面 ii).自定義類的訪問級別是public iii).載入過來的DataFrame列也會按照Ascii碼排序 b).動態建立Schema(多) List<StructField> asList =Arrays.asList( DataTypes.createStructField("id", DataTypes.StringType, true), DataTypes.createStructField("name", DataTypes.StringType, true), DataTypes.createStructField("age", DataTypes.IntegerType, true) ); StructType schema = DataTypes.createStructType(asList); DataFrame df = sqlContext.createDataFrame(rowRDD, schema); i).載入過來的DataFrame列不會按照Ascii碼排序 4).讀取parquent檔案載入成DataFrame 讀取: DataFrame load = sqlContext.read().format("parquet").load("./sparksql/parquet"); load = sqlContext.read().parquet("./sparksql/parquet"); 儲存: df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet"); df.write().mode(SaveMode.Ignore).parquet("./sparksql/parquet"); 5).讀取Mysql中的資料載入成DataFrame 讀取: a). Map<String, String> options = new HashMap<String,String>(); options.put("url", "jdbc:mysql://192.168.179.4:3306/spark"); options.put("driver", "com.mysql.jdbc.Driver"); options.put("user", "root"); options.put("password", "123456"); options.put("dbtable", "person"); DataFrame person = sqlContext.read().format("jdbc").options(options).load(); b). DataFrameReader reader = sqlContext.read().format("jdbc"); reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark"); reader.option("driver", "com.mysql.jdbc.Driver"); reader.option("user", "root"); reader.option("password", "123456"); reader.option("dbtable", "score"); DataFrame score = reader.load(); 儲存: Properties properties = new Properties(); properties.setProperty("user", "root"); properties.setProperty("password", "123456"); result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.179.4:3306/spark", "result", properties); 6).讀取Hive中的資料載入成DataFrame 要配置Spark on Hive,如果SparkSQL要讀取資料是Hive中資料,要使用HiveContext,HiveContext是SQLContext的子類。 讀取: HiveContext hiveContext = new HiveContext(sc); hiveContext.sql("USE spark"); DataFrame df = hiveContext.table("good_student_infos"); 儲存: hiveContext.sql("DROP TABLE IF EXISTS good_student_infos"); goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
配置spark on Hive:
5.配置Spark on Hive 1).在客戶端建立../conf/hive-site.xml <configuration> <property> <name>hive.metastore.uris</name> <value>thrift://node1:9083</value> </property> </configuration> 2).啟動Hive,在服務端啟動metaStore服務,hive --service metastore 3).spark-shell 測試
謝謝你的鼓勵,繼續加油。