對Spark的那些【魔改】
原文連結:https://blog.csdn.net/D55dffdh/article/details/82423831
前言
這兩年做 streamingpro 時,不可避免的需要對Spark做大量的增強。就如同我之前吐槽的,Spark大量使用了new進行物件的建立,導致裡面的實現基本沒有辦法進行替換。
比如SparkEnv裡有個屬性叫closureSerializer,是專門做任務的序列化反序列化的,當然也負責對函式閉包的序列化反序列化。我們看看內部是怎麼實現的:
這裡直接new了一個JavaSerializer,並不能做配置。如果不改原始碼,你沒有任何辦法可以替換掉掉這個實現。同理,如果我想替換掉Executor的實現,基本也是不可能的。
今年有兩個大地方涉及到了對Spark的【魔改】,也就是不通過改原始碼,使用原有髮型包,通過新增新程式碼的方式來對Spark進行增強。
二層RPC的支援
我們知道,在Spark裡,我們只能通過Task才能touch到Executor。現有的API你是沒辦法直接操作到所有或者指定部分的Executor。比如,我希望所有Executor都載入一個資原始檔,現在是沒辦法做到的。為了能夠對Executor進行直接的操作,那就需要建立一個新的通訊層。那具體怎麼做呢?
首先,在Driver端建立一個Backend,這個比較簡單,
這樣,你可以理解為在Driver端啟動了一個PRC Server。要執行這段程式碼也非常簡單,直接在主程式裡執行即可:
這裡我們需要實現local模式和cluster模式兩種。
Driver啟動了一個PRC Server,那麼Executor端如何啟動呢?Executor端似乎沒有任何一個地方可以讓我啟動一個PRC Server? 其實有的,只是非常trick,我們知道Spark是允許自定義Metrics的,並且會呼叫使用者實現的metric特定的方法,我們只要開發一個metric Sink,在裡面啟動RPC Server,騙過Spark即可。具體時下如下:
到這裡,我們就能成功啟動RPC Server,並且連線上Driver中的PRC Server。現在,你就可以在不修改Spark 原始碼的情況下,盡情的寫通訊相關的程式碼了,讓你可以更好的控制Executor。
比如在PSExecutorBackend 實現如下程式碼:
接著你就可以在Spark裡寫如下的程式碼呼叫了:
是不是很酷。
修改閉包的序列化方式
Spark的任務排程開銷非常大。對於一個複雜的任務,業務邏輯程式碼執行時間大約是3-7ms,但是整個spark執行的開銷大概是1.3s左右。
經過詳細dig發現,sparkContext裡RDD轉化時,會對函式進行clean操作,clean操作的過程中,預設會檢查是不是能序列化(就是序列化一遍,沒丟擲異常就算可以序列化)。而序列化成本相當高(預設使用的JavaSerializer並且對於函式和任務序列化,是不可更改的),單次序列化耗時就達到200ms左右,在local模式下對其進行優化,可以減少600ms左右的請求時間。
當然,需要申明的是,這個是針對local模式進行修改的。那具體怎麼做的呢?
我們先看看Spark是怎麼呼叫序列化函式的,首先在SparkContext裡,clean函式是這樣的:
呼叫的是ClosureCleaner.clean方法,該方法裡是這麼呼叫學序列化的:
SparkEnv是在SparkContext初始化的時候建立的,該物件裡面包含了closureSerializer,該物件通過new JavaSerializer建立。既然序列化太慢,又因為我們其實是在Local模式下,本身是可以不需要序列化的,所以我們這裡想辦法把closureSerializer的實現替換掉。正如我們前面吐槽,因為在Spark程式碼裡寫死了,沒有暴露任何自定義的可能性,所以我們又要魔改一下了。
首先,我們新建一個SparkEnv的子類:
接著實現一個自定義的Serializer:
接著我們需要再封裝一個LocalNonOpSerializer,
現在,萬事俱備,只欠東風了,我們怎麼才能把這些程式碼讓Spark執行起來。具體做法非常魔幻,實現一個enhance類:
完工。
其實還有很多
比如在Spark裡,Python Worker預設一分鐘沒有被使用是會被殺死的,但是在StreamingPro裡,這些python worker因為都要載入模型,所以啟動成本是非常高的,殺了之後再啟動就沒辦法忍受了,通過類似的方式進行魔改,從而使得空閒時間是可配置的。如果大家感興趣,可以翻看StreamingPro相關程式碼。