spark scala-實現udf函式
本文章主要通過spark實現udf自定義函式
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StringType /*** @author jhp * spark實現udf功能 */ object UDF { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") .setAppName("UDF") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) // 構造模擬資料 val names = Array("Leo", "Marry", "Jack", "Tom") val namesRDD = sc.parallelize(names, 5) val namesRowRDD = namesRDD.map { name => Row(name) } val structType = StructType(Array(StructField("name", StringType, true))) val namesDF = sqlContext.createDataFrame(namesRowRDD, structType) // 註冊一張names表 namesDF.registerTempTable("names") // 定義和註冊自定義函式 // 定義函式:自己寫匿名函式 // 註冊函式:SQLContext.udf.register() sqlContext.udf.register("strLen", (str: String) => str.length()) // 使用自定義函式 sqlContext.sql("select name,strLen(name) from names") .collect() .foreach(println) } }
相關推薦
spark scala-實現udf函式
本文章主要通過spark實現udf自定義函式import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import
spark-sql使用UDF函式實現ip對映省份,資料寫出到mysql引數設定。
spark-SQL使用廣播變數以及應用資料庫的UDF自定義函式的查詢會比兩張表的連線更加的優化的程式的執行。 兩表連線是比較費效率的。 spar-sql 2.x的資料讀取,處理,新增schema資訊,常見表,SQL查詢。 將sql結果輸出到mysql的api引數設定。 還
scala實現Hash函式
在做spark graphx計算時,圖的邊表初始化,通常需要轉化為 srcId,dstId,property 的形式,其中srcId,dstId 需要轉化為數字以壓縮資料,提高計算效率。即,在分散式spark程式中將字串轉化為正整數的問題。要想實現該功能,有兩種方法,首先
Spark/Scala實現推薦系統中的相似度演算法(歐幾里得距離、皮爾遜相關係數、餘弦相似度:附實現程式碼)
在推薦系統中,協同過濾演算法是應用較多的,具體又主要劃分為基於使用者和基於物品的協同過濾演算法,核心點就是基於"一個人"或"一件物品",根據這個人或物品所具有的屬性,比如對於人就是性別、年齡、工作、收入、喜好等,找出與這個人或物品相似的人或物,當然實際處理中參考的因子會複雜的多。 本篇文章不介紹相關數學概念,
Spark使用UDF函式之WordCount實現
使用者定義函式(User-defined functions, UDFs)是大多數 SQL 環境的關鍵特性,用於擴充套件系統的內建功能。 UDF允許開發人員通過抽象其低階語言實現來在更高階語言(如SQL)中啟用新功能。 Apache Spark 也不例外,並且提
Spark SQL用UDF實現按列特徵重分割槽
歡迎關注,浪尖公眾號,bigdatatip,建議置頂。 這兩天,球友又問了我一個比較有意思的問題: 解決問題之前,要先了解一下Spark 原理,要想進行相同資料歸類到相同分割槽,肯定要有產生shuffle步驟。 比如,F到G這個shuffle過程,那麼如何決定資料
Spark Streaming狀態管理函式(二)——updateStateByKey的使用(scala版)
updateStateByKey的使用 關於updateStateByKey 注意事項 示例程式碼 執行 結論 關於updateStateByKey 1.重點:首先會以DStream中的資料進行按key做reduce操作,然
Spark Streaming狀態管理函式(三)——MapWithState的使用(scala版)
MapWithState 關於mapWithState 注意事項 示例程式碼 執行 結論 關於mapWithState 需要自己寫一個匿名函式func來實現自己想要的功能。如果有初始化的值得需要,可以使用initia
Spark中RDD轉換成DataFrame的兩種方式(分別用Java和scala實現)
一:準備資料來源 在專案下新建一個student.txt檔案,裡面的內容為: print? <code class="language-java">1,zhangsan,20 2,lisi,21 3,wanger,1
第67課:Spark SQL下采用Java和Scala實現Join的案例綜合實戰(鞏固前面學習的Spark SQL知識)
內容: 1.SparkSQL案例分析 2.SparkSQL下采用Java和Scala實現案例 一、SparkSQL下采用Java和Scala實現案例 學生成績: {"name":"Michael","score":98} {"name":"Andy"
Spark Core 和 Spark SQL 實現分組取Top N(基於scala)
分組取Top N在日常需求中很多見: 每個班級分數前三名同學的名字以及分數 各省指標數量前三的市的名字 等等需求,主要思想就是在某一個分割槽(班級,省)中取出該分割槽Top N的資料 測試資料格式: 如上圖,欄位含義為,班級,學生姓名,分數 下面我們通過一
Spark 的鍵值對(pair RDD)操作,Scala實現
一:什麼是Pair RDD? Spark為包含鍵值對對型別的RDD提供了一些專有操作,這些操作就被稱為Pair RDD,Pair RDD是很多程式的構成要素,因為它們提供了並行操作對各個鍵或跨節點重新進行資料分組的操作介面。 二:Pair RDD的操作例項
hive 使用udf函式實現資料匯入到mysql
利用hive內建的hive-contrib 來實現udf匯入mysql,同時還需要mysql驅動包 例子: add jar /usr/local/hive-0.13.1b/hive-contri
hive中使用自定義函式(UDF)實現分析函式row_number的功能
1. hive0.10及之前的版本沒有row_number這個函式,假設我們現在出現如下業務場景,現在我們在hdfs上有個log日誌檔案,為了方便敘述,該檔案只有2個欄位,第一個是使用者的id,第二個是當天登入的timestamp,現在我們需要求每個使用者最早登入的那條記錄(
impala udf函式實現中文擷取
目前,impala 的substr函式及substring函式都不支援中文的擷取,因此,需要通過udf函式實現。具體的實現效果需要與substr的英文效果相同。具體如下: SUBSTR("abcde",3)=cde SUBSTR("abcde",-2)=de SUBST
Spark 之 UDF 函式
package cn.com.systex import scala.reflect.runtime.universe import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark
spark將資料載入到postgresql中的Scala實現
spark將資料載入到postgresql中: 方法一: def save2Postgresql(jdbcDf: DataFrame, url: String, tableName: Strin
Spark-基於scala實現文章特徵提取(TF-IDF)
一.基本原理: TF-IDF(term frequency–inverse document frequency):TF表示 詞頻,IDF表示 反文件頻率.TF-IDF主要內容就是:如果一個詞語在本篇文章出現的頻率(TF)高,並且在其他文章出現少(即反文件頻率IDF高)
UDF函式:對字串實現sha256加密,返回64位十六進位制字串
實際需求多一些特殊資料需要加密儲存。下面 實現sha256加密,返回64位十六進位制字串package cnsuning.udf.functions.string; import org.apache
python、scala、java分別實現在spark上實現WordCount
下面分別貼出python、scala、java版本的wordcount程式: python版: import logging from operator import add from pyspark import SparkContext logging.basicCo