1. 程式人生 > 其它 >Spark ML特徵處理區域性敏感雜湊(Locality Sensitive Hashing)

Spark ML特徵處理區域性敏感雜湊(Locality Sensitive Hashing)

LSH:將向量進行雜湊分桶,使得原語義上相似的文字大概率被雜湊到同一個桶中,同個桶內的文字可以認為是大概率是相似的。

LSH:區域性敏感雜湊演算法,是一種針對海量高維資料的快速最近鄰查詢演算法,主要有如下用法:

  • 全基因組的相關研究:生物學家經常使用 LSH 在基因組資料庫中鑑定相似的基因表達。
  • 大規模的圖片搜尋: Google 使用 LSH 和 PageRank 來構建他們的圖片搜尋技術VisualRank。
  • 音訊/視訊指紋識別:在多媒體技術中,LSH 被廣泛用於 A/V 資料的指紋識別。
  • 近似重複的檢測: LSH 通常用於對大量文件,網頁和其他檔案進行去重處理。

在推薦系統等應用中,經常會遇到的一個問題就是面臨著海量的高維資料,查詢最近鄰。如果使用線性查詢對於高維資料計算量和耗時都是災難性的。為了解決這樣的問題,出現了一種特殊的hash函式,使得2個相似度很高的資料以較高的概率對映成同一個hash值,而令2個相似度很低的資料以極低的概率對映成同一個hash值。我們把這樣的函式,叫做LSH(區域性敏感雜湊)。

區域性敏感雜湊(LSH)是一類重要的雜湊技術,常用於大資料集的聚類、近似最近鄰搜尋和異常值檢測。 LSH 的總體思路是使用一個函式族(“LSH family”)將資料點雜湊到桶中,使得彼此靠近的資料點很有可能在同一個桶中,而距離較遠的資料點 彼此遠離的很可能在不同的桶中。 LSH 家族的正式定義如下:

在度量空間 (M, d) 中,其中 M 是一個集合,d 是 M 上的距離函式,LSH 族是滿足以下屬性的函式 h 族:

這個 LSH 家族被稱為 (r1, r2, p1, p2) 敏感的。

在 Spark 中,不同的 LSH 系列在不同的類中實現(例如 MinHash),並且在每個類中提供了用於特徵變換、近似相似連線和近似最近鄰的 API。

在 LSH 中,我們將誤報定義為一對遠處的輸入特徵(d(p,q)≥r2),它們雜湊到同一個桶中,我們將誤報定義為一對附近的特徵(d( p,q)≤r1) 被雜湊到不同的桶中。

一、LSH Operations

描述了 LSH 可用於的主要操作型別。 擬合的 LSH 模型具有用於這些操作中的每一個的方法。

1.1Feature Transformation(特徵轉換)

特徵轉換是將雜湊值新增為新列的基本功能。 這對於降維很有用。 使用者可以通過設定 inputCol 和 outputCol 來指定輸入和輸出列名。

LSH 還支援多個 LSH 雜湊表。 使用者可以通過設定 numHashTables 來指定雜湊表的數量。 這也用於近似相似連線和近似最近鄰中的

OR-amplification增加雜湊表的數量會提高準確性,但也會增加通訊成本和執行時間。

outputCol 的型別是 Seq[Vector],其中陣列的維度等於 numHashTables,向量的維度當前設定為 1。

1.2Approximate Similarity Join(近似相似連線)

近似相似連線採用兩個資料集,並近似返回資料集中距離小於使用者定義閾值的行對。 近似相似連線既支援連線兩個不同的資料集,也支援自連線。 自連線會產生一些重複的對。

近似相似連線接受轉換和未轉換的資料集作為輸入。 如果使用未轉換的資料集,它將自動轉換。 在這種情況下,雜湊簽名將被建立為 outputCol。 在joined dataset中,可以在datasetA和datasetB中查詢原始資料集。 距離列將新增到輸出資料集中,以顯示返回的每對行之間的真實距離。

1.3Approximate Nearest Neighbor Search(近似最近鄰搜尋)

近似最近鄰搜尋採用資料集(特徵向量)和鍵(單個特徵向量),它近似返回資料集中與向量最近的指定行數。 近似最近鄰搜尋接受轉換和未轉換的資料集作為輸入。 如果使用未轉換的資料集,它將自動轉換。 在這種情況下,雜湊簽名將被建立為 outputCol。 距離列將新增到輸出資料集中,以顯示每個輸出行與搜尋鍵之間的真實距離。

注意:當雜湊桶中沒有足夠的候選者時,近似最近鄰搜尋將返回少於 k 行。

二、LSH Algorithms

2.1Bucketed Random Projection for Euclidean Distance

Bucketed Random Projection 是歐幾里得距離的 LSH 系列。 歐幾里得距離定義如下:

它的 LSH 系列將特徵向量 x 投影到隨機單位向量 v 上,並將投影結果分成雜湊桶:

其中 r 是使用者定義的桶長度。 桶長度可用於控制雜湊桶的平均大小(以及桶的數量)。 更大的桶長度(即更少的桶)會增加特徵被雜湊到同一個桶的概率(增加真假陽性的數量)。

Bucketed Random Projection 接受任意向量作為輸入特徵,並支援稀疏向量和密集向量。

%spark
// 特徵處理區域性敏感雜湊 —— —— BucketedRandomProjectionLSH
// 輸入是密集或稀疏向量,每個向量代表歐幾里得距離空間中的一個點。 輸出將是可配置維度的向量。 同一維度的雜湊值由相同的雜湊函式計算得出。
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val dfA = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 1.0)),
  (1, Vectors.dense(1.0, -1.0)),
  (2, Vectors.dense(-1.0, -1.0)),
  (3, Vectors.dense(-1.0, 1.0))
)).toDF("id", "features")

val dfB = spark.createDataFrame(Seq(
  (4, Vectors.dense(1.0, 0.0)),
  (5, Vectors.dense(-1.0, 0.0)),
  (6, Vectors.dense(0.0, 1.0)),
  (7, Vectors.dense(0.0, -1.0))
)).toDF("id", "features")

val key = Vectors.dense(1.0, 0.0)

val brp = new BucketedRandomProjectionLSH()
// 每個雜湊桶的長度,較大的桶降低了誤報率。 
// 桶的數量將是(輸入向量的最大 L2 範數)/桶長度。
// 如果輸入向量被歸一化,1-10 倍 pow(numRecords, -1/inputDim) 將是一個合理的值 
  .setBucketLength(2.0)
// LSH OR-amplification 中使用的雜湊表數量的引數。
// LSH OR-amplification 可用於降低假陰性率。 此引數的較高值會降低假陰性率,但會增加計算複雜性。 
  .setNumHashTables(3)
//   隨機種子的引數。 
//   .setSeed()
  .setInputCol("features")
  .setOutputCol("hashes")

// 擬合數據訓練轉換器
val model = brp.fit(dfA)

// 1、特徵轉換
println("雜湊值儲存在“雜湊”列中的雜湊資料集: ")
model.transform(dfA).show(false)

// 2、計算輸入行的區域性敏感雜湊,然後執行近似相似性連線。 
// 可以通過傳入已經轉換的資料集來避免計算雜湊,例如 `model.approxSimilarityJoin(transformedA,transformedB,1.5)` 
// approxSimilarityJoin方法:連線兩個資料集以近似找到距離小於閾值的所有行對。 如果 outputCol 缺失,該方法將轉換資料; 如果 outputCol 存在,它將使用 outputCol。
println("在小於 1.5 的歐幾里得距離上近似加入 dfA 和 dfB: ")
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance")).show()

// 3、計算輸入行的區域性敏感雜湊,然後執行近似最近鄰搜尋。 
// 可以通過傳入已經轉換的資料集來避免計算雜湊,例如`model.approxNearestNeighbors(transformedA, key, 2)` 
// 給定一個大資料集和一個專案,大約找到與該專案最近距離的 k 個專案。 如果 outputCol 缺失,該方法將轉換資料; 如果 outputCol 存在,它將使用 outputCol。
println("近似搜尋 dfA 的 2 個最近鄰居: ")
model.approxNearestNeighbors(dfA, key, 2).show(false)

輸出:
雜湊值儲存在“雜湊”列中的雜湊資料集: 
+---+-----------+-----------------------+
|id |features   |hashes                 |
+---+-----------+-----------------------+
|0  |[1.0,1.0]  |[[0.0], [0.0], [-1.0]] |
|1  |[1.0,-1.0] |[[-1.0], [-1.0], [0.0]]|
|2  |[-1.0,-1.0]|[[-1.0], [-1.0], [0.0]]|
|3  |[-1.0,1.0] |[[0.0], [0.0], [-1.0]] |
+---+-----------+-----------------------+

在小於 1.5 的歐幾里得距離上近似加入 dfA 和 dfB: 
+---+---+-----------------+
|idA|idB|EuclideanDistance|
+---+---+-----------------+
|  1|  4|              1.0|
|  0|  6|              1.0|
|  1|  7|              1.0|
|  3|  5|              1.0|
|  0|  4|              1.0|
|  3|  6|              1.0|
|  2|  7|              1.0|
|  2|  5|              1.0|
+---+---+-----------------+

近似搜尋 dfA 的 2 個最近鄰居: 
+---+----------+-----------------------+-------+
|id |features  |hashes                 |distCol|
+---+----------+-----------------------+-------+
|0  |[1.0,1.0] |[[0.0], [0.0], [-1.0]] |1.0    |
|1  |[1.0,-1.0]|[[-1.0], [-1.0], [0.0]]|1.0    |
+---+----------+-----------------------+-------+

2.2MinHash for Jaccard Distance

MinHash 是 Jaccard 距離的 LSH 系列,其中輸入特徵是自然數集。 兩個集合的 Jaccard 距離由它們的交集和並集的基數定義:

MinHash 將隨機雜湊函式 g 應用於集合中的每個元素,並取所有雜湊值中的最小值:

MinHash 的輸入集表示為二進位制向量,其中向量索引表示元素本身,向量中的非零值表示該元素在集合中的存在。 雖然支援密集和稀疏向量,但通常建議使用稀疏向量以提高效率。 例如 Vectors.sparse(10, Array[(2, 1.0), (3, 1.0), (5, 1.0)]) 表示空間中有 10 個元素。 該集合包含元素 2、元素 3 和元素 5。所有非零值都被視為二進位制“1”值。

注意:空集不能被 MinHash 轉換,這意味著任何輸入向量必須至少有 1 個非零條目。

%spark
// 特徵處理區域性敏感雜湊 —— —— MinHashLSH
// Jaccard 距離的 LSH 類。
// 輸入可以是密集或稀疏向量,但如果是稀疏的,則效率更高。 
// 例如 Vectors.sparse(10, Array((2, 1.0), (3, 1.0), (5, 1.0))) 表示空間中有 10 個元素。 該集合包含元素 2、3 和 5。此外,任何輸入向量必須至少有 1 個非零索引,並且所有非零值都被視為二進位制“1”值。 
import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val dfA = spark.createDataFrame(Seq(
  (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
  (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
  (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")

val dfB = spark.createDataFrame(Seq(
  (3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
  (4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
  (5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")

val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))

val mh = new MinHashLSH()
// LSH OR-amplification 中使用的雜湊表數量的引數。
// LSH OR-amplification 可用於降低假陰性率。 此引數的較高值會降低假陰性率,但會增加計算複雜性。 
  .setNumHashTables(5)
//   隨機種子的引數。 
//   .setSeed()
  .setInputCol("features")
  .setOutputCol("hashes")

val model = mh.fit(dfA)

// 1、特徵轉換
println("雜湊值儲存在“雜湊”列中的雜湊資料集: ")
model.transform(dfA).show()

// 計算輸入行的區域性敏感雜湊,然後執行近似相似連線。
// 我們可以通過傳入已經轉換的資料集來避免計算雜湊,例如 `model.approxSimilarityJoin(transformedA,transformedB,0.6)` 
println("在小於 0.6 的 Jaccard 距離上近似加入 dfA 和 dfB: ")
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance")).show()

// 計算輸入行的區域性敏感雜湊,然後執行近似最近鄰搜尋。
// 我們可以通過傳入已經轉換的資料集來避免計算雜湊,例如
// `model.approxNearestNeighbors(transformedA, key, 2)`
// 當沒有找到足夠的近似近鄰候選時,它可能返回少於 2 行。 
println("近似搜尋 dfA 的 2 個最近鄰居: ")
model.approxNearestNeighbors(dfA, key, 2).show()

輸出:
雜湊值儲存在“雜湊”列中的雜湊資料集: 
+---+--------------------+--------------------+
| id|            features|              hashes|
+---+--------------------+--------------------+
|  0|(6,[0,1,2],[1.0,1...|[[2.25592966E8], ...|
|  1|(6,[2,3,4],[1.0,1...|[[2.25592966E8], ...|
|  2|(6,[0,2,4],[1.0,1...|[[2.25592966E8], ...|
+---+--------------------+--------------------+

在小於 0.6 的 Jaccard 距離上近似加入 dfA 和 dfB: 
+---+---+---------------+
|idA|idB|JaccardDistance|
+---+---+---------------+
|  1|  4|            0.5|
|  0|  5|            0.5|
|  1|  5|            0.5|
|  2|  5|            0.5|
+---+---+---------------+

近似搜尋 dfA 的 2 個最近鄰居: 
+---+--------------------+--------------------+-------+
| id|            features|              hashes|distCol|
+---+--------------------+--------------------+-------+
|  1|(6,[2,3,4],[1.0,1...|[[2.25592966E8], ...|   0.75|
+---+--------------------+--------------------+-------+