spark 連線mysql資料庫 讀取、寫入資料
阿新 • • 發佈:2019-01-30
資料庫連線並獲取資料:
JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("sparkApp").setMaster("local[5]")); SQLContext sqlContext = new SQLContext(sparkContext); Properties Properties = new Properties(); Properties.put("user", "資料庫使用者名稱"); Properties.put("password", "資料庫密碼"); Properties.put("driver", "資料庫驅動"); DataFrame DF = sqlContext.read().jdbc("資料庫地址","表名",Properties).;
DF: 從資料庫中獲取的資料
資料庫寫入資料:
/** * 1、建立型別為Row的RDD */ JavaRDD<List<String>> logDateRdd = sparkContext.parallelize(logDate); JavaRDD<Row> RDD = logDateRdd.map(new Function<List<String>,Row>(){ @Override public Row call(List<String> logDate) throws Exception { return RowFactory.create( logDate.get(0), logDate.get(1) ); } }); /** * 2、動態構造DataFrame的元資料。 */ List structFields = new ArrayList(); structFields.add(DataTypes.createStructField("col1",DataTypes.StringType,false)); structFields.add(DataTypes.createStructField("col2",DataTypes.StringType,true)); //構建StructType,用於最後DataFrame元資料的描述 StructType structType = DataTypes.createStructType(structFields); /** * 3、基於已有的元資料以及RDD<Row>來構造DataFrame */ DataFrame DF = sqlContext.createDataFrame(RDD,structType); /** * 4、將資料寫入到e_trade_acct_data表中 */ DF.write().mode("append").jdbc("資料庫地址","表名","存有使用者名稱、密碼、驅動的Properties類");
sparkContext.parallelize(logDate): 將資料轉成RDD
structFields : 裡面的col1 、col2為資料庫欄位名,DateTypes 表示資料型別,資料型別要保持一致。false:表示不能為null .true表示可為null