1. 程式人生 > >Apache Pulsar 在騰訊 Angel PowerFL 聯邦學習平臺上的實踐

Apache Pulsar 在騰訊 Angel PowerFL 聯邦學習平臺上的實踐

## 騰訊 Angel PowerFL 聯邦學習平臺 聯邦學習作為新一代人工智慧基礎技術,通過解決資料隱私與資料孤島問題,重塑金融、醫療、城市安防等領域。 騰訊 Angel PowerFL 聯邦學習平臺構建在 Angel 機器學習平臺上,利用 Angel-­PS 支援萬億級模型訓練的能力,將很多在 Worker 上的計算提升到 PS(引數伺服器) 端;Angel PowerFL 為聯邦學習演算法提供了計算、加密、儲存、狀態同步等基本操作介面,通過流程排程模組協調參與方任務執行狀態,而通訊模組完成了任務訓練過程中所有資料的傳輸。Angel PowerFL 聯邦學習已經在騰訊金融雲、騰訊廣告聯合建模等業務中開始落地,並取得初步的效果。 Angel 機器學習平臺:https://github.com/Angel-ML ![](https://img2020.cnblogs.com/blog/2125205/202009/2125205-20200915192204671-541898522.png) ## Angel PowerFL 對聯邦通訊服務的要求 Angel PowerFL 聯邦學習平臺在訓練任務過程當中,對參與方之間的訊息通訊要求極高,要求訊息系統必須穩定可靠、保持高效能且能保證資料安全。Angel PowerFL 的學習任務在訓練過程當中,參與方之間會有大量的加密資料通過通訊模組傳輸,Angel PowerFL 對通訊服務有以下需求: **➡️ 穩定可靠** Angel PowerFL 的學習任務時長從幾分鐘到幾小時,演算法執行對資料的準確性要求很高,不同演算法的資料傳輸峰值也不一樣,這需要通訊模組的服務足夠穩定,並且不能丟資料。 **➡️ 高效能傳輸** Angel PowerFL 底層通過 Spark 進行計算,Executor 併發執行會產生很多待傳輸的中間資料,通訊模組需要將這些加密後的資料及時傳輸給對方,這就要求通訊服務做到低延時、高吞吐量。 **➡️ 資料安全** 雖然 Angel PowerFL 所有資料都通過加密模組進行了加密,但參與聯邦學習的業務可能分佈在不同公司;跨公網進行傳輸,需要通訊模組足夠安全,不易被攻擊。 ## 為什麼選擇 Pulsar 聯邦通訊服務在做技術預研的時候,考慮過 RPC 直連、HDFS 同步、MQ 同步三種技術方案。考慮到對安全和效能的要求比較高,排除了 RPC 直連和 HDFS 同步方案,確定採用 MQ 同步方案。 MQ 可選的服務很多,比如 Pulsar、Kafka、RabbitMQ、TubeMQ 等。**考慮到 Angel PowerFL 對穩定性、可靠性、高效能傳輸和資料安全有很高的需求,我們諮詢了騰訊資料平臺部 MQ 團隊,他們向我們推薦了 Pulsar。** 隨後,我們對 Pulsar 開展了深入調研,發現 Pulsar 內建的諸多特性,正好滿足了我們對訊息系統的要求。Pulsar broker 和 bookie 採用了計算儲存分層架構,保證了資料穩定可靠,效能良好;Pulsar 支援跨地域複製(geo­-replication),解決了 PowerFL 跨聯邦同步 MQ 問題;而 Pulsar 的驗證和授權模式也能保證傳輸安全。 ### **雲原生的計算與儲存分層架構** Apache Pulsar 是下一代雲原生分散式訊息和事件流平臺,採用了計算和儲存分層的架構:在 Broker 上進行 Pub/Sub 相關的計算,在 Apache BookKeeper 上儲存資料。 和傳統的訊息平臺(如 Kafka)相比,這種架構有明顯的優勢: * Broker 和 bookie 相互獨立,可以獨立擴充套件和容錯,提升系統的可用性。 * 分割槽儲存不受單個節點儲存容量的限制,資料分佈更均勻。 * BookKeeper 儲存安全可靠,保證訊息不丟失,同時支援批量刷盤以獲得更高吞吐量。 ![](https://img2020.cnblogs.com/blog/2125205/202009/2125205-20200915192219929-710464991.png) ### **Pulsar Geo­-replication** Pulsar 原生支援跨地域複製(Geo­-replication),可以在多個數據中心的多個 Pulsar 叢集中同時同步/非同步複製資料。還可以在訊息級別,通過 setReplicationClusters 控制訊息複製到哪些叢集。 ![](https://img2020.cnblogs.com/blog/2125205/202009/2125205-20200915192227923-389868878.png) 在上圖中,無論 Producer P1、P2 和 P3 在什麼時候分別將訊息釋出給 Cluster A、Cluster B 和 Cluster C 中的 topic T1,這些訊息均會立刻複製到整個叢集。一旦完成複製,Consumer C1 和 C2 即可從自己所在的叢集消費這些訊息。 ### **水平擴充套件** 由於 Pulsar 的儲存設計基於分片,Pulsar 把主題分區劃分為更小的塊,稱其為分片。每個分片都作為 Apache BookKeeper ledger 來儲存,這樣構成分割槽的分片集合分佈在 Apache BookKeeper 叢集中。這樣設計方便我們管理容量和水平擴充套件,並且滿足高吞吐量的需求。 * **容量管理簡單:**主題分割槽的容量可以擴充套件至整個 BookKeeper 叢集的容量,不受單個節點容量的限制。 * **擴容簡單:**擴容無需重新平衡或複製資料。新增新儲存節點時,新節點僅用於新分片或其副本,Pulsar 自動平衡分片分佈和叢集中的流量。 * **高吞吐量:**寫入流量分佈在儲存層中,不會出現分割槽寫入爭用單個節點資源的情況。 經過深入調研後,我們決定在騰訊 Angel PowerFL 聯邦學習平臺上使用 Apache Pulsar。 ## 基於 Apache Pulsar 的聯邦通訊方案 聯邦學習的各個業務(Angel PowerFL 稱之為 Party,每個 Party 有不同的 ID,如 10000/20000),可能分佈在同個公司的不同部門(無網路隔離),也可能分佈在不同公司(跨公網),各個 Party 之間通過 Pulsar 跨地域複製功能進行同步複製,總體設計方案如下: ![](https://img2020.cnblogs.com/blog/2125205/202009/2125205-20200915192237367-1613979756.png) 聯邦學習的每個訓練任務,通過訊息的 producer 和 consumer 連線所在 Party 的 Pulsar 叢集,叢集名以 fl-pulsar-[partyID] 進行區分,訓練任務產生需要傳輸的中間資料後,生產者將這些資料傳送給本地 Pulsar 叢集。 Pulsar 叢集收到資料後,通過 Pulsar proxy 建立的同步複製網路通道,將資料傳送給使用方 Party。而使用方 Party 的消費者,會一直監聽該訓練任務對應的 topic,當有資料到達後,直接消費資料進行下一步的計算。 ![](https://img2020.cnblogs.com/blog/2125205/202009/2125205-20200915192244288-546690948.png) 在 Angel PowerFL 執行訓練任務時,driver 和每個 partition 會建立一個 channel 型別變數,該變數和 Pulsar 當中具體的 topic 一一對應,需要交換的資料都會經過生產者傳送到這個 topic。 Angel PowerFL 支援多方聯邦,因此會有 2+ 個 Pulsar 叢集需要同步複製資料。每個聯邦學習任務通過各自的 parties 任務引數指定了參與方,生產者在傳送訊息時呼叫 `setReplicationClusters` 介面,確保資料只在參與 Party 之間傳輸。 在 Angel PowerFL 的通訊模組中,我們充分利用了 Pulsar 的 geo-­replication、topic 限流、Token Authentication 等功能。下面我來詳細介紹如何在 Angel PowerFL 聯邦學習平臺中使用 Pulsar。 ### Geo­-replication 去掉Global ZooKeeper 依賴 在 Angel PowerFL 聯邦學習平臺上,部署一套完整的 Pulsar 依賴兩個 ZooKeeper 叢集,分別是 Local ZooKeeper 和 Global ZooKeeper。Local ZooKeeper 和 Kafka 中的 ZooKeeper 作用類似,用來儲存元資料。而 Global ZooKeeper 則在 Pulsar 多個叢集間中共享配置資訊。 ![](https://img2020.cnblogs.com/blog/2125205/202009/2125205-20200915192330443-1258883463.png) 在 Angel PowerFL 場景中,每個 Party 加入前,都要先部署一個 Global ZooKeeper 的子節點,或者共用一套跨公司或跨地域的公共 ZooKeeper,這樣不僅會增加部署的難度,也會增加被攻擊的風險,不利於新 Party 加入。 Global ZooKeeper 中儲存的元資料,主要是叢集名/服務地址/namespace 許可權等資訊。Pulsar 支援建立和加入新叢集。我們通過以下兩個步驟註冊聯邦 Pulsar 叢集的資訊到 local ZooKeeper,就去除了對 Global ZooKeeper 的依賴: **步驟 1:** 註冊新加入 Party 的 Pulsar 叢集 ``` # OTHER_CLUSTER_NAME 為待註冊 Party 的 Pulsar 叢集名 # OTHER_CLUSTER_BROKER_URL為 Pulsar 叢集對應的 broker 地址 ./bin/pulsar-admin clusters create ${OTHER_CLUSTER_NAME} --url http://${OTHER_CLUSTER_HTTP_URL} --broker-url pulsar://${OTHER_CLUSTER_BROKER_URL} ``` **步驟 2:** 授予訓練用到的 namespace 訪問叢集許可權 ``` ./bin/pulsar-admin namespaces set-clusters fl-tenant/${namespace} -clusters ${LOCAL_CLUSTR_NAME},${OTHER_CLUSTER_NAME} ``` 對於新加入的 Party,只用提供與其對應的 Pulsar 的叢集名/服務地址即可完成註冊,geo-replication 就可以通過註冊資訊同步複製資料。 ### **Client 增加 Token 認證** Pulsar 作為 Angel PowerFL 的通訊模組,沒有加入使用者級別的許可權控制。為了進一步保證 client 生產和消費資料的安全,我們參考 Pulsar Client authentication using tokens based on JSON Web Tokens 增加了 token 認證,Angel PowerFL 的訓練任務除了配置當前 Party 使用的服務地址外,還需要配置 admin token。 https://pulsar.apache.org/docs/en/security-jwt/#token-authentication-overview 由於 Angel PowerFL 整套系統部署在 Kubernetes 上,我們通過容器準備 Pulsar 叢集需要的 Public/Private keys 等檔案,然後註冊到 K8S secret 中。 ``` # 生成 fl-private.key 和 fl-public.key docker run --rm -v "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 /pulsar/bin/pulsar tokens create-key-pair --output-private-key /tmp/fl-private.key --output-public-key /tmp/fl-public.key # 生成 admin-token.txt token 檔案 echo -n `docker run --rm -v "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 /pulsar/bin/pulsar tokens create --private-key file:///tmp/fl-private.key --subject admin` # 將認證相關的檔案註冊到 K8S kubectl create secret generic token-symmetric-key --from-file=TOKEN=admin-token.txt --from-file=PUBLICKEY=fl-public.key -n ${PARTY_NAME} ``` ### **開啟多叢集 topic 自動回收** Pulsar 叢集開啟了 geo-­replication 功能後,無法通過命令直接刪除用過的 topic,而 Angel PowerFL 訓練任務每次使用的任務是一次性的,任務結束後這些 topic 就沒用了,如果不及時刪除會出現大量累積。 對於通過 geo­-replication 開啟複製的 topic,可以配置 `brokerDeleteInactivetopicsEnabled` 引數,開啟 topic 自動回收。自動回收無用的 topic,需滿足以下幾個條件: * 當前 topic 沒有生產者( producer)或者消費者(consumer)連線 * 當前 topic 沒有被訂閱 * 當前 topic 沒有需要保留的資訊 Angel PowerFL 部署的 Pulsar 叢集,通過 brokerDeleteInactivetopicsEnabled 開啟 topic 自動回收。在執行訓練任務的過程中,使用後對每個 topic 按回收條件進行處理。同時,我們增加了 brokerDeleteInactivetopicsFrequencySeconds 配置,將回收的頻率設定為 3 小時。 ### **優化 topic 限流** Angel PowerFL 中的訓練任務,在不同的資料集/演算法/執行階段,生產資料的流量峰值也不同。目前生產環境中單個任務最大的資料量超過 200G/小時。訓練過程中,如果 Pulsar 連線中斷或者生產和消費過程出現異常,需要重新開始整個訓練任務。 為了規避 Pulsar 叢集被單個訓練任務沖垮的風險,我們使用了 Pulsar 的限流功能。Pulsar 支援 message-rate 和 byte-rate 兩種生產限流策略,前者限制每秒生產訊息的數量,後者限制每秒生產訊息的大小。Angel PowerFL 將資料切分成多個 4M 的訊息,通過 message-­rate 限制生產訊息的數量。在 Angel PowerFL 中,我們將 namespace 的訊息限制為 30 條(小於<30*4=120M/s): ``` ./bin/pulsar-admin namespaces set-publish-rate fl-tenant/${namespace} -m 30 ``` 剛開始測試 message-rate 的限流功能時,出現了限不住的情況(限流設定失效)。騰訊資料平臺部 MQ 團隊負責 Pulsar 的同事幫忙一起排查,發現設定 topicPublisherThrottlingTickTimeMillis 引數後,限制不能生效。 因此我們想辦法在 broker 端啟用了精確的 topic 釋出頻率限制,優化了限流功能並貢獻回社群,詳情見 **PR-7078:** **introduce precise topic publish rate limiting。** https://github.com/apache/pulsar/pull/7078 ### **優化 topic unloading 配置** Pulsar 根據 broker 叢集負載狀況,可以將 topic 動態分配到 broker上。如果擁有該 topic 的broker 宕機,或者擁有該 topic 的 broker 負載過大,則該 topic 會立即重新分配給另一個 broker ;而重新分配的過程就是 topic 的 unloading,該操作意味著關閉 topic,釋放所有者(owner)。 理論上,topic unloading 由負載均衡調整,客戶端將經歷極小的延遲抖動,通常耗時 10ms 左右。但 Angel PowerFL 初期在執行訓練任務時,日誌爆出大量因為 unloading topic 導致的連線異常。日誌顯示 topic unloading 在不斷的重試,但都不成功: ``` [sub] Could not get connection to broker: topic is temporarily unavailable -- Will try again in 0.1 s ``` 先來看 broker/namespace/bundle/topic 這四者的關係。Bundle 是 Pulsar namespace 的一個分片機制,namespace 被分片為 bundle 列表,每個 bundle 包含 namespace 的整個雜湊範圍的一部分。Topic 不直接分配給 broker,而是通過計算 topic 的雜湊碼將 topic 分配給特定的 bundle;每個 bundle 互相獨立,再被分配到不同的 broker 上。 Angel PowerFL 早期的任務 topic 沒有複用,一個 LR 演算法訓練任務建立了 2000 多個 topic,每個 topic 生產的資料負載也不同,我們判斷上述斷連問題是由於短時間內(最小任務十分鐘內能結束,同時會有多個任務在執行)大量建立和使用 topic,導致負載不均衡,topic unloading 頻繁發生。為了降低 topic unloading 的頻率,我們調整了 Pulsar Bundle 的相關引數: ``` # 增加 broker 可最大分配 topic 數量 loadBalancerBrokerMaxTopics=500000 # 啟用自動拆分namespace bundle loadBalancerAutoBundleSplitEnabled=true # 增加觸發拆分 bundle 的 topic 數量 loadBalancerNamespaceBundleMaxTopics=10000 # 增加觸發拆分 bundle 的訊息數 loadBalancerNamespaceBundleMaxMsgRate=10000 ``` 同時,在建立 namespace 時,把 bundle 數量預設設定為 64。 ``` ./bin/pulsar-admin namespaces create fl-tenant/${namespace} --bundles 64 ``` 經過以上調整,Angel PowerFL 在任務執行期間沒有再出現過由於 topic unloading 導致的斷連。 ### **Pulsar on Kubernetes** Angel PowerFL 的所有服務均通過 Helm 部署在 Kubernetes 上。Pulsar 作為其中的一個 chart,可以很好的利用 K8S 的資源隔離、快速擴縮容等特性。在 Angel PowerFL 使用 Helm 部署 Pulsar 的實踐中,我們總結了以下經驗: **