1. 程式人生 > >Spark SQL 函式操作

Spark SQL 函式操作

Spark 內建函式

使用Spark SQL中的內建函式對資料進行分析,Spark SQL API不同的是,DataFrame中的內建函式操作的結果是返回一個Column物件,而DataFrame天生就是”A distributed collection of data organized into named columns.”,這就為資料的複雜分析建立了堅實的基礎並提供了極大的方便性,例如說,我們在操作DataFrame的方法中可以隨時呼叫內建函式進行業務需要的處理,這之於我們構建附件的業務邏輯而言是可以極大的減少不必須的時間消耗(基於上就是實際模型的對映),讓我們聚焦在資料分析上,這對於提高工程師的生產力而言是非常有價值的Spark 1.5.x開始提供了大量的內建函式,
還有max、mean、min、sum、avg、explode、size、sort_array、day、to_date、abs、acros、asin、atan
總體上而言內建函式包含了五大基本型別:

  • 聚合函式,例如countDistinct、sumDistinct等;
  • 集合函式,例如sort_array、explode等
  • 日期、時間函式,例如hour、quarter、next_day
  • 數學函式,例如asin、atan、sqrt、tan、round等;
  • 開窗函式,例如rowNumber等
  • 字串函式,concat、format_number、rexexp_extract
  • 其它函式,isNaN、sha、randn、callUDF

Hive 下的 單行單詞統計

select t.wd ,count(t.wd) as count from (select explode(split(line," ")) as wd from word) t group by t.wd;

在編寫程式程式碼的時候如果呼叫函式那麼需要注意的是要匯入functions

import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext, functions}

object SparkSQLFunctionOps {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkSQLFunctionOps").setMaster("local");
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val linesRDD = sc.textFile("E:/test/scala/sql-rdd-source.txt")
    val rowRDD = linesRDD.map(line => {
      val splits = line.split(",")
      Row(splits(0).trim.toInt,splits(1).trim,splits(2).trim.toInt,splits(3).trim.toInt)
    })
    val structType = StructType(Array(
      DataTypes.createStructField("id",DataTypes.IntegerType,true),
      DataTypes.createStructField("name",DataTypes.StringType,true),
      DataTypes.createStructField("age",DataTypes.IntegerType,true),
      DataTypes.createStructField("height",DataTypes.IntegerType,true)
    ))

    val df = sqlContext.createDataFrame(rowRDD,structType)
    df.registerTempTable("person")
    df.show()
    /**
      * 接下來對df中的資料進行查詢
      * 第一個查詢年齡的最大值,平均值
      * height的總身高
      * */
    sqlContext.sql("select avg(age) from person").show()
    sqlContext.sql("select max(age) from person").show()
    sqlContext.sql("select sum(height) from person").show()
    sc.stop()
  }
}

Java 版本

public class SparkSQLFunctionJava {
public static void main(String[] args) {
    SparkConf conf = new SparkConf( ).setAppName(SparkSQLFunctionJava.class.getSimpleName()).setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
   JavaRDD<String> linesRDD = sc.textFile("E:/test/scala/sql-rdd-source.txt");

    JavaRDD<Row> rowRDD = linesRDD.map(new Function<String, Row>() {
        @Override
        public Row call(String s) throws Exception {
            String splits[] = s.split(",");
            return  RowFactory.create(Integer.valueOf(splits[0].trim()),splits[1].trim(),Integer.valueOf(splits[2].trim()),Integer.valueOf(splits[3].trim()));
        }
    });
    StructField structFields[] = new StructField[4];
    structFields[0] = DataTypes.createStructField("id",DataTypes.IntegerType,true);
    structFields[1] = DataTypes.createStructField("name",DataTypes.StringType,true);
    structFields[2] = DataTypes.createStructField("age",DataTypes.IntegerType,true);
    structFields[3] = DataTypes.createStructField("height",DataTypes.IntegerType,true);
    StructType structType = new StructType(structFields);
    DataFrame dataFrame = sqlContext.createDataFrame(rowRDD, structType);
    dataFrame.show();
    dataFrame.registerTempTable("person");
    /**
     * 接下來對df中的資料進行查詢
     * 第一個查詢年齡的最大值,平均值
     * height的總身高
     * */
    sqlContext.sql("select avg(age) from person").show();
    sqlContext.sql("select max(age) from person").show();
    sqlContext.sql("select sum(height) from person").show();
    sc.close();
}
}

修改Spark執行日誌級別

cp log4j.properties.template log4j.properties
vim conf/log4j.properties 將INFO 修改為ERROR級別
需要重啟Spark叢集,使其生效

HIVE 設定列結構顯示

set hive.cli.print.header=true;

Spark SQL 開窗函式

1、Spark 1.5.x版本以後,在Spark SQL和DataFrame中引入了開窗函式,比如最經典的就是我們的row_number(),可以讓我們實現分組取topn的邏輯。

2、做一個案例進行topn的取值(利用Spark的開窗函式),不知道是否還有印象,我們之前在最早的時候,做過topn的計算,當時是非常麻煩的。但是現在用了Spark SQL之後,非常方便。

/**
  * Spark SQL 開窗函式之 row_number
  * */
object SparkSQLOpenWindowFunction {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkSQLOpenWindowFunction").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    val linesRDD = sc.textFile("E:/test/scala/topn.txt")
    val rowRDD = linesRDD.map(line =>{
      val splits = line.split(" ")
      Row(splits(0).trim,splits(1).trim.toInt)
    })
    val structType = DataTypes.createStructType(Array(
      DataTypes.createStructField("class",DataTypes.StringType,true),
      DataTypes.createStructField("score",DataTypes.IntegerType,true)
    ))
    val df = sqlContext.createDataFrame(rowRDD,structType)
    df.registerTempTable("stu_score")
    /**
      * 查詢操作
      * 先按照class 進行分組,然後對每個class分組中的資料求出TOP3
      * */
     val topNDF = sqlContext.sql("select temp.* from (select *, row_number() over(partition by class order by score desc) rank from stu_score ) temp where temp.rank < 4")
     topNDF.show()
    sc.stop()
  }
}

Java版本

public class SparkSQLOpenWindowFunctionJava {
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName(SparkSQLOpenWindowFunctionJava.class.getSimpleName()).setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    HiveContext sqlContext = new HiveContext(sc);
    JavaRDD<String> linesRDD = sc.textFile("E:/test/scala/topn.txt");

    JavaRDD<Row> rowRDD = linesRDD.map(new Function<String, Row>() {
        @Override
        public Row call(String s) throws Exception {
            return RowFactory.create(s.split(" ")[0].trim(),Integer.valueOf(s.split(" ")[1].trim()));
        }
    });
    StructField structFields[] = new StructField[2];
    structFields[0] = DataTypes.createStructField("class",DataTypes.StringType,true);
    structFields[1] = DataTypes.createStructField("score",DataTypes.IntegerType,true);
    StructType structType = new StructType(structFields);
    DataFrame df = sqlContext.createDataFrame(rowRDD, structType);
    df.registerTempTable("stu_score");
    /**
     * 查詢操作
     * 先按照class 進行分組,然後對每個class分組中的資料求出TOP3
     * */
    DataFrame dataFrame = null;
    dataFrame = sqlContext.sql("select temp.* from (select *, row_number() over(partition by class order by score desc) rank from stu_score ) temp where temp.rank < 4");
    dataFrame.show();
    sc.close();
}
}

在spark-sql 下執行,

  • 先在hive 中,建一張表,create table topn (class string,score int) row format delimited fields terminated by ’ ‘;
  • 然後把資料匯入表中, load data local inpath ‘/opt/data/spark/topn.txt’
  • 然後就可以利用開窗函式進行分組排序了,select temp.x from (select *,row_number() over(partition class order by score desc) rank from topn) temp where temp.rank < 4;

UDF自定義函式

1、UDF:User Defined Function。使用者自定義函式。
我們通常所說的UDF自定義函式,就是一對一的關係: 一個輸入引數和一個輸出引數
建立UDF的步驟:
1、先建立一個自定義的函式func
2、使用sqlContext.udf().register(“起個名字”, func _)
3、在我們的sql中直接使用就行了

object SparkSQLUDFOps {
def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkSQLUDFOps").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val linesRDD = sc.textFile("E:/test/scala/sql-rdd-source.txt")
    val rowRDD = linesRDD.map(line => {
        val splits = line.split(", ")
        Row(splits(0).trim.toInt, splits(1).trim, splits(2).trim.toInt, splits(3).trim.toInt)
    })
    val structType = StructType(Array(
        DataTypes.createStructField("id", DataTypes.IntegerType, true),
        DataTypes.createStructField("name", DataTypes.StringType, true),
        DataTypes.createStructField("age", DataTypes.IntegerType, true),
        DataTypes.createStructField("height", DataTypes.IntegerType, true)
    ))
    val df = sqlContext.createDataFrame(rowRDD, structType)
    df.registerTempTable("person")
    //2、註冊自定義的UDF
    /**
      * 這是兩種註冊的方式
      */
    sqlContext.udf.register("myLen", myLen _)
    sqlContext.udf.register("len", (str:String, len:Int) => str.length > len)
    //3、使用起來
    sqlContext.sql("select id, name, myLen(name) as len from person where len(name, 5)").show()
    sc.stop()
}

//1、建立一個自定義的函式,用於求出字串的長度
def myLen(str: String) = str.length

}