spark 讀取各類資料來源
本文章主要通過程式碼實現spark讀取各類資料來源
1 spark讀取hive資料
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.hive.HiveContext; /** * Hive資料來源 * @author Administrator * */ public classHiveDataSource { @SuppressWarnings("deprecation") public static void main(String[] args) { // 首先還是建立SparkConf SparkConf conf = new SparkConf() .setAppName("HiveDataSource"); // 建立JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 建立HiveContext,注意,這裡,它接收的是SparkContext作為引數,不是JavaSparkContextHiveContext hiveContext = new HiveContext(sc.sc()); // 第一個功能,使用HiveContext的sql()方法,可以執行Hive中能夠執行的HiveQL語句 // 判斷是否存在student_infos表,如果存在則刪除 hiveContext.sql("DROP TABLE IF EXISTS student_infos"); // 判斷student_infos表是否不存在,如果不存在,則建立該表 hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING, age INT)"); // 將學生基本資訊資料匯入student_infos表 hiveContext.sql("LOAD DATA " + "LOCAL INPATH '/usr/local/spark-study/resources/student_infos.txt' " + "INTO TABLE student_infos"); // 用同樣的方式給student_scores匯入資料 hiveContext.sql("DROP TABLE IF EXISTS student_scores"); hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT)"); hiveContext.sql("LOAD DATA " + "LOCAL INPATH '/usr/local/spark-study/resources/student_scores.txt' " + "INTO TABLE student_scores"); // 第二個功能,執行sql還可以返回DataFrame,用於查詢 // 執行sql查詢,關聯兩張表,查詢成績大於80分的學生 DataFrame goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score " + "FROM student_infos si " + "JOIN student_scores ss ON si.name=ss.name " + "WHERE ss.score>=80"); // 第三個功能,可以將DataFrame中的資料,理論上來說,DataFrame對應的RDD的元素,是Row即可 // 將DataFrame中的資料儲存到hive表中 // 接著將DataFrame中的資料儲存到good_student_infos表中 hiveContext.sql("DROP TABLE IF EXISTS good_student_infos"); goodStudentsDF.saveAsTable("good_student_infos"); // 第四個功能,可以用table()方法,針對hive表,直接建立DataFrame // 然後針對good_student_infos表,直接建立DataFrame Row[] goodStudentRows = hiveContext.table("good_student_infos").collect(); for(Row goodStudentRow : goodStudentRows) { System.out.println(goodStudentRow); } sc.close(); } }
2 spark讀取jdbc資料來源
import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import scala.Tuple2; /** * JDBC資料來源 * @author Administrator * */ public class JDBCDataSource { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("JDBCDataSource"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); // 總結一下 // jdbc資料來源 // 首先,是通過SQLContext的read系列方法,將mysql中的資料載入為DataFrame // 然後可以將DataFrame轉換為RDD,使用Spark Core提供的各種運算元進行操作 // 最後可以將得到的資料結果,通過foreach()運算元,寫入mysql、hbase、redis等等db / cache中 // 分別將mysql中兩張表的資料載入為DataFrame Map<String, String> options = new HashMap<String, String>(); options.put("url", "jdbc:mysql://spark1:3306/testdb"); options.put("dbtable", "student_infos"); DataFrame studentInfosDF = sqlContext.read().format("jdbc") .options(options).load(); options.put("dbtable", "student_scores"); DataFrame studentScoresDF = sqlContext.read().format("jdbc") .options(options).load(); // 將兩個DataFrame轉換為JavaPairRDD,執行join操作 JavaPairRDD<String, Tuple2<Integer, Integer>> studentsRDD = studentInfosDF.javaRDD().mapToPair( new PairFunction<Row, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(Row row) throws Exception { return new Tuple2<String, Integer>(row.getString(0), Integer.valueOf(String.valueOf(row.get(1)))); } }) .join(studentScoresDF.javaRDD().mapToPair( new PairFunction<Row, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(Row row) throws Exception { return new Tuple2<String, Integer>(String.valueOf(row.get(0)), Integer.valueOf(String.valueOf(row.get(1)))); } })); // 將JavaPairRDD轉換為JavaRDD<Row> JavaRDD<Row> studentRowsRDD = studentsRDD.map( new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() { private static final long serialVersionUID = 1L; @Override public Row call( Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception { return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2); } }); // 過濾出分數大於80分的資料 JavaRDD<Row> filteredStudentRowsRDD = studentRowsRDD.filter( new Function<Row, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(Row row) throws Exception { if(row.getInt(2) > 80) { return true; } return false; } }); // 轉換為DataFrame List<StructField> structFields = new ArrayList<StructField>(); structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true)); StructType structType = DataTypes.createStructType(structFields); DataFrame studentsDF = sqlContext.createDataFrame(filteredStudentRowsRDD, structType); Row[] rows = studentsDF.collect(); for(Row row : rows) { System.out.println(row); } // 將DataFrame中的資料儲存到mysql表中 // 這種方式是在企業裡很常用的,有可能是插入mysql、有可能是插入hbase,還有可能是插入redis快取 studentsDF.javaRDD().foreach(new VoidFunction<Row>() { private static final long serialVersionUID = 1L; @Override public void call(Row row) throws Exception { String sql = "insert into good_student_infos values(" + "'" + String.valueOf(row.getString(0)) + "'," + Integer.valueOf(String.valueOf(row.get(1))) + "," + Integer.valueOf(String.valueOf(row.get(2))) + ")"; Class.forName("com.mysql.jdbc.Driver"); Connection conn = null; Statement stmt = null; try { conn = DriverManager.getConnection( "jdbc:mysql://spark1:3306/testdb", "", ""); stmt = conn.createStatement(); stmt.executeUpdate(sql); } catch (Exception e) { e.printStackTrace(); } finally { if(stmt != null) { stmt.close(); } if(conn != null) { conn.close(); } } } }); sc.close(); } }
3 spark讀取json格式資料
import java.util.ArrayList; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import scala.Tuple2; /** * JSON資料來源 * @author Administrator * */ public class JSONDataSource { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("JSONDataSource"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); // 針對json檔案,建立DataFrame(針對json檔案建立DataFrame) DataFrame studentScoresDF = sqlContext.read().json( "hdfs://spark1:9000/spark-study/students.json"); // 針對學生成績資訊的DataFrame,註冊臨時表,查詢分數大於80分的學生的姓名 // (註冊臨時表,針對臨時表執行sql語句) studentScoresDF.registerTempTable("student_scores"); DataFrame goodStudentScoresDF = sqlContext.sql( "select name,score from student_scores where score>=80"); // (將DataFrame轉換為rdd,執行transformation操作) List<String> goodStudentNames = goodStudentScoresDF.javaRDD().map( new Function<Row, String>() { private static final long serialVersionUID = 1L; @Override public String call(Row row) throws Exception { return row.getString(0); } }).collect(); // 然後針對JavaRDD<String>,建立DataFrame // (針對包含json串的JavaRDD,建立DataFrame) List<String> studentInfoJSONs = new ArrayList<String>(); studentInfoJSONs.add("{\"name\":\"Leo\", \"age\":18}"); studentInfoJSONs.add("{\"name\":\"Marry\", \"age\":17}"); studentInfoJSONs.add("{\"name\":\"Jack\", \"age\":19}"); JavaRDD<String> studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs); DataFrame studentInfosDF = sqlContext.read().json(studentInfoJSONsRDD); // 針對學生基本資訊DataFrame,註冊臨時表,然後查詢分數大於80分的學生的基本資訊 studentInfosDF.registerTempTable("student_infos"); String sql = "select name,age from student_infos where name in ("; for(int i = 0; i < goodStudentNames.size(); i++) { sql += "'" + goodStudentNames.get(i) + "'"; if(i < goodStudentNames.size() - 1) { sql += ","; } } sql += ")"; DataFrame goodStudentInfosDF = sqlContext.sql(sql); // 然後將兩份資料的DataFrame,轉換為JavaPairRDD,執行join transformation // (將DataFrame轉換為JavaRDD,再map為JavaPairRDD,然後進行join) JavaPairRDD<String, Tuple2<Integer, Integer>> goodStudentsRDD = goodStudentScoresDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(Row row) throws Exception { return new Tuple2<String, Integer>(row.getString(0), Integer.valueOf(String.valueOf(row.getLong(1)))); } }).join(goodStudentInfosDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(Row row) throws Exception { return new Tuple2<String, Integer>(row.getString(0), Integer.valueOf(String.valueOf(row.getLong(1)))); } })); // 然後將封裝在RDD中的好學生的全部資訊,轉換為一個JavaRDD<Row>的格式 // (將JavaRDD,轉換為DataFrame) JavaRDD<Row> goodStudentRowsRDD = goodStudentsRDD.map( new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() { private static final long serialVersionUID = 1L; @Override public Row call( Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception { return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2); } }); // 建立一份元資料,將JavaRDD<Row>轉換為DataFrame List<StructField> structFields = new ArrayList<StructField>(); structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true)); structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); StructType structType = DataTypes.createStructType(structFields); DataFrame goodStudentsDF = sqlContext.createDataFrame(goodStudentRowsRDD, structType); // 將好學生的全部資訊儲存到一個json檔案中去 // (將DataFrame中的資料儲存到外部的json檔案中去) goodStudentsDF.write().format("json").save("hdfs://spark1:9000/spark-study/good-students"); } }
4 spark讀取parquet資料
import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; /** * Parquet資料來源之使用程式設計方式載入資料 * @author Administrator * */ public class ParquetLoadData { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("ParquetLoadData"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); // 讀取Parquet檔案中的資料,建立一個DataFrame DataFrame usersDF = sqlContext.read().parquet( "hdfs://spark1:9000/spark-study/users.parquet"); // 將DataFrame註冊為臨時表,然後使用SQL查詢需要的資料 usersDF.registerTempTable("users"); DataFrame userNamesDF = sqlContext.sql("select name from users"); // 對查詢出來的DataFrame進行transformation操作,處理資料,然後打印出來 List<String> userNames = userNamesDF.javaRDD().map(new Function<Row, String>() { private static final long serialVersionUID = 1L; @Override public String call(Row row) throws Exception { return "Name: " + row.getString(0); } }).collect(); for(String userName : userNames) { System.out.println(userName); } } }
相關推薦
spark 讀取各類資料來源
本文章主要通過程式碼實現spark讀取各類資料來源1 spark讀取hive資料import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.a
spark scala-讀取各類資料來源
本文章主要通過scala實現spark讀取各類資料來源1 讀取hive資料/** * @author jhp * 使用spark讀取Hive資料 */ object HiveDataSource { def main(args: Array[S
[Spark][Streaming]Spark讀取網絡輸入的例子
trac pair keep exception clas zookeeper 包含 air blog Spark讀取網絡輸入的例子: 參考如下的URL進行試驗 https://stackoverflow.com/questions/46739081/how-to-ge
spark 讀取mongodb失敗,報executor time out 和GC overhead limit exceeded 異常
資源 base for read 就是 conn context mon getc 代碼: import com.mongodb.spark.config.ReadConfig import com.mongodb.spark.sql._ val config = sql
mongo-spark-讀取不同的庫資料和寫入不同的庫中
mongo-spark-讀取不同的庫資料和寫入不同的庫中 package com.example.app import com.mongodb.spark.config.{ReadConfig, WriteConfig} import com.mongodb.spark.sql._ object
spark讀取hive資料-java
需求:將hive中的資料讀取出來,寫入es中。 環境:spark 2.0.2 1. SparkSession裡設定enableHiveSupport() SparkConf conf = new SparkConf().setAppName("appName").setMast
[Spark基礎]-- Spark 內建資料來源 options 名稱
在 Spark-2.1.0 以後支援的 Options 如下: --------- JDBC’s options --------- user password url dbtable driver partitionColumn lowerBound upperBound
spark sql jdbc資料來源 多種輸出方式
package com.ws.jdbc import java.util.Properties import org.apache.spark.sql.{DataFrame, SparkSession} /** * spark sql jdbc資料來源 */ object JdbcD
spark sql parquet資料來源 (推薦)
package com.ws.jdbc import org.apache.spark.sql.{DataFrame, SparkSession} /** * 推薦使用 */ object ParquetSource { def main(args: Array[String
spark sql csv資料來源
package com.ws.jdbc import org.apache.spark.sql.{DataFrame, SparkSession} object CsvSource { def main(args: Array[String]): Unit = { val sp
spark sql json資料來源
package com.ws.jdbc import org.apache.spark.sql.{DataFrame, SparkSession} object JsonSource { def main(args: Array[String]): Unit = { val
spark讀取es資料
spark-2.0.2 scala-2.11.8 <!-- https://mvnrepository.com/artifact/org.webjars.npm/spark-md5 --> <dependency> <groupId>org.apa
0016-Avro序列化&反序列化和Spark讀取Avro資料
溫馨提示:要看高清無碼套圖,請使用手機開啟並單擊圖片放大檢視。 1.簡介 本篇文章主要講如何使用java生成Avro格式資料以及如何通過spark將Avro資料檔案轉換成DataSet和DataFrame進行操作。 1.1Apache Arvo是什麼? Apache Avro 是一個數據序列
0016-Avro序列化&反序列化和Spark讀取Avro數據
ron ace raft 轉換 import 系統 提示 文章 offset 溫馨提示:要看高清無碼套圖,請使用手機打開並單擊圖片放大查看。 1.簡介 本篇文章主要講如何使用java生成Avro格式數據以及如何通過spark將Avro數據文件轉換成DataSet和Data
spark讀取hbase(NewHadoopAPI 例子)
package cn.piesat.controllerimport java.text.{DecimalFormat, SimpleDateFormat}import java.utilimport java.util.concurrent.{CountDownLatch, Executors, Futur
Spark學習(陸)- Spark操作外部資料來源
文章目錄 產生背景 概念 目標 操作Parquet檔案資料 操作Hive表資料 操作MySQL表資料 操作MySQL的資料方法一: 操作MySQL的資料方法二: 操作MySQL
從原始碼看Spark讀取Hive表資料小檔案和分塊的問題
原文連結:https://mp.csdn.net/postedit/82423831 使用Spark進行資料分析和計算早已成趨勢,你是否關注過讀取一張Hive表時Task數為什麼是那麼多呢?它跟什麼有關係呢? 最近剛好碰到這個問題,而之前對此有些模糊,所以做了些整理,希望大家拍磚探討
spark 讀取orc檔案
<dependency> <groupId>org.apache.orc</groupId> <
spark 讀取 ftp
class FtpShow(spark: SparkSession, map: Map[String, String]) { private val path = map(FtpOptions.PATH).stripPrefix("./") private val username = ma
spark 讀取 hdfs 資料分割槽規則
下文以讀取 parquet 檔案 / parquet hive table 為例: hive metastore 和 parquet 轉化的方式通過 spark.sql.hive.convertMetastoreParquet 控制,預設為 true。 如果設定為 true ,會