1. 程式人生 > >spark 讀取各類資料來源

spark 讀取各類資料來源

本文章主要通過程式碼實現spark讀取各類資料來源

1 spark讀取hive資料

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
/**
 * Hive資料來源
 * @author Administrator
 *
 */
public class 
HiveDataSource { @SuppressWarnings("deprecation") public static void main(String[] args) { // 首先還是建立SparkConf SparkConf conf = new SparkConf() .setAppName("HiveDataSource"); // 建立JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 建立HiveContext,注意,這裡,它接收的是SparkContext作為引數,不是JavaSparkContext
HiveContext hiveContext = new HiveContext(sc.sc()); // 第一個功能,使用HiveContext的sql()方法,可以執行Hive中能夠執行的HiveQL語句 // 判斷是否存在student_infos表,如果存在則刪除 hiveContext.sql("DROP TABLE IF EXISTS student_infos"); // 判斷student_infos表是否不存在,如果不存在,則建立該表 hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING, age INT)"
); // 將學生基本資訊資料匯入student_infos表 hiveContext.sql("LOAD DATA " + "LOCAL INPATH '/usr/local/spark-study/resources/student_infos.txt' " + "INTO TABLE student_infos"); // 用同樣的方式給student_scores匯入資料 hiveContext.sql("DROP TABLE IF EXISTS student_scores"); hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT)"); hiveContext.sql("LOAD DATA " + "LOCAL INPATH '/usr/local/spark-study/resources/student_scores.txt' " + "INTO TABLE student_scores"); // 第二個功能,執行sql還可以返回DataFrame,用於查詢 // 執行sql查詢,關聯兩張表,查詢成績大於80分的學生 DataFrame goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score " + "FROM student_infos si " + "JOIN student_scores ss ON si.name=ss.name " + "WHERE ss.score>=80"); // 第三個功能,可以將DataFrame中的資料,理論上來說,DataFrame對應的RDD的元素,是Row即可 // 將DataFrame中的資料儲存到hive表中 // 接著將DataFrame中的資料儲存到good_student_infos表中 hiveContext.sql("DROP TABLE IF EXISTS good_student_infos"); goodStudentsDF.saveAsTable("good_student_infos"); // 第四個功能,可以用table()方法,針對hive表,直接建立DataFrame // 然後針對good_student_infos表,直接建立DataFrame Row[] goodStudentRows = hiveContext.table("good_student_infos").collect(); for(Row goodStudentRow : goodStudentRows) { System.out.println(goodStudentRow); } sc.close(); } }

2 spark讀取jdbc資料來源

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
/**
 * JDBC資料來源
 * @author Administrator
 *
 */
public class JDBCDataSource {

   public static void main(String[] args) {
      SparkConf conf = new SparkConf()
            .setAppName("JDBCDataSource");  
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
// 總結一下
      // jdbc資料來源
      // 首先,是通過SQLContext的read系列方法,將mysql中的資料載入為DataFrame
      // 然後可以將DataFrame轉換為RDD,使用Spark Core提供的各種運算元進行操作
      // 最後可以將得到的資料結果,通過foreach()運算元,寫入mysql、hbase、redis等等db / cache中
      // 分別將mysql中兩張表的資料載入為DataFrame
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:mysql://spark1:3306/testdb");
options.put("dbtable", "student_infos");
DataFrame studentInfosDF = sqlContext.read().format("jdbc")
            .options(options).load();
options.put("dbtable", "student_scores");
DataFrame studentScoresDF = sqlContext.read().format("jdbc")
            .options(options).load();
// 將兩個DataFrame轉換為JavaPairRDD,執行join操作
JavaPairRDD<String, Tuple2<Integer, Integer>> studentsRDD = 
            
            studentInfosDF.javaRDD().mapToPair(
            
                  new PairFunction<Row, String, Integer>() {
      
                     private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(Row row) throws Exception {
                        return new Tuple2<String, Integer>(row.getString(0), 
Integer.valueOf(String.valueOf(row.get(1))));  
}
                     
                  })
            .join(studentScoresDF.javaRDD().mapToPair(
                     
                  new PairFunction<Row, String, Integer>() {
      
                     private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(Row row) throws Exception {
                        return new Tuple2<String, Integer>(String.valueOf(row.get(0)),
Integer.valueOf(String.valueOf(row.get(1))));  
}
                     
                  }));
// 將JavaPairRDD轉換為JavaRDD<Row>
JavaRDD<Row> studentRowsRDD = studentsRDD.map(
            
            new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() {

               private static final long serialVersionUID = 1L;
@Override
public Row call(
                     Tuple2<String, Tuple2<Integer, Integer>> tuple)
                     throws Exception {
                  return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
}
               
            });
// 過濾出分數大於80分的資料
JavaRDD<Row> filteredStudentRowsRDD = studentRowsRDD.filter(
            
            new Function<Row, Boolean>() {

               private static final long serialVersionUID = 1L;
@Override
public Boolean call(Row row) throws Exception {
                  if(row.getInt(2) > 80) {
                     return true;
} 
                  return false;
}
               
            });
// 轉換為DataFrame
List<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));  
structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); 
structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true)); 
StructType structType = DataTypes.createStructType(structFields);
DataFrame studentsDF = sqlContext.createDataFrame(filteredStudentRowsRDD, structType);
Row[] rows = studentsDF.collect();
      for(Row row : rows) {
         System.out.println(row);  
}
      
      // 將DataFrame中的資料儲存到mysql表中
      // 這種方式是在企業裡很常用的,有可能是插入mysql、有可能是插入hbase,還有可能是插入redis快取
studentsDF.javaRDD().foreach(new VoidFunction<Row>() {
         
         private static final long serialVersionUID = 1L;
@Override
public void call(Row row) throws Exception {
            String sql = "insert into good_student_infos values(" 
+ "'" + String.valueOf(row.getString(0)) + "',"
+ Integer.valueOf(String.valueOf(row.get(1))) + ","
+ Integer.valueOf(String.valueOf(row.get(2))) + ")";   
Class.forName("com.mysql.jdbc.Driver");  
Connection conn = null;
Statement stmt = null;
            try {
               conn = DriverManager.getConnection(
                     "jdbc:mysql://spark1:3306/testdb", "", "");
stmt = conn.createStatement();
stmt.executeUpdate(sql);
} catch (Exception e) {
               e.printStackTrace();
} finally {
               if(stmt != null) {
                  stmt.close();
} 
               if(conn != null) {
                  conn.close();
}
            }
         }
         
      }); 
sc.close();
}
   
}

3 spark讀取json格式資料

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
/**
 * JSON資料來源
 * @author Administrator
 *
 */
public class JSONDataSource {

   public static void main(String[] args) {
      SparkConf conf = new SparkConf()
            .setAppName("JSONDataSource");  
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
// 針對json檔案,建立DataFrame(針對json檔案建立DataFrame)
DataFrame studentScoresDF = sqlContext.read().json(
            "hdfs://spark1:9000/spark-study/students.json");  
// 針對學生成績資訊的DataFrame,註冊臨時表,查詢分數大於80分的學生的姓名
      // (註冊臨時表,針對臨時表執行sql語句)
studentScoresDF.registerTempTable("student_scores");
DataFrame goodStudentScoresDF = sqlContext.sql(
            "select name,score from student_scores where score>=80");
// (將DataFrame轉換為rdd,執行transformation操作)
List<String> goodStudentNames = goodStudentScoresDF.javaRDD().map(
            
            new Function<Row, String>() {
               
               private static final long serialVersionUID = 1L;
@Override
public String call(Row row) throws Exception {
                  return row.getString(0);
}
               
            }).collect();
// 然後針對JavaRDD<String>,建立DataFrame
      // (針對包含json串的JavaRDD,建立DataFrame)
List<String> studentInfoJSONs = new ArrayList<String>();
studentInfoJSONs.add("{\"name\":\"Leo\", \"age\":18}");  
studentInfoJSONs.add("{\"name\":\"Marry\", \"age\":17}");  
studentInfoJSONs.add("{\"name\":\"Jack\", \"age\":19}");
JavaRDD<String> studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs);
DataFrame studentInfosDF = sqlContext.read().json(studentInfoJSONsRDD);
// 針對學生基本資訊DataFrame,註冊臨時表,然後查詢分數大於80分的學生的基本資訊
studentInfosDF.registerTempTable("student_infos");  
String sql = "select name,age from student_infos where name in (";
      for(int i = 0; i < goodStudentNames.size(); i++) {
         sql += "'" + goodStudentNames.get(i) + "'";
         if(i < goodStudentNames.size() - 1) {
            sql += ",";
}
      }
      sql += ")";
DataFrame goodStudentInfosDF = sqlContext.sql(sql);
// 然後將兩份資料的DataFrame,轉換為JavaPairRDD,執行join transformation
      // (將DataFrame轉換為JavaRDD,再map為JavaPairRDD,然後進行join)
JavaPairRDD<String, Tuple2<Integer, Integer>> goodStudentsRDD = 
            
            goodStudentScoresDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {

               private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(Row row) throws Exception {
                  return new Tuple2<String, Integer>(row.getString(0), 
Integer.valueOf(String.valueOf(row.getLong(1))));  
}
               
            }).join(goodStudentInfosDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
      
               private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(Row row) throws Exception {
                  return new Tuple2<String, Integer>(row.getString(0),
Integer.valueOf(String.valueOf(row.getLong(1))));   
}
               
            }));
// 然後將封裝在RDD中的好學生的全部資訊,轉換為一個JavaRDD<Row>的格式
      // (將JavaRDD,轉換為DataFrame)
JavaRDD<Row> goodStudentRowsRDD = goodStudentsRDD.map(
            
            new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() {

               private static final long serialVersionUID = 1L;
@Override
public Row call(
                     Tuple2<String, Tuple2<Integer, Integer>> tuple)
                     throws Exception {
                  return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
}
               
            });
// 建立一份元資料,將JavaRDD<Row>轉換為DataFrame
List<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); 
structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));  
structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));  
StructType structType = DataTypes.createStructType(structFields);
DataFrame goodStudentsDF = sqlContext.createDataFrame(goodStudentRowsRDD, structType);
// 將好學生的全部資訊儲存到一個json檔案中去
      // (將DataFrame中的資料儲存到外部的json檔案中去)
goodStudentsDF.write().format("json").save("hdfs://spark1:9000/spark-study/good-students");  
}
   
}

4 spark讀取parquet資料

import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
/**
 * Parquet資料來源之使用程式設計方式載入資料
 * @author Administrator
 *
 */
public class ParquetLoadData {

   public static void main(String[] args) {
      SparkConf conf = new SparkConf()
            .setAppName("ParquetLoadData");  
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
// 讀取Parquet檔案中的資料,建立一個DataFrame
DataFrame usersDF = sqlContext.read().parquet(
            "hdfs://spark1:9000/spark-study/users.parquet");
// 將DataFrame註冊為臨時表,然後使用SQL查詢需要的資料
usersDF.registerTempTable("users");  
DataFrame userNamesDF = sqlContext.sql("select name from users");  
// 對查詢出來的DataFrame進行transformation操作,處理資料,然後打印出來
List<String> userNames = userNamesDF.javaRDD().map(new Function<Row, String>() {

         private static final long serialVersionUID = 1L;
@Override
public String call(Row row) throws Exception {
            return "Name: " + row.getString(0);
}
         
      }).collect();
      for(String userName : userNames) {
         System.out.println(userName);  
}
   }
   
}

相關推薦

spark 讀取各類資料來源

本文章主要通過程式碼實現spark讀取各類資料來源1 spark讀取hive資料import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.a

spark scala-讀取各類資料來源

本文章主要通過scala實現spark讀取各類資料來源1 讀取hive資料/** * @author jhp * 使用spark讀取Hive資料 */ object HiveDataSource { def main(args: Array[S

[Spark][Streaming]Spark讀取網絡輸入的例子

trac pair keep exception clas zookeeper 包含 air blog Spark讀取網絡輸入的例子: 參考如下的URL進行試驗 https://stackoverflow.com/questions/46739081/how-to-ge

spark 讀取mongodb失敗,報executor time out 和GC overhead limit exceeded 異常

資源 base for read 就是 conn context mon getc 代碼: import com.mongodb.spark.config.ReadConfig import com.mongodb.spark.sql._ val config = sql

mongo-spark-讀取不同的庫資料和寫入不同的庫中

mongo-spark-讀取不同的庫資料和寫入不同的庫中 package com.example.app import com.mongodb.spark.config.{ReadConfig, WriteConfig} import com.mongodb.spark.sql._ object

spark讀取hive資料-java

需求:將hive中的資料讀取出來,寫入es中。 環境:spark 2.0.2 1. SparkSession裡設定enableHiveSupport() SparkConf conf = new SparkConf().setAppName("appName").setMast

[Spark基礎]-- Spark 內建資料來源 options 名稱

在 Spark-2.1.0 以後支援的 Options 如下: --------- JDBC’s options --------- user password url dbtable driver partitionColumn lowerBound upperBound

spark sql jdbc資料來源 多種輸出方式

package com.ws.jdbc import java.util.Properties import org.apache.spark.sql.{DataFrame, SparkSession} /** * spark sql jdbc資料來源 */ object JdbcD

spark sql parquet資料來源 (推薦)

package com.ws.jdbc import org.apache.spark.sql.{DataFrame, SparkSession} /** * 推薦使用 */ object ParquetSource { def main(args: Array[String

spark sql csv資料來源

package com.ws.jdbc import org.apache.spark.sql.{DataFrame, SparkSession} object CsvSource { def main(args: Array[String]): Unit = { val sp

spark sql json資料來源

package com.ws.jdbc import org.apache.spark.sql.{DataFrame, SparkSession} object JsonSource { def main(args: Array[String]): Unit = { val

spark讀取es資料

spark-2.0.2 scala-2.11.8 <!-- https://mvnrepository.com/artifact/org.webjars.npm/spark-md5 --> <dependency> <groupId>org.apa

0016-Avro序列化&反序列化和Spark讀取Avro資料

溫馨提示:要看高清無碼套圖,請使用手機開啟並單擊圖片放大檢視。 1.簡介 本篇文章主要講如何使用java生成Avro格式資料以及如何通過spark將Avro資料檔案轉換成DataSet和DataFrame進行操作。 1.1Apache Arvo是什麼? Apache Avro 是一個數據序列

0016-Avro序列化&反序列化和Spark讀取Avro數據

ron ace raft 轉換 import 系統 提示 文章 offset 溫馨提示:要看高清無碼套圖,請使用手機打開並單擊圖片放大查看。 1.簡介 本篇文章主要講如何使用java生成Avro格式數據以及如何通過spark將Avro數據文件轉換成DataSet和Data

spark讀取hbase(NewHadoopAPI 例子)

package cn.piesat.controllerimport java.text.{DecimalFormat, SimpleDateFormat}import java.utilimport java.util.concurrent.{CountDownLatch, Executors, Futur

Spark學習(陸)- Spark操作外部資料來源

文章目錄 產生背景 概念 目標 操作Parquet檔案資料 操作Hive表資料 操作MySQL表資料 操作MySQL的資料方法一: 操作MySQL的資料方法二: 操作MySQL

從原始碼看Spark讀取Hive表資料小檔案和分塊的問題

原文連結:https://mp.csdn.net/postedit/82423831  使用Spark進行資料分析和計算早已成趨勢,你是否關注過讀取一張Hive表時Task數為什麼是那麼多呢?它跟什麼有關係呢? 最近剛好碰到這個問題,而之前對此有些模糊,所以做了些整理,希望大家拍磚探討

spark 讀取orc檔案

<dependency> <groupId>org.apache.orc</groupId> <

spark 讀取 ftp

class FtpShow(spark: SparkSession, map: Map[String, String]) { private val path = map(FtpOptions.PATH).stripPrefix("./") private val username = ma

spark 讀取 hdfs 資料分割槽規則

下文以讀取 parquet 檔案 / parquet hive table 為例: hive metastore 和 parquet 轉化的方式通過 spark.sql.hive.convertMetastoreParquet 控制,預設為 true。 如果設定為 true ,會