1. 程式人生 > >spark--DataFrame處理udf操作和一些聚合操作

spark--DataFrame處理udf操作和一些聚合操作

在spark中對資料進行處理udf函式還是很多的下面介紹在df下udf操作例項

  val sqlContext = new SQLContext(sc)

  import sqlContext.implicits._

呼叫sqlcontext裡面的udf函式
  sqlContext.udf.register("str",(_:String).length)
  sqlContext.sql("select str('test')")

這個就是對test這個String計算它的長度

下面程式碼是先自己構建一個DF

在裡面取出大於98的列值

  sqlContext.udf.register("rd",(n:Int)=>{n>98})
  case class TestData(key:Int,Value:String)
  val df4=sqlContext.sparkContext.parallelize(1 to 100).map(i=>TestData(i,i.toString)).toDF()
  df4.registerTempTable("integerData")
sql裡面where操作呼叫UDF
  val result=sqlContext.sql("select * from integerData  where rd(key)")

sql裡面group操作  對列值大於10的進行sum操作
<pre name="code" class="java">  sqlContext.udf.register("groupFunction", (n: Int) => { n > 10 })

  val df = Seq(("red", 1), ("red", 2), ("blue", 10),
    ("green", 100), ("green", 200)).toDF("g", "v")
  df.registerTempTable("groupData")

  val result =
    sqlContext.sql(
      """
        | SELECT SUM(v)
        | FROM groupData
        | GROUP BY groupFunction(v)
      """.stripMargin)

下面介紹一下聚合操作

建立一個DF

 val testData2: DataFrame = {
    val df = sqlContext.sparkContext.parallelize(
      TestData2(1, 1) ::
        TestData2(1, 2) ::
        TestData2(2, 1) ::
        TestData2(2, 2) ::
        TestData2(3, 1) ::
        TestData2(3, 2) :: Nil, 2).toDF()
    df.registerTempTable("testData2")
    df
  }
<pre name="code" class="java">//統計
  testData2.groupBy("a").agg(sum($"b"))
  testData2.groupBy("a").agg(count("*"))


  testData2.groupBy("a").agg(avg($"a"))
  testData2.groupBy("a").agg(avg($"a"),sumDistinct('a))


其實還有UDAF操作以後補上