1. 程式人生 > 實用技巧 >Apache Pulsar 在騰訊 Angel PowerFL 聯邦學習平臺上的實踐

Apache Pulsar 在騰訊 Angel PowerFL 聯邦學習平臺上的實踐

騰訊 Angel PowerFL 聯邦學習平臺

聯邦學習作為新一代人工智慧基礎技術,通過解決資料隱私與資料孤島問題,重塑金融、醫療、城市安防等領域。

騰訊 Angel PowerFL 聯邦學習平臺構建在 Angel 機器學習平臺上,利用 Angel-­PS 支援萬億級模型訓練的能力,將很多在 Worker 上的計算提升到 PS(引數伺服器) 端;Angel PowerFL 為聯邦學習演算法提供了計算、加密、儲存、狀態同步等基本操作介面,通過流程排程模組協調參與方任務執行狀態,而通訊模組完成了任務訓練過程中所有資料的傳輸。Angel PowerFL 聯邦學習已經在騰訊金融雲、騰訊廣告聯合建模等業務中開始落地,並取得初步的效果。

Angel 機器學習平臺:https://github.com/Angel-ML

Angel PowerFL 對聯邦通訊服務的要求

Angel PowerFL 聯邦學習平臺在訓練任務過程當中,對參與方之間的訊息通訊要求極高,要求訊息系統必須穩定可靠、保持高效能且能保證資料安全。Angel PowerFL 的學習任務在訓練過程當中,參與方之間會有大量的加密資料通過通訊模組傳輸,Angel PowerFL 對通訊服務有以下需求:

➡️ 穩定可靠

Angel PowerFL 的學習任務時長從幾分鐘到幾小時,演算法執行對資料的準確性要求很高,不同演算法的資料傳輸峰值也不一樣,這需要通訊模組的服務足夠穩定,並且不能丟資料。

➡️ 高效能傳輸

Angel PowerFL 底層通過 Spark 進行計算,Executor 併發執行會產生很多待傳輸的中間資料,通訊模組需要將這些加密後的資料及時傳輸給對方,這就要求通訊服務做到低延時、高吞吐量。

➡️ 資料安全

雖然 Angel PowerFL 所有資料都通過加密模組進行了加密,但參與聯邦學習的業務可能分佈在不同公司;跨公網進行傳輸,需要通訊模組足夠安全,不易被攻擊。

為什麼選擇 Pulsar

聯邦通訊服務在做技術預研的時候,考慮過 RPC 直連、HDFS 同步、MQ 同步三種技術方案。考慮到對安全和效能的要求比較高,排除了 RPC 直連和 HDFS 同步方案,確定採用 MQ 同步方案。

MQ 可選的服務很多,比如 Pulsar、Kafka、RabbitMQ、TubeMQ 等。考慮到 Angel PowerFL 對穩定性、可靠性、高效能傳輸和資料安全有很高的需求,我們諮詢了騰訊資料平臺部 MQ 團隊,他們向我們推薦了 Pulsar。

隨後,我們對 Pulsar 開展了深入調研,發現 Pulsar 內建的諸多特性,正好滿足了我們對訊息系統的要求。Pulsar broker 和 bookie 採用了計算儲存分層架構,保證了資料穩定可靠,效能良好;Pulsar 支援跨地域複製(geo­-replication),解決了 PowerFL 跨聯邦同步 MQ 問題;而 Pulsar 的驗證和授權模式也能保證傳輸安全。

雲原生的計算與儲存分層架構

Apache Pulsar 是下一代雲原生分散式訊息和事件流平臺,採用了計算和儲存分層的架構:在 Broker 上進行 Pub/Sub 相關的計算,在 Apache BookKeeper 上儲存資料。

和傳統的訊息平臺(如 Kafka)相比,這種架構有明顯的優勢:

  • Broker 和 bookie 相互獨立,可以獨立擴充套件和容錯,提升系統的可用性。

  • 分割槽儲存不受單個節點儲存容量的限制,資料分佈更均勻。

  • BookKeeper 儲存安全可靠,保證訊息不丟失,同時支援批量刷盤以獲得更高吞吐量。

Pulsar Geo­-replication

Pulsar 原生支援跨地域複製(Geo­-replication),可以在多個數據中心的多個 Pulsar 叢集中同時同步/非同步複製資料。還可以在訊息級別,通過 setReplicationClusters 控制訊息複製到哪些叢集。

在上圖中,無論 Producer P1、P2 和 P3 在什麼時候分別將訊息釋出給 Cluster A、Cluster B 和 Cluster C 中的 topic T1,這些訊息均會立刻複製到整個叢集。一旦完成複製,Consumer C1 和 C2 即可從自己所在的叢集消費這些訊息。

水平擴充套件

由於 Pulsar 的儲存設計基於分片,Pulsar 把主題分區劃分為更小的塊,稱其為分片。每個分片都作為 Apache BookKeeper ledger 來儲存,這樣構成分割槽的分片集合分佈在 Apache BookKeeper 叢集中。這樣設計方便我們管理容量和水平擴充套件,並且滿足高吞吐量的需求。

  • 容量管理簡單:主題分割槽的容量可以擴充套件至整個 BookKeeper 叢集的容量,不受單個節點容量的限制。

  • 擴容簡單:擴容無需重新平衡或複製資料。新增新儲存節點時,新節點僅用於新分片或其副本,Pulsar 自動平衡分片分佈和叢集中的流量。

  • 高吞吐量:寫入流量分佈在儲存層中,不會出現分割槽寫入爭用單個節點資源的情況。

經過深入調研後,我們決定在騰訊 Angel PowerFL 聯邦學習平臺上使用 Apache Pulsar。

基於 Apache Pulsar 的聯邦通訊方案

聯邦學習的各個業務(Angel PowerFL 稱之為 Party,每個 Party 有不同的 ID,如 10000/20000),可能分佈在同個公司的不同部門(無網路隔離),也可能分佈在不同公司(跨公網),各個 Party 之間通過 Pulsar 跨地域複製功能進行同步複製,總體設計方案如下:

聯邦學習的每個訓練任務,通過訊息的 producer 和 consumer 連線所在 Party 的 Pulsar 叢集,叢集名以 fl-pulsar-[partyID] 進行區分,訓練任務產生需要傳輸的中間資料後,生產者將這些資料傳送給本地 Pulsar 叢集。

Pulsar 叢集收到資料後,通過 Pulsar proxy 建立的同步複製網路通道,將資料傳送給使用方 Party。而使用方 Party 的消費者,會一直監聽該訓練任務對應的 topic,當有資料到達後,直接消費資料進行下一步的計算。

在 Angel PowerFL 執行訓練任務時,driver 和每個 partition 會建立一個 channel 型別變數,該變數和 Pulsar 當中具體的 topic 一一對應,需要交換的資料都會經過生產者傳送到這個 topic。

Angel PowerFL 支援多方聯邦,因此會有 2+ 個 Pulsar 叢集需要同步複製資料。每個聯邦學習任務通過各自的 parties 任務引數指定了參與方,生產者在傳送訊息時呼叫 setReplicationClusters 介面,確保資料只在參與 Party 之間傳輸。

在 Angel PowerFL 的通訊模組中,我們充分利用了 Pulsar 的 geo-­replication、topic 限流、Token Authentication 等功能。下面我來詳細介紹如何在 Angel PowerFL 聯邦學習平臺中使用 Pulsar。

Geo­-replication 去掉Global ZooKeeper 依賴

在 Angel PowerFL 聯邦學習平臺上,部署一套完整的 Pulsar 依賴兩個 ZooKeeper 叢集,分別是 Local ZooKeeper 和 Global ZooKeeper。Local ZooKeeper 和 Kafka 中的 ZooKeeper 作用類似,用來儲存元資料。而 Global ZooKeeper 則在 Pulsar 多個叢集間中共享配置資訊。

在 Angel PowerFL 場景中,每個 Party 加入前,都要先部署一個 Global ZooKeeper 的子節點,或者共用一套跨公司或跨地域的公共 ZooKeeper,這樣不僅會增加部署的難度,也會增加被攻擊的風險,不利於新 Party 加入。

Global ZooKeeper 中儲存的元資料,主要是叢集名/服務地址/namespace 許可權等資訊。Pulsar 支援建立和加入新叢集。我們通過以下兩個步驟註冊聯邦 Pulsar 叢集的資訊到 local ZooKeeper,就去除了對 Global ZooKeeper 的依賴:

步驟 1: 註冊新加入 Party 的 Pulsar 叢集

# OTHER_CLUSTER_NAME 為待註冊 Party 的 Pulsar 叢集名
# OTHER_CLUSTER_BROKER_URL為 Pulsar 叢集對應的 broker 地址
./bin/pulsar-admin clusters create ${OTHER_CLUSTER_NAME} 
 --url http://${OTHER_CLUSTER_HTTP_URL} 
 --broker-url pulsar://${OTHER_CLUSTER_BROKER_URL}

步驟 2: 授予訓練用到的 namespace 訪問叢集許可權

./bin/pulsar-admin namespaces set-clusters fl-tenant/${namespace} 
 -clusters ${LOCAL_CLUSTR_NAME},${OTHER_CLUSTER_NAME}

對於新加入的 Party,只用提供與其對應的 Pulsar 的叢集名/服務地址即可完成註冊,geo-replication 就可以通過註冊資訊同步複製資料。

Client 增加 Token 認證

Pulsar 作為 Angel PowerFL 的通訊模組,沒有加入使用者級別的許可權控制。為了進一步保證 client 生產和消費資料的安全,我們參考 Pulsar Client authentication using tokens based on JSON Web Tokens 增加了 token 認證,Angel PowerFL 的訓練任務除了配置當前 Party 使用的服務地址外,還需要配置 admin token。

https://pulsar.apache.org/docs/en/security-jwt/#token-authentication-overview
由於 Angel PowerFL 整套系統部署在 Kubernetes 上,我們通過容器準備 Pulsar 叢集需要的 Public/Private keys 等檔案,然後註冊到 K8S secret 中。

# 生成 fl-private.key 和 fl-public.key
docker run --rm -v "$(pwd)":/tmp 
 apachepulsar/pulsar-all:2.5.2 
 /pulsar/bin/pulsar tokens create-key-pair --output-private-key 
 /tmp/fl-private.key --output-public-key /tmp/fl-public.key
# 生成 admin-token.txt token 檔案
echo -n `docker run --rm -v 
 "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 
 /pulsar/bin/pulsar tokens create --private-key 
 file:///tmp/fl-private.key --subject admin`
# 將認證相關的檔案註冊到 K8S
kubectl create secret generic token-symmetric-key 
 --from-file=TOKEN=admin-token.txt 
 --from-file=PUBLICKEY=fl-public.key -n ${PARTY_NAME}

開啟多叢集 topic 自動回收

Pulsar 叢集開啟了 geo-­replication 功能後,無法通過命令直接刪除用過的 topic,而 Angel PowerFL 訓練任務每次使用的任務是一次性的,任務結束後這些 topic 就沒用了,如果不及時刪除會出現大量累積。

對於通過 geo­-replication 開啟複製的 topic,可以配置 brokerDeleteInactivetopicsEnabled 引數,開啟 topic 自動回收。自動回收無用的 topic,需滿足以下幾個條件:

  • 當前 topic 沒有生產者( producer)或者消費者(consumer)連線
  • 當前 topic 沒有被訂閱
  • 當前 topic 沒有需要保留的資訊

Angel PowerFL 部署的 Pulsar 叢集,通過 brokerDeleteInactivetopicsEnabled 開啟 topic 自動回收。在執行訓練任務的過程中,使用後對每個 topic 按回收條件進行處理。同時,我們增加了

brokerDeleteInactivetopicsFrequencySeconds 配置,將回收的頻率設定為 3 小時。

優化 topic 限流

Angel PowerFL 中的訓練任務,在不同的資料集/演算法/執行階段,生產資料的流量峰值也不同。目前生產環境中單個任務最大的資料量超過 200G/小時。訓練過程中,如果 Pulsar 連線中斷或者生產和消費過程出現異常,需要重新開始整個訓練任務。

為了規避 Pulsar 叢集被單個訓練任務沖垮的風險,我們使用了 Pulsar 的限流功能。Pulsar 支援 message-rate 和 byte-rate 兩種生產限流策略,前者限制每秒生產訊息的數量,後者限制每秒生產訊息的大小。Angel PowerFL 將資料切分成多個 4M 的訊息,通過 message-­rate 限制生產訊息的數量。在 Angel PowerFL 中,我們將 namespace 的訊息限制為 30 條(小於<30*4=120M/s):

./bin/pulsar-admin namespaces set-publish-rate fl-tenant/${namespace} -m 30

剛開始測試 message-rate 的限流功能時,出現了限不住的情況(限流設定失效)。騰訊資料平臺部 MQ 團隊負責 Pulsar 的同事幫忙一起排查,發現設定 topicPublisherThrottlingTickTimeMillis 引數後,限制不能生效。

因此我們想辦法在 broker 端啟用了精確的 topic 釋出頻率限制,優化了限流功能並貢獻回社群,詳情見 PR-7078: introduce precise topic publish rate limiting。
https://github.com/apache/pulsar/pull/7078

優化 topic unloading 配置

Pulsar 根據 broker 叢集負載狀況,可以將 topic 動態分配到 broker上。如果擁有該 topic 的broker 宕機,或者擁有該 topic 的 broker 負載過大,則該 topic 會立即重新分配給另一個 broker ;而重新分配的過程就是 topic 的 unloading,該操作意味著關閉 topic,釋放所有者(owner)。

理論上,topic unloading 由負載均衡調整,客戶端將經歷極小的延遲抖動,通常耗時 10ms 左右。但 Angel PowerFL 初期在執行訓練任務時,日誌爆出大量因為 unloading topic 導致的連線異常。日誌顯示 topic unloading 在不斷的重試,但都不成功:

[sub] Could not get connection to broker: topic is temporarily unavailable -- Will try again in 0.1 s

先來看 broker/namespace/bundle/topic 這四者的關係。Bundle 是 Pulsar namespace 的一個分片機制,namespace 被分片為 bundle 列表,每個 bundle 包含 namespace 的整個雜湊範圍的一部分。Topic 不直接分配給 broker,而是通過計算 topic 的雜湊碼將 topic 分配給特定的 bundle;每個 bundle 互相獨立,再被分配到不同的 broker 上。

Angel PowerFL 早期的任務 topic 沒有複用,一個 LR 演算法訓練任務建立了 2000 多個 topic,每個 topic 生產的資料負載也不同,我們判斷上述斷連問題是由於短時間內(最小任務十分鐘內能結束,同時會有多個任務在執行)大量建立和使用 topic,導致負載不均衡,topic unloading 頻繁發生。為了降低 topic unloading 的頻率,我們調整了 Pulsar Bundle 的相關引數:

# 增加 broker 可最大分配 topic 數量
loadBalancerBrokerMaxTopics=500000
# 啟用自動拆分namespace bundle
loadBalancerAutoBundleSplitEnabled=true
# 增加觸發拆分 bundle 的 topic 數量
loadBalancerNamespaceBundleMaxTopics=10000
# 增加觸發拆分 bundle 的訊息數
loadBalancerNamespaceBundleMaxMsgRate=10000

同時,在建立 namespace 時,把 bundle 數量預設設定為 64。

./bin/pulsar-admin namespaces create fl-tenant/${namespace} --bundles 64

經過以上調整,Angel PowerFL 在任務執行期間沒有再出現過由於 topic unloading 導致的斷連。

Pulsar on Kubernetes

Angel PowerFL 的所有服務均通過 Helm 部署在 Kubernetes 上。Pulsar 作為其中的一個 chart,可以很好的利用 K8S 的資源隔離、快速擴縮容等特性。在 Angel PowerFL 使用 Helm 部署 Pulsar 的實踐中,我們總結了以下經驗: