Spark機器學習:TF-IDF實現原理
先簡單地介紹下什麼是TF-IDF(詞頻-逆文件頻率),它可以反映出語料庫中某篇文件中某個詞的重要性。假設t表示某個詞,d表示一篇文件,則詞頻TF(t,d)是某個詞t在文件d中出現的次數,而文件DF(t,D)是包含詞t的文件數目。為了過濾掉常用的片語,如"the" "a" "of" "that",我們使用逆文件頻率來度量一個詞能提供多少資訊的數值:
IDF(t,D)=log(|D|+1)/(DF(t,D)+1)
這裡|D|表示語料庫的文件總數,為了不讓分母為了0,在此進行了加1平滑操作。而詞頻-逆文件頻率就是TF和IDF的簡單相乘:
TFIDF(t,d,D)=TF(t,d)*IDF(t,D)
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.mllib.feature.HashingTF import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.feature.IDF object TF_IDF_Test { def main(args: Array[String]) { val conf = new SparkConf().setAppName("TfIdfTest") val sc = new SparkContext(conf) // Load documents (one per line). val documents: RDD[Seq[String]] = sc.textFile("...").map(_.split(" ").toSeq) val hashingTF = new HashingTF() val tf: RDD[Vector] = hashingTF.transform(documents) tf.cache() val idf = new IDF().fit(tf) val tfidf: RDD[Vector] = idf.transform(tf) }
下面對程式碼進行詳細的解釋:
1.首先看資料來源documents,它作為hashingTF.transform的引數,要求每一行為一篇文件的內容。
2.下面在看hashingTF.transform的方法原始碼,其呼叫了HashingTF類自身的transform方法對每一篇文件進行處理
3.HashingTF類自身的transform方法,這裡的引數document是按空格劃分了的單詞序列,numFeatures為HashingTF類的成員變數預設為2^20,也就是hash的維數。/** * Transforms the input document to term frequency vectors. */ @Since("1.1.0") def transform[D <: Iterable[_]](dataset: RDD[D]): RDD[Vector] = { dataset.map(this.transform) }
最終我們獲得的是一個稀疏向量,其下index就是單詞的雜湊值,value就是單詞的頻數
3.1 indexof 方法,term.## 等價於獲得物件term的 雜湊值,使用Utils.nonNegativeMod對於獲得的雜湊值模numFeatures取正餘* Transforms the input document into a sparse term frequency vector. */ @Since("1.1.0") def transform(document: Iterable[_]): Vector = { //hash(單詞的hash碼,單詞頻數) val termFrequencies = mutable.HashMap.empty[Int, Double] //遍歷文件的單詞 document.foreach { term => val i = indexOf(term) //獲得單詞的hash碼 //單詞頻數統計 termFrequencies.put(i, termFrequencies.getOrElse(i, 0.0) + 1.0) } //把結果轉換成稀疏向量 Vectors.sparse(numFeatures, termFrequencies.toSeq) }
/**
* Returns the index of the input term.
*/
@Since("1.1.0")
def indexOf(term: Any): Int = Utils.nonNegativeMod(term.##, numFeatures)
Utils.nonNegativeMod
/* Calculates 'x' modulo 'mod', takes to consideration sign of x,
* i.e. if 'x' is negative, than 'x' % 'mod' is negative too
* so function return (x % mod) + mod in that case.
*/
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
4.val idf = new IDF().fit(tf),這裡的tf為RDD[Vector],每個稀疏向量的內容參考3。
@Since("1.1.0")
def fit(dataset: RDD[Vector]): IDFModel = {
val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator(
minDocFreq = minDocFreq))(
seqOp = (df, v) => df.add(v),
combOp = (df1, df2) => df1.merge(df2)
).idf()
new IDFModel(idf)
}
treeAggregate和Aggregate類似,它把IDF.DocumentFrequencyAggregator作為初始值,seqop為分割槽類的聚合操作,而comop為分割槽間的聚合操作。下面具體看下DocumentFrequencyAggregator的內容,其使用成員變數df(密集向量)來記錄index(單詞hash碼)在多少個文件中出現過。使用add方法來合併一個新的文件,並更新df和m的值;因為密集和稀疏操作類似,下面以匹配密集為例,values(j) > 0.0,說明j對應的單詞在這篇文件出現過,df(j)+=1。然後使用merge來合併分割槽間的統計結果(這裡只是進行簡單的相加)。最後使用idf()方法對treeAggregate的結果使用公式1得到IDF,把結果封裝到IDFModel類並返回。
class DocumentFrequencyAggregator(val minDocFreq: Int) extends Serializable {
//語料庫的文件總數
private var m = 0L
//BDV為一個密集向量的別名,df對應的值為該index(單詞hash碼)在多少個文件中出現過
private var df: BDV[Long] = _
def this() = this(0)
//新增一個新的文件
def add(doc: Vector): this.type = {
if (isEmpty) {
df = BDV.zeros(doc.size) //初始化0操作
}
doc match {
//如果是稀疏向量
case SparseVector(size, indices, values) =>
val nnz = indices.size
var k = 0
while (k < nnz) {
if (values(k) > 0) {
df(indices(k)) += 1L
}
k += 1
}
//如果是密集向量
case DenseVector(values) =>
val n = values.size
var j = 0
while (j < n) {
//values(j) > 0.0,說明j對應的單詞在這篇文件出現過
if (values(j) > 0.0) {
df(j) += 1L
}
j += 1
}
case other =>
throw new UnsupportedOperationException(
s"Only sparse and dense vectors are supported but got ${other.getClass}.")
}
m += 1L //語料庫的文件數加1
this
}
//合併其他的文件,對文件總數和df進行簡單的相加
def merge(other: DocumentFrequencyAggregator): this.type = {
if (!other.isEmpty) {
m += other.m
if (df == null) {
df = other.df.copy
} else {
df += other.df
}
}
this
}
private def isEmpty: Boolean = m == 0L
/** Returns the current IDF vector. */
def idf(): Vector = {
if (isEmpty) {
throw new IllegalStateException("Haven't seen any document yet.")
}
val n = df.length
val inv = new Array[Double](n)
var j = 0
while (j < n) {
if (df(j) >= minDocFreq) {
inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
}
j += 1
}
Vectors.dense(inv)
}
}
}
5.val tfidf: RDD[Vector] = idf.transform(tf),對4得到的idf(IDFModel)乘上tf即得到最終的結果。 下面先把idf進行廣播,然後和各個分割槽的tf對應相乘
def transform(dataset: RDD[Vector]): RDD[Vector] = {
val bcIdf = dataset.context.broadcast(idf)
dataset.mapPartitions(iter => iter.map(v => IDFModel.transform(bcIdf.value, v)))
}
對應相乘
def transform(idf: Vector, v: Vector): Vector = {
val n = v.size
v match {
case SparseVector(size, indices, values) =>
val nnz = indices.size
val newValues = new Array[Double](nnz)
var k = 0
while (k < nnz) {
newValues(k) = values(k) * idf(indices(k))
k += 1
}
Vectors.sparse(n, indices, newValues)
case DenseVector(values) =>
val newValues = new Array[Double](n)
var j = 0
while (j < n) {
newValues(j) = values(j) * idf(j)
j += 1
}
Vectors.dense(newValues)
case other =>
throw new UnsupportedOperationException(
s"Only sparse and dense vectors are supported but got ${other.getClass}.")
}
}
tfidf最後的輸出格式為Sparse向量,具體的例子可以參看:Spark-MLib之TFIDF例項講解