Spark ML特徵處理區域性敏感雜湊(Locality Sensitive Hashing)
LSH:將向量進行雜湊分桶,使得原語義上相似的文字大概率被雜湊到同一個桶中,同個桶內的文字可以認為是大概率是相似的。
LSH:區域性敏感雜湊演算法,是一種針對海量高維資料的快速最近鄰查詢演算法,主要有如下用法:
- 全基因組的相關研究:生物學家經常使用 LSH 在基因組資料庫中鑑定相似的基因表達。
- 大規模的圖片搜尋: Google 使用 LSH 和 PageRank 來構建他們的圖片搜尋技術VisualRank。
- 音訊/視訊指紋識別:在多媒體技術中,LSH 被廣泛用於 A/V 資料的指紋識別。
- 近似重複的檢測: LSH 通常用於對大量文件,網頁和其他檔案進行去重處理。
在推薦系統等應用中,經常會遇到的一個問題就是面臨著海量的高維資料,查詢最近鄰。如果使用線性查詢對於高維資料計算量和耗時都是災難性的。為了解決這樣的問題,出現了一種特殊的hash函式,使得2個相似度很高的資料以較高的概率對映成同一個hash值,而令2個相似度很低的資料以極低的概率對映成同一個hash值。我們把這樣的函式,叫做LSH(區域性敏感雜湊)。
區域性敏感雜湊(LSH)是一類重要的雜湊技術,常用於大資料集的聚類、近似最近鄰搜尋和異常值檢測。 LSH 的總體思路是使用一個函式族(“LSH family”)將資料點雜湊到桶中,使得彼此靠近的資料點很有可能在同一個桶中,而距離較遠的資料點 彼此遠離的很可能在不同的桶中。 LSH 家族的正式定義如下:
在度量空間 (M, d) 中,其中 M 是一個集合,d 是 M 上的距離函式,LSH 族是滿足以下屬性的函式 h 族:
這個 LSH 家族被稱為 (r1, r2, p1, p2) 敏感的。
在 Spark 中,不同的 LSH 系列在不同的類中實現(例如 MinHash),並且在每個類中提供了用於特徵變換、近似相似連線和近似最近鄰的 API。
在 LSH 中,我們將誤報定義為一對遠處的輸入特徵(d(p,q)≥r2),它們雜湊到同一個桶中,我們將誤報定義為一對附近的特徵(d( p,q)≤r1) 被雜湊到不同的桶中。
一、LSH Operations
描述了 LSH 可用於的主要操作型別。 擬合的 LSH 模型具有用於這些操作中的每一個的方法。
1.1Feature Transformation(特徵轉換)
特徵轉換是將雜湊值新增為新列的基本功能。 這對於降維很有用。 使用者可以通過設定 inputCol 和 outputCol 來指定輸入和輸出列名。
LSH 還支援多個 LSH 雜湊表。 使用者可以通過設定 numHashTables 來指定雜湊表的數量。 這也用於近似相似連線和近似最近鄰中的
outputCol 的型別是 Seq[Vector],其中陣列的維度等於 numHashTables,向量的維度當前設定為 1。
1.2Approximate Similarity Join(近似相似連線)
近似相似連線採用兩個資料集,並近似返回資料集中距離小於使用者定義閾值的行對。 近似相似連線既支援連線兩個不同的資料集,也支援自連線。 自連線會產生一些重複的對。
近似相似連線接受轉換和未轉換的資料集作為輸入。 如果使用未轉換的資料集,它將自動轉換。 在這種情況下,雜湊簽名將被建立為 outputCol。 在joined dataset中,可以在datasetA和datasetB中查詢原始資料集。 距離列將新增到輸出資料集中,以顯示返回的每對行之間的真實距離。
1.3Approximate Nearest Neighbor Search(近似最近鄰搜尋)
近似最近鄰搜尋採用資料集(特徵向量)和鍵(單個特徵向量),它近似返回資料集中與向量最近的指定行數。 近似最近鄰搜尋接受轉換和未轉換的資料集作為輸入。 如果使用未轉換的資料集,它將自動轉換。 在這種情況下,雜湊簽名將被建立為 outputCol。 距離列將新增到輸出資料集中,以顯示每個輸出行與搜尋鍵之間的真實距離。
注意:當雜湊桶中沒有足夠的候選者時,近似最近鄰搜尋將返回少於 k 行。
二、LSH Algorithms
2.1Bucketed Random Projection for Euclidean Distance
Bucketed Random Projection 是歐幾里得距離的 LSH 系列。 歐幾里得距離定義如下:
它的 LSH 系列將特徵向量 x 投影到隨機單位向量 v 上,並將投影結果分成雜湊桶:
其中 r 是使用者定義的桶長度。 桶長度可用於控制雜湊桶的平均大小(以及桶的數量)。 更大的桶長度(即更少的桶)會增加特徵被雜湊到同一個桶的概率(增加真假陽性的數量)。
Bucketed Random Projection 接受任意向量作為輸入特徵,並支援稀疏向量和密集向量。
%spark // 特徵處理區域性敏感雜湊 —— —— BucketedRandomProjectionLSH // 輸入是密集或稀疏向量,每個向量代表歐幾里得距離空間中的一個點。 輸出將是可配置維度的向量。 同一維度的雜湊值由相同的雜湊函式計算得出。 import org.apache.spark.ml.feature.BucketedRandomProjectionLSH import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.col val dfA = spark.createDataFrame(Seq( (0, Vectors.dense(1.0, 1.0)), (1, Vectors.dense(1.0, -1.0)), (2, Vectors.dense(-1.0, -1.0)), (3, Vectors.dense(-1.0, 1.0)) )).toDF("id", "features") val dfB = spark.createDataFrame(Seq( (4, Vectors.dense(1.0, 0.0)), (5, Vectors.dense(-1.0, 0.0)), (6, Vectors.dense(0.0, 1.0)), (7, Vectors.dense(0.0, -1.0)) )).toDF("id", "features") val key = Vectors.dense(1.0, 0.0) val brp = new BucketedRandomProjectionLSH() // 每個雜湊桶的長度,較大的桶降低了誤報率。 // 桶的數量將是(輸入向量的最大 L2 範數)/桶長度。 // 如果輸入向量被歸一化,1-10 倍 pow(numRecords, -1/inputDim) 將是一個合理的值 .setBucketLength(2.0) // LSH OR-amplification 中使用的雜湊表數量的引數。 // LSH OR-amplification 可用於降低假陰性率。 此引數的較高值會降低假陰性率,但會增加計算複雜性。 .setNumHashTables(3) // 隨機種子的引數。 // .setSeed() .setInputCol("features") .setOutputCol("hashes") // 擬合數據訓練轉換器 val model = brp.fit(dfA) // 1、特徵轉換 println("雜湊值儲存在“雜湊”列中的雜湊資料集: ") model.transform(dfA).show(false) // 2、計算輸入行的區域性敏感雜湊,然後執行近似相似性連線。 // 可以通過傳入已經轉換的資料集來避免計算雜湊,例如 `model.approxSimilarityJoin(transformedA,transformedB,1.5)` // approxSimilarityJoin方法:連線兩個資料集以近似找到距離小於閾值的所有行對。 如果 outputCol 缺失,該方法將轉換資料; 如果 outputCol 存在,它將使用 outputCol。 println("在小於 1.5 的歐幾里得距離上近似加入 dfA 和 dfB: ") model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance") .select(col("datasetA.id").alias("idA"), col("datasetB.id").alias("idB"), col("EuclideanDistance")).show() // 3、計算輸入行的區域性敏感雜湊,然後執行近似最近鄰搜尋。 // 可以通過傳入已經轉換的資料集來避免計算雜湊,例如`model.approxNearestNeighbors(transformedA, key, 2)` // 給定一個大資料集和一個專案,大約找到與該專案最近距離的 k 個專案。 如果 outputCol 缺失,該方法將轉換資料; 如果 outputCol 存在,它將使用 outputCol。 println("近似搜尋 dfA 的 2 個最近鄰居: ") model.approxNearestNeighbors(dfA, key, 2).show(false) 輸出: 雜湊值儲存在“雜湊”列中的雜湊資料集: +---+-----------+-----------------------+ |id |features |hashes | +---+-----------+-----------------------+ |0 |[1.0,1.0] |[[0.0], [0.0], [-1.0]] | |1 |[1.0,-1.0] |[[-1.0], [-1.0], [0.0]]| |2 |[-1.0,-1.0]|[[-1.0], [-1.0], [0.0]]| |3 |[-1.0,1.0] |[[0.0], [0.0], [-1.0]] | +---+-----------+-----------------------+ 在小於 1.5 的歐幾里得距離上近似加入 dfA 和 dfB: +---+---+-----------------+ |idA|idB|EuclideanDistance| +---+---+-----------------+ | 1| 4| 1.0| | 0| 6| 1.0| | 1| 7| 1.0| | 3| 5| 1.0| | 0| 4| 1.0| | 3| 6| 1.0| | 2| 7| 1.0| | 2| 5| 1.0| +---+---+-----------------+ 近似搜尋 dfA 的 2 個最近鄰居: +---+----------+-----------------------+-------+ |id |features |hashes |distCol| +---+----------+-----------------------+-------+ |0 |[1.0,1.0] |[[0.0], [0.0], [-1.0]] |1.0 | |1 |[1.0,-1.0]|[[-1.0], [-1.0], [0.0]]|1.0 | +---+----------+-----------------------+-------+
2.2MinHash for Jaccard Distance
MinHash 是 Jaccard 距離的 LSH 系列,其中輸入特徵是自然數集。 兩個集合的 Jaccard 距離由它們的交集和並集的基數定義:
MinHash 將隨機雜湊函式 g 應用於集合中的每個元素,並取所有雜湊值中的最小值:
MinHash 的輸入集表示為二進位制向量,其中向量索引表示元素本身,向量中的非零值表示該元素在集合中的存在。 雖然支援密集和稀疏向量,但通常建議使用稀疏向量以提高效率。 例如 Vectors.sparse(10, Array[(2, 1.0), (3, 1.0), (5, 1.0)]) 表示空間中有 10 個元素。 該集合包含元素 2、元素 3 和元素 5。所有非零值都被視為二進位制“1”值。
注意:空集不能被 MinHash 轉換,這意味著任何輸入向量必須至少有 1 個非零條目。
%spark // 特徵處理區域性敏感雜湊 —— —— MinHashLSH // Jaccard 距離的 LSH 類。 // 輸入可以是密集或稀疏向量,但如果是稀疏的,則效率更高。 // 例如 Vectors.sparse(10, Array((2, 1.0), (3, 1.0), (5, 1.0))) 表示空間中有 10 個元素。 該集合包含元素 2、3 和 5。此外,任何輸入向量必須至少有 1 個非零索引,並且所有非零值都被視為二進位制“1”值。 import org.apache.spark.ml.feature.MinHashLSH import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.col val dfA = spark.createDataFrame(Seq( (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))), (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))), (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0)))) )).toDF("id", "features") val dfB = spark.createDataFrame(Seq( (3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))), (4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))), (5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0)))) )).toDF("id", "features") val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0))) val mh = new MinHashLSH() // LSH OR-amplification 中使用的雜湊表數量的引數。 // LSH OR-amplification 可用於降低假陰性率。 此引數的較高值會降低假陰性率,但會增加計算複雜性。 .setNumHashTables(5) // 隨機種子的引數。 // .setSeed() .setInputCol("features") .setOutputCol("hashes") val model = mh.fit(dfA) // 1、特徵轉換 println("雜湊值儲存在“雜湊”列中的雜湊資料集: ") model.transform(dfA).show() // 計算輸入行的區域性敏感雜湊,然後執行近似相似連線。 // 我們可以通過傳入已經轉換的資料集來避免計算雜湊,例如 `model.approxSimilarityJoin(transformedA,transformedB,0.6)` println("在小於 0.6 的 Jaccard 距離上近似加入 dfA 和 dfB: ") model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance") .select(col("datasetA.id").alias("idA"), col("datasetB.id").alias("idB"), col("JaccardDistance")).show() // 計算輸入行的區域性敏感雜湊,然後執行近似最近鄰搜尋。 // 我們可以通過傳入已經轉換的資料集來避免計算雜湊,例如 // `model.approxNearestNeighbors(transformedA, key, 2)` // 當沒有找到足夠的近似近鄰候選時,它可能返回少於 2 行。 println("近似搜尋 dfA 的 2 個最近鄰居: ") model.approxNearestNeighbors(dfA, key, 2).show() 輸出: 雜湊值儲存在“雜湊”列中的雜湊資料集: +---+--------------------+--------------------+ | id| features| hashes| +---+--------------------+--------------------+ | 0|(6,[0,1,2],[1.0,1...|[[2.25592966E8], ...| | 1|(6,[2,3,4],[1.0,1...|[[2.25592966E8], ...| | 2|(6,[0,2,4],[1.0,1...|[[2.25592966E8], ...| +---+--------------------+--------------------+ 在小於 0.6 的 Jaccard 距離上近似加入 dfA 和 dfB: +---+---+---------------+ |idA|idB|JaccardDistance| +---+---+---------------+ | 1| 4| 0.5| | 0| 5| 0.5| | 1| 5| 0.5| | 2| 5| 0.5| +---+---+---------------+ 近似搜尋 dfA 的 2 個最近鄰居: +---+--------------------+--------------------+-------+ | id| features| hashes|distCol| +---+--------------------+--------------------+-------+ | 1|(6,[2,3,4],[1.0,1...|[[2.25592966E8], ...| 0.75| +---+--------------------+--------------------+-------+