1. 程式人生 > 其它 >伴魚:藉助 Flink 完成機器學習特徵系統的升級

伴魚:藉助 Flink 完成機器學習特徵系統的升級

簡介:Flink 用於機器學習特徵工程,解決了特徵上線難的問題;以及 SQL + Python UDF 如何用於生產實踐。

本文作者陳易生,介紹了伴魚平臺機器學習特徵系統的升級,在架構上,從 Spark 轉為 Flink,解決了特徵上線難的問題,以及 SQL + Python UDF 如何用於生產實踐。 主要內容為:

  1. 前言
  2. 老版特徵系統 V1
  3. 新版特徵系統 V2
  4. 總結

一、前言

在伴魚,我們在多個線上場景使用機器學習提高使用者的使用體驗,例如:在伴魚繪本中,我們根據使用者的帖子瀏覽記錄,為使用者推薦他們感興趣的帖子;在轉化後臺裡,我們根據使用者的繪本購買記錄,為使用者推薦他們可能感興趣的課程等。

特徵是機器學習模型的輸入。如何高效地將特徵從資料來源加工出來,讓它能夠被線上服務高效地訪問,決定了我們能否在生產環境可靠地使用機器學習。為此,我們搭建了特徵系統,系統性地解決這一問題。目前,伴魚的機器學習特徵系統運行了接近 100 個特徵,支援了多個業務線的模型對線上獲取特徵的需求。

下面,我們將介紹特徵系統在伴魚的演進過程,以及其中的權衡考量。

二、舊版特徵系統 V1

特徵系統 V1 由三個核心元件構成:特徵管道,特徵倉庫,和特徵服務。整體架構如下圖所示:

特徵管道包括流特徵管道批特徵管道,它們分別消費流資料來源和批資料來源,對資料經過預處理加工成特徵 (這一步稱為特徵工程),並將特徵寫入特徵倉庫。
  • 批特徵管道使用 Spark 實現,由 DolphinScheduler 進行排程,跑在 YARN 叢集上;
  • 出於技術棧的一致考慮,流特徵管道使用 Spark Structured Streaming 實現,和批特徵管道一樣跑在 YARN 叢集上。

特徵倉庫選用合適的儲存元件 (Redis) 和資料結構 (Hashes),為模型服務提供低延遲的特徵訪問能力。之所以選用 Redis 作為儲存,是因為:

  • 伴魚有豐富的 Redis 使用經驗;
  • 包括DoorDash Feature StoreFeast在內的業界特徵倉庫解決方案都使用了 Redis。

特徵服務遮蔽特徵倉庫的儲存和資料結構,對外暴露 RPC 介面GetFeatures(EntityName, FeatureNames)

,提供對特徵的低延遲點查詢。在實現上,這一介面基本對應於 Redis 的HMGET EntityName FeatureName_1 ... FeatureName_N操作。

這一版本的特徵系統存在幾個問題:

  • 演算法工程師缺少控制,導致迭代效率低。這個問題與系統涉及的技術棧和公司的組織架構有關。在整個系統中,特徵管道的迭代需求最高,一旦模型對特徵有新的需求,就需要修改或者編寫一個新的 Spark 任務。而 Spark 任務的編寫需要有一定的 Java 或 Scala 知識,不屬於演算法工程師的常見技能,因此交由大資料團隊全權負責。大資料團隊同時負責多項資料需求,往往有很多排期任務。結果便是新特徵的上線涉及頻繁的跨部門溝通,迭代效率低;
  • 特徵管道只完成了輕量的特徵工程,降低線上推理的效率。由於特徵管道由大資料工程師而非演算法工程師編寫,複雜的資料預處理涉及更高的溝通成本,因此這些特徵的預處理程度都比較輕量,更多的預處理被留到模型服務甚至模型內部進行,增大了模型推理的時延。

為了解決這幾個問題,特徵系統 V2 提出幾個設計目的:

  • 將控制權交還演算法工程師,提高迭代效率;
  • 將更高權重的特徵工程交給特徵管道,提高線上推理的效率。

三、新版特徵系統 V2

特徵系統 V2 相比特徵系統 V1 在架構上的唯一不同點在於,它將特徵管道切分為三部分:特徵生成管道,特徵源,和特徵注入管道。值得一提的是,管道在實現上均從 Spark 轉為 Flink,和公司資料基礎架構的發展保持一致。特徵系統 V2 的整體架構如下圖所示:

1. 特徵生成管道

特徵生成管道讀取原始資料來源,加工為特徵,並將特徵寫入指定特徵源 (而非特徵倉庫)。

  • 如果管道以流資料來源作為原始資料來源,則它是流特徵生成管道;
  • 如果管道以批資料來源作為原始資料來源,則它是批特徵生成管道。

特徵生成管道的邏輯由演算法工程師全權負責編寫。其中,批特徵生成管道使用 HiveQL 編寫,由 DolphinScheduler 排程。流特徵生成管道使用 PyFlink 實現,詳情見下圖:

演算法工程師需要遵守下面步驟:
  1. 用 Flink SQL 宣告 Flink 任務源 (source.sql) 和定義特徵工程邏輯 (transform.sql);
  2. (可選) 用 Python 實現特徵工程邏輯中可能包含的 UDF 實現 (udf_def.py);
  3. 使用自研的程式碼生成工具,生成可執行的 PyFlink 任務指令碼 (run.py);
  4. 本地使用由平臺準備好的 Docker 環境除錯 PyFlink 指令碼,確保能在本地正常執行;
  5. 把程式碼提交到一個統一管理特徵管道的程式碼倉庫,由 AI 平臺團隊進行程式碼稽核。稽核通過的指令碼會被部署到伴魚實時計算平臺,完成特徵生成管道的上線。

這一套流程確保了:

  • 演算法工程師掌握上線特徵的自主權;
  • 平臺工程師把控特徵生成管道的程式碼質量,並在必要時可以對它們實現重構,而無需演算法工程師的介入。

2. 特徵源

特徵源儲存從原始資料來源加工形成的特徵。值得強調的是,它同時還是連線演算法工程師和 AI 平臺工程師的橋樑。演算法工程師只負責實現特徵工程的邏輯,將原始資料加工為特徵,寫入特徵源,剩下的事情就交給 AI 平臺。平臺工程師實現特徵注入管道,將特徵寫入特徵倉庫,以特徵服務的形式對外提供資料訪問服務。

3. 特徵注入管道

特徵注入管道將特徵從特徵源讀出,寫入特徵倉庫。由於 Flink 社群缺少對 Redis sink 的原生支援,我們通過拓展RichSinkFunction簡單地實現了StreamRedisSinkBatchRedisSink,很好地滿足我們的需求。

其中,BatchRedisSink通過Flink Operator StateRedis Pipelining的簡單結合,大量參考 Flink 文件中的BufferingSink,實現了批量寫入,大幅減少對 Redis Server 的請求量,增大吞吐,寫入效率相比逐條插入提升了 7 倍BatchRedisSink的簡要實現如下。其中,flush實現了批量寫入 Redis 的核心邏輯,checkpointedState/bufferedElements/snapshotState/initializeState實現了使用 Flink 有狀態運算元管理元素快取的邏輯。

class BatchRedisSink(
    pipelineBatchSize: Int
) extends RichSinkFunction[(String, Timestamp, Map[String, String])]
    with CheckpointedFunction {

  @transient
  private var checkpointedState
      : ListState[(String, java.util.Map[String, String])] = _

  private val bufferedElements
      : ListBuffer[(String, java.util.Map[String, String])] =
    ListBuffer.empty[(String, java.util.Map[String, String])]

  private var jedisPool: JedisPool = _

  override def invoke(
      value: (String, Timestamp, Map[String, String]),
      context: SinkFunction.Context
  ): Unit = {
    import scala.collection.JavaConverters._

    val (key, _, featureKVs) = value
    bufferedElements += (key -> featureKVs.asJava)

    if (bufferedElements.size == pipelineBatchSize) {
      flush()
    }
  }

  private def flush(): Unit = {
    var jedis: Jedis = null
    try {
      jedis = jedisPool.getResource
      val pipeline = jedis.pipelined()
      for ((key, hash) <- bufferedElements) {
        pipeline.hmset(key, hash)
      }
      pipeline.sync()
    } catch { ... } finally { ... }
    bufferedElements.clear()
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor =
      new ListStateDescriptor[(String, java.util.Map[String, String])](
        "buffered-elements",
        TypeInformation.of(
          new TypeHint[(String, java.util.Map[String, String])]() {}
        )
      )

    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    import scala.collection.JavaConverters._

    if (context.isRestored) {
      for (element <- checkpointedState.get().asScala) {
        bufferedElements += element
      }
    }
  }

  override def open(parameters: Configuration): Unit = {
    try {
      jedisPool = new JedisPool(...)
    } catch { ... }
  }

  override def close(): Unit = {
    flush()
    if (jedisPool != null) {
      jedisPool.close()
    }
  }
}

特徵系統 V2 很好地滿足了我們提出的設計目的。

  • 由於特徵生成管道的編寫只需用到 SQL 和 Python 這兩種演算法工程師十分熟悉的工具,因此他們全權負責特徵生成管道的編寫和上線,無需依賴大資料團隊,大幅提高了迭代效率。在熟悉後,演算法工程師通常只需花費半個小時以內,就可以完成流特徵的編寫、除錯和上線。而這個過程原本需要花費數天,取決於大資料團隊的排期;
  • 出於同樣的原因,演算法工程師可以在有需要的前提下,完成更重度的特徵工程,從而減少模型服務和模型的負擔,提高模型線上推理效率。

四、總結

特徵系統 V1 解決了特徵上線的問題,而特徵系統 V2 在此基礎上,解決了特徵上線難的問題。在特徵系統的演進過程中,我們總結出作為平臺研發的幾點經驗:

  • 平臺應該提供使用者想用的工具。這與 Uber ML 平臺團隊在內部推廣的經驗相符。演算法工程師在 Python 和 SQL 環境下工作效率最高,而不熟悉 Java 和 Scala。那麼,想讓演算法工程師自主編寫特徵管道,平臺應該支援演算法工程師使用 Python 和 SQL 編寫特徵管道,而不是讓演算法工程師去學 Java 和 Scala,或是把工作轉手給大資料團隊去做;
  • 平臺應該提供易用的本地除錯工具。我們提供的 Docker 環境封裝了 Kafka 和 Flink,讓使用者可以在本地快速除錯 PyFlink 指令碼,而無需等待管道部署到測試環境後再除錯;
  • 平臺應該在鼓勵使用者自主使用的同時,通過自動化檢查或程式碼稽核等方式牢牢把控質量。

原文連結
本文為阿里雲原創內容,未經允許不得轉載。