[2.6] Spark SQL Notes on Working with Various Data Sources
阿新 • Published: 2019-01-30
Reference
Data flow when Spark SQL works with various data sources:
Input from various data sources => RDD(lines) => RDD(Rows) => DataFrame (registered as a temp table) => analysis and filtering (SQL operations, machine learning, etc.) => RDD(Row) => output in various formats
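As a minimal end-to-end sketch of this flow (the input path, the two-column schema, and the output path are placeholders, not from the original notes; the Spark 1.6-style API matches the examples that follow):
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("flow"))
val sqlContext = new SQLContext(sc)
val lines = sc.textFile("file:///tmp/input.csv")                 // input => RDD(lines)
val rows  = lines.map(_.split(",")).map(a => Row(a(0), a(1)))    // RDD(lines) => RDD(Row)
val schema = StructType(Array(
  StructField("key", StringType, true),
  StructField("value", StringType, true)))
val df = sqlContext.createDataFrame(rows, schema)                // RDD(Row) => DataFrame
df.registerTempTable("t")                                        // register as a temp table
val result = sqlContext.sql("SELECT key, count(*) AS cnt FROM t GROUP BY key") // analysis & filtering
result.rdd.saveAsTextFile("file:///tmp/output")                  // RDD(Row) => output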
Scenario
How Spark SQL works with various data sources: JSON files, Hive, MySQL, HBase, and so on.
Analysis
- Using the agg operation with Spark SQL built-in functions as an example, walk through the SQL data flow.
Code
package main.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.functions._
/**
 * Use Spark SQL built-in functions to analyse the data.
 * A built-in function returns a Column object.
 * Categories:
 * 1. aggregate functions 2. collection functions 3. date/time functions 4. math functions 5. window functions 6. string functions 7. others
 */
object sqlagg {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("sparkinnerfunctions")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc)
/*
 * 1. Obtain the data source - RDD(line)
 */
val userData = Array(
"2016-5-29,001,http://spark.apache.org/,1000",
"2016-5-20,001,http://spark.apache.org/,1090",
"2016-5-20,001,http://spark.apache.org/,1060",
"2016-5-30,002,http://spark.apache.org/,1000",
"2016-5-30,003,http://spark.apache.org/,1000",
"2016-5-10,003,http://spark.apache.org/,1020",
"2016-5-10,003,http://spark.apache.org/,1020",
"2016-5-10,003,http://spark.apache.org/,1000"
)
val dataRDD = sc.parallelize(userData)
/*
 * 2. Convert to RDD(Row)
 */
val rowRDD = dataRDD.map(line => {
val splited = line.split(",")
Row(splited(0),splited(1),splited(2),splited(3))
})
/*
 * 3. Specify the schema of each Row and build the DataFrame
 */
val structTypes = StructType( Array(
StructField("time",StringType,true),
StructField("userid",StringType,true),
StructField("url",StringType,true),
StructField("amount",StringType,true)
) )
val userDataDF = sqlContext.createDataFrame(rowRDD, structTypes)
/*
 * 4. Use the built-in functions provided by Spark SQL to operate on the DataFrame
 *    (the relevant implicit conversions need to be imported); built-in functions produce Column objects.
 */
import sqlContext.implicits._
// Group by date, then aggregate: count the distinct userids per day and compute the total sales amount per day
userDataDF.groupBy("time").agg('time, countDistinct('userid)).map(row=>Row(row(1),row(2))).collect().foreach(println)
userDataDF.groupBy("time").agg('time, sum('amount)).show()
}
}
Execution result
[2016-5-10,1]
[2016-5-20,1]
[2016-5-29,1]
[2016-5-30,2]
+---------+---------+-----------+
| time| time|sum(amount)|
+---------+---------+-----------+
|2016-5-10|2016-5-10| 3040.0|
|2016-5-20|2016-5-20| 2150.0|
|2016-5-29|2016-5-29| 1000.0|
|2016-5-30|2016-5-30| 2000.0|
+---------+---------+-----------+
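The comment in the code above lists the built-in function categories. As a small sketch of a few more of them (assuming the sqlContext and userDataDF built above; amount was declared as a StringType column, so it is cast to int before aggregating):
import org.apache.spark.sql.functions._
import sqlContext.implicits._

// Aggregate functions: highest and average daily amount
userDataDF.groupBy("time").agg(max('amount.cast("int")), avg('amount.cast("int"))).show()
// String functions: upper-case the url and compute its length
userDataDF.select(upper('url), length('url)).show()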
- 1. JSON
Code
package main.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
/**
 * Spark SQL working with a local JSON file
 */
object DataFrameOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("DataFrame Ops")
val sqlContext = new SQLContext(new SparkContext(conf))
val df = sqlContext.read.json("file:///home/hadoop/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")
df.show
df.printSchema
df.select("name").show
df.select("name", "age").show
df.select(df("name"),df("age")+10).show
df.filter(df("age")>10).show
}
}
Execution result
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
+----+
|name|
+----+
|Andy|
+----+
+----+---+
|name|age|
+----+---+
|Andy| 30|
+----+---+
+----+----------+
|name|(age + 10)|
+----+----------+
|Andy| 40|
+----+----------+
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
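The same JSON-backed DataFrame can also be queried with SQL once it is registered as a temp table, matching the flow described at the top of these notes (a minimal sketch; the temp-table name people is an assumption):
df.registerTempTable("people")
sqlContext.sql("SELECT name, age FROM people WHERE age > 20").show()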
- 2. Hive
Code
package cool.pengych.spark.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;
/**
* SparkSQL OPS on Hive
* @author pengyucheng
*
*/
public class SparkSQL2Hive
{
public static void main(String[] args)
{
SparkConf config = new SparkConf().setAppName("SparkSQL2Hive");
SparkContext sc = new SparkContext(config);
HiveContext hiveContext = new HiveContext(sc);
hiveContext.sql("use hive");
hiveContext.sql("DROP TABLE IF EXISTS people");
hiveContext.sql("CREATE TABLE IF NOT EXISTS people(name STRING,age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'");
/*
 * Load local data into the Hive warehouse (behind the scenes the data is actually copied);
 * alternatively, LOAD DATA INPATH can pull data from HDFS etc. into Hive (in that case the data is moved).
 */
hiveContext.sql("LOAD DATA LOCAL INPATH '/home/pengyucheng/resource/people.txt' INTO TABLE people ");
hiveContext.sql("DROP TABLE IF EXISTS peoplescores");
hiveContext.sql("CREATE TABLE IF NOT EXISTS peoplescores(name STRING,score INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'");
hiveContext.sql("LOAD DATA LOCAL INPATH '/home/pengyucheng/resource/peoplescores.txt' INTO TABLE peoplescores");
/*
 * Join the two Hive tables directly through HiveContext
 */
DataFrame resultDF = hiveContext.sql("SELECT pi.name,pi.age,ps.score FROM people pi JOIN peoplescores ps ON pi.name = ps.name WHERE ps.score > 75");
/*
 * saveAsTable creates a Hive managed table: both the metadata and the data location are managed by the
 * Hive warehouse, so when the table is dropped its data is deleted as well (it no longer exists on disk).
 */
hiveContext.sql("DROP TABLE IF EXISTS peopleinformationresult");
resultDF.saveAsTable("peopleinformationresult");
/*
 * HiveContext.table reads a table from the Hive warehouse directly into a DataFrame, which can then be
 * used for machine learning, graph computation and other complex ETL operations.
 */
DataFrame dataFromHive = hiveContext.table("peopleinformationresult");
dataFromHive.show();
}
}
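A side note on the save step: resultDF.saveAsTable(...) used above is the older DataFrame method; a minimal Scala sketch of the same save through the DataFrameWriter API (same table name, overwrite mode assumed):
import org.apache.spark.sql.SaveMode
resultDF.write.mode(SaveMode.Overwrite).saveAsTable("peopleinformationresult")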
- 3. SparkSQL JDBC 2 MySQL
package cool.pengych.spark.sql;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
/**
 * SparkSQL operating on a MySQL database via JDBC
 * @author pengyucheng
 *
 */
public class SparkSQLJDBC2MySQL
{
public static void main(String[] args)
{
SparkConf config = new SparkConf().setMaster("local[*]").setAppName("SparkSQLJDBC2MySQL");
SparkContext sc = new SparkContext(config);
sc.addJar("/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/mysql-connector-java-5.1.39-bin.jar");
SQLContext sqlContext = new SQLContext(sc);
/*
 * 1. Connect to the database: format("jdbc") tells SparkSQL that the data source is accessed via JDBC,
 *    which usually means a database such as MySQL; then pass the relevant connection options.
 */
DataFrameReader reader = sqlContext.read().format("jdbc");
reader.option("url", "jdbc:mysql://112.74.21.122:3306/hive");
reader.option("driver", "com.mysql.jdbc.Driver");
reader.option("dbtable", "spark");
reader.option("user", "hive");
reader.option("password", "hive");
/*
 * 2. Load the data
 */
DataFrame sparkDF = reader.load();
reader.option("dbtable", "hadoop");
DataFrame hadoopDF = reader.load();
/*
 * 3. Organise the data with Spark Core: here a join is performed (after converting the DataFrames to RDDs)
 */
JavaPairRDD<String, Tuple2<Integer, Integer>> resultRDD = sparkDF.javaRDD().mapToPair(new PairFunction<Row, String,Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String,Integer> call(Row row) throws Exception {
return new Tuple2<String, Integer>(String.valueOf(row.getAs("name")), (Integer) row.getAs("age"));
}
}).join(hadoopDF.javaRDD().mapToPair(new PairFunction<Row,String,Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String,Integer> call(Row row) throws Exception {
return new Tuple2<String, Integer>(String.valueOf(row.getAs("name")), (Integer)row.getAs("score"));
}
}));
/*
 * 4. Hand the organised data back to a DataFrame for the actual processing - Spark SQL, Core, ML, etc.
 *    can then be used for arbitrarily complex operations.
 */
// Build a JavaRDD<Row>
JavaRDD<Row> resultRowRDD = resultRDD.map(new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception {
return RowFactory.create(tuple._1,tuple._2._1,tuple._2._2);
}
});
// Build the StructType that describes the metadata of the final DataFrame
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
fields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
StructType type = DataTypes.createStructType(fields);
DataFrame personsDF = sqlContext.createDataFrame(resultRowRDD, type);
// Actual business processing - here we simply show the result
System.out.println("======================== business processing start: ML, graph computation, etc. ================");
System.out.println("==== start showing ====");
personsDF.show();
System.out.println("======================== business processing end: ML, graph computation, etc. ================");
/*
 * 5. Persist the processed data: it can be stored in Hive, a relational database, etc.
 */
personsDF.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Iterator<Row> iterator) throws SQLException
{
Connection conn = null;
try
{
conn = DriverManager.getConnection("jdbc:mysql://112.74.21.122:3306/hive", "hive", "hive");
// Insert the rows one at a time through a PreparedStatement so string values are escaped correctly
PreparedStatement stmt = conn.prepareStatement("INSERT INTO dfperson VALUES (?, ?, ?)");
while(iterator.hasNext())
{
Row row = iterator.next();
stmt.setString(1, String.valueOf(row.getAs("name")));
stmt.setInt(2, row.getInt(1));
stmt.setInt(3, row.getInt(2));
stmt.executeUpdate();
}
stmt.close();
}
catch (SQLException e)
{
e.printStackTrace();
}
finally
{
if(null != conn) conn.close();
}
}
});
}
}
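Step 5 above writes the rows back by hand inside foreachPartition. As an alternative, DataFrameWriter.jdbc can perform the insert directly; a minimal Scala sketch with the same connection details (it assumes the dfperson table exists and can be appended to):
import java.util.Properties
import org.apache.spark.sql.SaveMode

val props = new Properties()
props.setProperty("user", "hive")
props.setProperty("password", "hive")
props.setProperty("driver", "com.mysql.jdbc.Driver")
personsDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://112.74.21.122:3306/hive", "dfperson", props)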
- 4. SparkSQL JDBC 2 ThriftServer
package cool.pengych.spark.sql;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
 * @author pengyucheng
 * Access the Thrift Server via JDBC, which in turn goes through Spark SQL to Hive
 *
 */
public class SparkSQLJDBC2ThriftServer
{
public static void main(String[] args) throws ClassNotFoundException, SQLException
{
String sql = "select name from people where age = ? ";
Class.forName("org.apache.hive.jdbc.HiveDriver");
Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice",
"root","");
// JDBC parameter indexes start at 1
PreparedStatement preparedStatement = conn.prepareStatement(sql);
preparedStatement.setInt(1, 27);
ResultSet rs = preparedStatement.executeQuery();
while(rs.next())
{
System.out.println(rs.getString(1));
}
conn.close();
}
}
- 5. SparkSQL 2 Parquet
package cool.pengych.spark.sql;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
/**
 * Spark SQL working with a Parquet file
 * @author pengyucheng
 *
 */
public class SparkSQLParquet
{
public static void main(String[] args) {
/*
 * Create the SQLContext
 */
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLParquet");
JavaSparkContext jsc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(jsc);
/*
 * Read the Parquet file and register it as a temp table for subsequent SQL queries
 */
DataFrame df = sqlContext.read().parquet("file:///home/hadoop/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet");
df.registerTempTable("users");
/*
 * Multi-dimensional analysis of the data
 */
DataFrame result = sqlContext.sql("select name from users");
JavaRDD<String> strs = result.javaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) throws Exception {
return "The name is :"+row.getAs("name");
}
});
/*
 * Process the result
 */
List<String> listRow = strs.collect();
for (String row : listRow) {
System.out.println(row);
}
}
}
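The example above only reads Parquet; writing a DataFrame back out as Parquet is symmetric. A minimal Scala sketch (assuming a Scala SQLContext as in the first example; the output path is a placeholder):
val usersDF = sqlContext.read.parquet("file:///home/hadoop/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet")
usersDF.select("name").write.parquet("file:///tmp/user_names.parquet")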
Summary
Overcome the tragedy of human nature with exuberant vitality!