伴魚:藉助 Flink 完成機器學習特徵系統的升級
簡介:Flink 用於機器學習特徵工程,解決了特徵上線難的問題;以及 SQL + Python UDF 如何用於生產實踐。
本文作者陳易生,介紹了伴魚平臺機器學習特徵系統的升級,在架構上,從 Spark 轉為 Flink,解決了特徵上線難的問題,以及 SQL + Python UDF 如何用於生產實踐。 主要內容為:
- 前言
- 老版特徵系統 V1
- 新版特徵系統 V2
- 總結
一、前言
在伴魚,我們在多個線上場景使用機器學習提高使用者的使用體驗,例如:在伴魚繪本中,我們根據使用者的帖子瀏覽記錄,為使用者推薦他們感興趣的帖子;在轉化後臺裡,我們根據使用者的繪本購買記錄,為使用者推薦他們可能感興趣的課程等。
特徵是機器學習模型的輸入。如何高效地將特徵從資料來源加工出來,讓它能夠被線上服務高效地訪問,決定了我們能否在生產環境可靠地使用機器學習。為此,我們搭建了特徵系統,系統性地解決這一問題。目前,伴魚的機器學習特徵系統運行了接近 100 個特徵,支援了多個業務線的模型對線上獲取特徵的需求。
下面,我們將介紹特徵系統在伴魚的演進過程,以及其中的權衡考量。
二、舊版特徵系統 V1
特徵系統 V1 由三個核心元件構成:特徵管道,特徵倉庫,和特徵服務。整體架構如下圖所示:
- 批特徵管道使用 Spark 實現,由 DolphinScheduler 進行排程,跑在 YARN 叢集上;
- 出於技術棧的一致考慮,流特徵管道使用 Spark Structured Streaming 實現,和批特徵管道一樣跑在 YARN 叢集上。
特徵倉庫選用合適的儲存元件 (Redis) 和資料結構 (Hashes),為模型服務提供低延遲的特徵訪問能力。之所以選用 Redis 作為儲存,是因為:
- 伴魚有豐富的 Redis 使用經驗;
- 包括DoorDash Feature Store和Feast在內的業界特徵倉庫解決方案都使用了 Redis。
特徵服務遮蔽特徵倉庫的儲存和資料結構,對外暴露 RPC 介面GetFeatures(EntityName, FeatureNames)
HMGET EntityName FeatureName_1 ... FeatureName_N
操作。
這一版本的特徵系統存在幾個問題:
- 演算法工程師缺少控制,導致迭代效率低。這個問題與系統涉及的技術棧和公司的組織架構有關。在整個系統中,特徵管道的迭代需求最高,一旦模型對特徵有新的需求,就需要修改或者編寫一個新的 Spark 任務。而 Spark 任務的編寫需要有一定的 Java 或 Scala 知識,不屬於演算法工程師的常見技能,因此交由大資料團隊全權負責。大資料團隊同時負責多項資料需求,往往有很多排期任務。結果便是新特徵的上線涉及頻繁的跨部門溝通,迭代效率低;
- 特徵管道只完成了輕量的特徵工程,降低線上推理的效率。由於特徵管道由大資料工程師而非演算法工程師編寫,複雜的資料預處理涉及更高的溝通成本,因此這些特徵的預處理程度都比較輕量,更多的預處理被留到模型服務甚至模型內部進行,增大了模型推理的時延。
為了解決這幾個問題,特徵系統 V2 提出幾個設計目的:
- 將控制權交還演算法工程師,提高迭代效率;
- 將更高權重的特徵工程交給特徵管道,提高線上推理的效率。
三、新版特徵系統 V2
特徵系統 V2 相比特徵系統 V1 在架構上的唯一不同點在於,它將特徵管道切分為三部分:特徵生成管道,特徵源,和特徵注入管道。值得一提的是,管道在實現上均從 Spark 轉為 Flink,和公司資料基礎架構的發展保持一致。特徵系統 V2 的整體架構如下圖所示:
1. 特徵生成管道
特徵生成管道讀取原始資料來源,加工為特徵,並將特徵寫入指定特徵源 (而非特徵倉庫)。
- 如果管道以流資料來源作為原始資料來源,則它是流特徵生成管道;
- 如果管道以批資料來源作為原始資料來源,則它是批特徵生成管道。
特徵生成管道的邏輯由演算法工程師全權負責編寫。其中,批特徵生成管道使用 HiveQL 編寫,由 DolphinScheduler 排程。流特徵生成管道使用 PyFlink 實現,詳情見下圖:
- 用 Flink SQL 宣告 Flink 任務源 (source.sql) 和定義特徵工程邏輯 (transform.sql);
- (可選) 用 Python 實現特徵工程邏輯中可能包含的 UDF 實現 (udf_def.py);
- 使用自研的程式碼生成工具,生成可執行的 PyFlink 任務指令碼 (run.py);
- 本地使用由平臺準備好的 Docker 環境除錯 PyFlink 指令碼,確保能在本地正常執行;
- 把程式碼提交到一個統一管理特徵管道的程式碼倉庫,由 AI 平臺團隊進行程式碼稽核。稽核通過的指令碼會被部署到伴魚實時計算平臺,完成特徵生成管道的上線。
這一套流程確保了:
- 演算法工程師掌握上線特徵的自主權;
- 平臺工程師把控特徵生成管道的程式碼質量,並在必要時可以對它們實現重構,而無需演算法工程師的介入。
2. 特徵源
特徵源儲存從原始資料來源加工形成的特徵。值得強調的是,它同時還是連線演算法工程師和 AI 平臺工程師的橋樑。演算法工程師只負責實現特徵工程的邏輯,將原始資料加工為特徵,寫入特徵源,剩下的事情就交給 AI 平臺。平臺工程師實現特徵注入管道,將特徵寫入特徵倉庫,以特徵服務的形式對外提供資料訪問服務。
3. 特徵注入管道
特徵注入管道將特徵從特徵源讀出,寫入特徵倉庫。由於 Flink 社群缺少對 Redis sink 的原生支援,我們通過拓展RichSinkFunction簡單地實現了StreamRedisSink
和BatchRedisSink
,很好地滿足我們的需求。
其中,BatchRedisSink
通過Flink Operator State和Redis Pipelining的簡單結合,大量參考 Flink 文件中的BufferingSink
,實現了批量寫入,大幅減少對 Redis Server 的請求量,增大吞吐,寫入效率相比逐條插入提升了 7 倍。BatchRedisSink
的簡要實現如下。其中,flush
實現了批量寫入 Redis 的核心邏輯,checkpointedState
/bufferedElements
/snapshotState
/initializeState
實現了使用 Flink 有狀態運算元管理元素快取的邏輯。
class BatchRedisSink(
pipelineBatchSize: Int
) extends RichSinkFunction[(String, Timestamp, Map[String, String])]
with CheckpointedFunction {
@transient
private var checkpointedState
: ListState[(String, java.util.Map[String, String])] = _
private val bufferedElements
: ListBuffer[(String, java.util.Map[String, String])] =
ListBuffer.empty[(String, java.util.Map[String, String])]
private var jedisPool: JedisPool = _
override def invoke(
value: (String, Timestamp, Map[String, String]),
context: SinkFunction.Context
): Unit = {
import scala.collection.JavaConverters._
val (key, _, featureKVs) = value
bufferedElements += (key -> featureKVs.asJava)
if (bufferedElements.size == pipelineBatchSize) {
flush()
}
}
private def flush(): Unit = {
var jedis: Jedis = null
try {
jedis = jedisPool.getResource
val pipeline = jedis.pipelined()
for ((key, hash) <- bufferedElements) {
pipeline.hmset(key, hash)
}
pipeline.sync()
} catch { ... } finally { ... }
bufferedElements.clear()
}
override def snapshotState(context: FunctionSnapshotContext): Unit = {
checkpointedState.clear()
for (element <- bufferedElements) {
checkpointedState.add(element)
}
}
override def initializeState(context: FunctionInitializationContext): Unit = {
val descriptor =
new ListStateDescriptor[(String, java.util.Map[String, String])](
"buffered-elements",
TypeInformation.of(
new TypeHint[(String, java.util.Map[String, String])]() {}
)
)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
import scala.collection.JavaConverters._
if (context.isRestored) {
for (element <- checkpointedState.get().asScala) {
bufferedElements += element
}
}
}
override def open(parameters: Configuration): Unit = {
try {
jedisPool = new JedisPool(...)
} catch { ... }
}
override def close(): Unit = {
flush()
if (jedisPool != null) {
jedisPool.close()
}
}
}
特徵系統 V2 很好地滿足了我們提出的設計目的。
- 由於特徵生成管道的編寫只需用到 SQL 和 Python 這兩種演算法工程師十分熟悉的工具,因此他們全權負責特徵生成管道的編寫和上線,無需依賴大資料團隊,大幅提高了迭代效率。在熟悉後,演算法工程師通常只需花費半個小時以內,就可以完成流特徵的編寫、除錯和上線。而這個過程原本需要花費數天,取決於大資料團隊的排期;
- 出於同樣的原因,演算法工程師可以在有需要的前提下,完成更重度的特徵工程,從而減少模型服務和模型的負擔,提高模型線上推理效率。
四、總結
特徵系統 V1 解決了特徵上線的問題,而特徵系統 V2 在此基礎上,解決了特徵上線難的問題。在特徵系統的演進過程中,我們總結出作為平臺研發的幾點經驗:
- 平臺應該提供使用者想用的工具。這與 Uber ML 平臺團隊在內部推廣的經驗相符。演算法工程師在 Python 和 SQL 環境下工作效率最高,而不熟悉 Java 和 Scala。那麼,想讓演算法工程師自主編寫特徵管道,平臺應該支援演算法工程師使用 Python 和 SQL 編寫特徵管道,而不是讓演算法工程師去學 Java 和 Scala,或是把工作轉手給大資料團隊去做;
- 平臺應該提供易用的本地除錯工具。我們提供的 Docker 環境封裝了 Kafka 和 Flink,讓使用者可以在本地快速除錯 PyFlink 指令碼,而無需等待管道部署到測試環境後再除錯;
- 平臺應該在鼓勵使用者自主使用的同時,通過自動化檢查或程式碼稽核等方式牢牢把控質量。
原文連結
本文為阿里雲原創內容,未經允許不得轉載。