1. 程式人生 > >SparkSQL中的UDF

SparkSQL中的UDF

一、UDF(User Defined Function):spark SQL中使用者自定義函式,用法和spark SQL中的內建函式類似;是saprk SQL中內建函式無法滿足要求,使用者根據業務需求自定義的函式。

二、UDF使用分為兩步:

(1)自定義UDF類,根據業務需要,實現UDF1/2/3....22中的介面之一,其中UDF後跟的數字,
比如UDF1、UDF2;表示輸入引數的個數,1表示有一個入參,2表示有兩個入參,
最多可傳入22個輸入引數。如:
public class 自定義UDF類名 implements UDF3<String,String,String,String>{
    @override
    public String call(String p1,String p2,String p3)throw Exception{}
} 
其中UDF3中前三個引數表示輸入引數的資料型別,最後的引數用於指定call方法的返回值型別
(2)註冊UDF函式:
//建立配置
SparkConf conf = new SparkConf()
         .setAppName("openwindowFunctionDemo");
//建立提交spark應用的叢集入口類物件
JavaSparkContext sc = new JavaSparkContext(conf);
//建立SQLContext物件
SQLContext sqlContext = new SQLContext(sc.sc());
//註冊getJsonData自定義函式
/**
* register方法三個引數中,第一個引數指定自定義函式名,
* 第二個引數傳入一個自定義函式物件;第三個引數指定返回值型別
*
*/
sqlContext.udf().register("getJsonData",new GetJsonDataUDF(), DataTypes.StringType);
(3)通過sqlContext.sql(sql語句)使用上述註冊的UDF函式

三、具體案例:

 /**
         * 假如spark sql註冊過一張臨時表user,其資料格式如下
         * id       userInfo
         * 001      {'username':'tom','age':'12','sex':'男'}
         * 002      {'username':'henry','age':'18','sex':'男'}
         * 003      {'username':'jack','age':'32','sex':'男'}
         * 及user表中userInfo儲存的資料格式為json格式,如果需要單獨獲取userInfo裡的username;
         * spark sql內建的函式無法滿足我們的需要,這時就可以通過自定函式來實現上述需求
         */
(1)自定義UDF類:
public class GetJsonDataUDF implements UDF2<String,String,String>{
    @Override
    public String call(String json, String field) throws Exception {
        //構建json物件
        JSONObject jsonObject = JSONObject.parseObject(json);
        //根據欄位名獲取其對應的value
        return jsonObject.getString(field);
    }

}
(2)註冊UDF函式並使用
  
public class Test {
    public static void main(String[] args) {
        //建立配置
        SparkConf conf = new SparkConf()
                .setAppName("openwindowFunctionDemo");

        //建立提交spark應用的叢集入口類物件
        JavaSparkContext sc = new JavaSparkContext(conf);
        //建立SQLContext物件
        SQLContext sqlContext = new SQLContext(sc.sc());
        //註冊udf函式
        sqlContext.udf().register("getJsonData",new GetJsonDataUDF(), DataTypes.StringType);

        //構建臨時資料
        List<String> userList = new ArrayList<String>();
        userList.add("001 {'username':'tom','age':'12','sex':'男'}");
        userList.add("002 {'username':'henry','age':'18','sex':'男'}");
        userList.add("003 {'username':'jack','age':'32','sex':'男'}");

        JavaRDD<String> rdd = sc.parallelize(userList);
        //將原始資料轉化為格式:<id,userInfo>的形式
       JavaRDD<Row> userRDD = rdd.map(
               new Function<String, Row>() {
                   @Override
                   public Row call(String s) throws Exception {
                       //切分資料
                       String[] datas = s.split(" ");
                       String id = datas[0];
                       String userInfo = datas[1];
                       return RowFactory.create(id,userInfo);
                   }
               }
       );
       //註冊臨時表
       List<StructField> fields = new ArrayList<StructField>();
       fields.add(DataTypes.createStructField("id",DataTypes.StringType,true));
       fields.add(DataTypes.createStructField("userInfo",DataTypes.StringType,true));
       StructType schema = DataTypes.createStructType(fields);
       DataFrame dataFrame = sqlContext.createDataFrame(userRDD, schema);

       dataFrame.registerTempTable("tmp_user");

        //定義sql
        String sql = "SELECT id,"
                +"getJsonData(userInfo,'username')"
                +"FROM tmp_user";
        //開始查詢
        DataFrame res = sqlContext.sql(sql);
        res.show();
        sc.stop();
    }
}


 

相關推薦

sparkSQLudf的使用

class from lock all function apache ol3 clas name 在Spark中使用sql時一些功能需要自定義方法實現,這時候就可以使用UDF功能來實現 多參數支持 UDF不支持參數*的方式輸入多個參數,例如String*,不過可以使用

SparkSQLUDF和UDAF

UDF: User Defined Function,使用者自定義的函式,函式的輸入是一條具體的資料記錄,實現上講就是普通的Scala函式; UDAF:User Defined Aggregation Function,使用者自定義的聚合函式,函式本身作用於資料集合,能夠在

SparkSQLUDF

一、UDF(User Defined Function):spark SQL中使用者自定義函式,用法和spark SQL中的內建函式類似;是saprk SQL中內建函式無法滿足要求,使用者根據業務需求自定義的函式。二、UDF使用分為兩步:(1)自定義UDF類,根據業務需要,實

sparkSQL無法找到Hive表apache.spark.sql.catalyst.analysis.NoSuchTableException:Table or view 'emp' not f

1.問題描述 使用Scala程式設計,通過SparkSession連線hive中的表,但是沒有找到,報錯: Exception in thread "main" org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseEx

SparkSQL的三種Join及其實現(broadcast join、shuffle hash join和sort merge join)

1.小表對大表(broadcast join) 將小表的資料分發到每個節點上,供大表使用。executor儲存小表的全部資料,一定程度上犧牲了空間,換取shuffle操作大量的耗時,這在SparkSQL中稱作Broadcast Join Broadcast Jo

SparkSQLDataFrame與RDD互操作之一:反射方式

一.引言      Spark SQL支援兩種不同的方法將現有RDD轉換為資料集。第一種方法使用反射來推斷包含特定型別物件的RDD的模式。這種基於反射的方法可以使程式碼更簡潔,並且在編寫Spark應用程式時已經瞭解了模式,因此可以很好地工作。詳細資料參考  Da

**sparksql DataFrame 的函式**

Action 操作 1、 collect() ,返回值是一個數組,返回dataframe集合所有的行 2、 collectAsList() 返回值是一個java型別的陣列,返回dataframe集合所有的行 3、 count() 返回一個number型別的,返回

SparkSQL的內建函式

    使用Spark SQL中的內建函式對資料進行分析,Spark SQL API不同的是,DataFrame中的內建函式操作的結果是返回一個Column物件,而DataFrame天生就是"A distributed collection of data organize

Hive和sparksql的dayofweek

dayofweek在hive2.2.0開始支援 低版本的sparksql和hive中可用以下方式實現 select 7- datediff(next_day('2018-03-12',"Sund

SparkSQL 實現UDF的兩種方式

import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext import org.apache.spark.sql.functions._ /** *

hiveUDF開發:解析json物件和解析json陣列物件

hive預設函式: +-------------------------------------------------------------------+ json +--------------

hiveUDF、UDAF和UDTF使用

Hive進行UDF開發十分簡單,此處所說UDF為Temporary的function,所以需要hive版本在0.4.0以上才可以。一、背景:Hive是基於Hadoop中的MapReduce,提供HQL查詢的資料倉庫。Hive是一個很開放的系統,很多內容都支援使用者定製,包

SparkSqljoin的實現( inner join,left outer join,right outer join,full outer join)

Join是SQL語句中的常用操作,良好的表結構能夠將資料分散在不同的表中,使其符合某種正規化,減少表冗餘、更新容錯等。而建立表和表之間關係的最佳方式就是Join操作。 SparkSQL作為大資料領域的SQL實現,自然也對Join操作做了不少優化,今天主要看一下在SparkS

十八、Hive UDF程式設計

 依據課程中講解的如何自定義UDF,進行案例編寫,進行總結步驟,並完成額外需求,具體說明如下:1) 依據課程講解UDF程式設計案例,完成練習,總結開發UDF步驟,程式碼貼圖, 給予註釋,重點在於

十二.SparkSQLjson資料檔案轉換成parquet檔案

第一步      首先在本地建立一個json檔案,名字叫json_schema_infer.json,檔案中資料的格式如下: {"name":"liguohui","gender":"M","height":160} {"name":"zhangsan","ge

HiveUDF和UDAF的使用

UDF使用者自定義函式(user defined function)–針對單條記錄。 建立函式流程 1、自定義一個Java類2、繼承UDF類 3、重寫evaluate方法 4、打成jar包 6、在hive執行add jar方法7、在hive執行建立模板函式 8、hql中使用D

hiveUDF和UDAF使用說明

Hive進行UDF開發十分簡單,此處所說UDF為Temporary的function,所以需要hive版本在0.4.0以上才可以。 一、背景:Hive是基於Hadoop中的MapReduce,提供HQL查詢的資料倉庫。Hive是一個很開放的系統,很多內容都支援使用者定製,包括

python實現Spark(Hive) SQLUDF的使用

相對於使用MapReduce或者Spark Application的方式進行資料分析,使用Hive SQL或Spark SQL能為我們省去不少的程式碼工作量,而Hive SQL或Spark SQL本身內建的各類UDF也為我們的資料處理提供了不少便利的工具,當這些內建的UDF不

SparkSQLUDF的使用

中,形如: export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/software/mysql-connector-java-5.1.27-bin.jar:/home/hadoop/lib/weekday.jar sql("create temporary

MySQL的system命令在滲透測試的使用以及UDF提權

style data erro 重新 pos empty linux類 directory 意思 一、MySQL中的system命令 在MySQL 5.x中增加了system命令,簡單的符號是\!,從而使MySQL可以執行系統的命令 1 mysql> system