1. 程式人生 > >Flink執行時之統一的資料交換物件

Flink執行時之統一的資料交換物件

統一的資料交換物件

在Flink的執行引擎中,流動的元素主要有兩種:緩衝(Buffer)和事件(Event)。Buffer主要針對使用者資料交換,而Event則用於一些特殊的控制標識。但在實現時,為了在通訊層統一資料交換,Flink提供了資料交換物件——BufferOrEvent。它是一個既可以表示Buffer又可以表示Event的類。上層使用者只需呼叫isBuffer和isEvent方法即可判斷當前收到的這條資料是Buffer還是Event。

緩衝

緩衝(Buffer)是資料交換的載體,幾乎所有的資料(當然事件是特殊的)交換都需要經過Buffer。Buffer底層依賴於Flink自管理記憶體的記憶體段(MemorySegment)作為資料的容器。Buffer在記憶體段上做了一層封裝,這一層封裝是為了對基於引用計數的Buffer回收機制提供支援。

引用計數是計算機程式語言中的一種記憶體管理技術,是指將資源(可以是物件、記憶體或磁碟)的被引用次數儲存起來,當被引用次數變為零時就將其釋放的過程。使用引用計數技術可以實現自動資源管理的目的。具體做法可簡述為:當建立一個物件的例項並在堆上申請記憶體時,物件的引用計數就為1,在其他物件中需要持有這個物件時,就需要把該物件的引用計數加1,需要釋放一個物件時,就將該物件的引用計數減1,直至物件的引用計數為0,物件的記憶體會被釋放。

引用計數還可以指使用引用計數技術回收未使用資源的垃圾回收演算法,Objective-C就是使用這種方式進行記憶體管理的典型語言之一。

它在內部維護著一個計數器referenceCount,初始值為1。記憶體回收由緩衝回收器(BufferRecycler)來完成,回收的物件就是記憶體段(MemorySegment)。

實現引用計數的方法有兩個。第一個為retain,用於將引用計數加一:

public Buffer retain() {   
    synchronized (recycleLock) {
        //預防性檢測,先確認記憶體段是否已被回收      
        ensureNotRecycled();      
        referenceCount++;      
        return this;   
    }
}

第二個為回收(或將引用計數減一)的方法recycle,當引用計數減為0時,BufferRecycler會對記憶體段進行回收:

public
void recycle() { synchronized (recycleLock) { if (--referenceCount == 0) { recycler.recycle(memorySegment); } } }

BufferRecycler介面有一個名為FreeingBufferRecycler的簡單實現者,它的做法是直接釋放記憶體段。當然通常為了分配和回收的效率,會對Buffer進行預先分配然後加入到Buffer池中。所以,BufferRecycler的常規實現是基於緩衝池的。除此之外,還有另一個介面BufferProvider(它約定了Buffer提供者如何以同步和非同步的模式提供Buffer)共同作為緩衝池(BufferPool)的基介面。

整個的Buffer簇的類圖如下:

buffer-package-diagram

緩衝池工廠(BufferPoolFactory)用於建立和銷燬緩衝池,網路緩衝池(NetworkBufferPool)是其唯一的實現者。NetworkBufferPool快取了固定數目的記憶體段,主要用於網路棧通訊。

NetworkBufferPool在構造器的引數中要求指定其快取的記憶體段數目,然後它會初始化固定大小的一個佇列作為記憶體段池。與此同時,構造器引數還允許指定記憶體段大小以及Flink自主管理的記憶體型別。並根據這些引數初始化佇列中的記憶體段:

if (memoryType == MemoryType.HEAP) {   
    for (int i = 0; i < numberOfSegmentsToAllocate; i++) {      
        byte[] memory = new byte[segmentSize];      
        availableMemorySegments.add(MemorySegmentFactory.wrapPooledHeapMemory(memory, null));   
    }
} else if (memoryType == MemoryType.OFF_HEAP) {   
    for (int i = 0; i < numberOfSegmentsToAllocate; i++) {      
        ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);      
        availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));   
    }
}

上面的程式碼段中呼叫了我們在分析記憶體管理時分析的記憶體段工廠(MemorySegmentFactory),注意這裡wrapPooledXXX方法其實沒什麼特殊的,只是新建了相關的記憶體段例項。不要被其方法名迷惑,所謂的池化的機制都是要在外部維護,比如這裡的NetworkBufferPool定義了維護記憶體段池(也即availableMemorySegments)的一系列方法,比如requestMemorySegment、recycle、destroy等。

因為BufferPool當前只有LocalBufferPool這一個實現,所以NetworkBufferPool在實現BufferPoolFactory的createBufferPool方法時會直接例項化LocalBufferPool。NetworkBufferPool用一個Set維護了其所建立的所有LocalBufferPool的引用。createBufferPool方法要求在建立時指定需要建立的是固定大小的BufferPool還是非固定大小的BufferPool。如果是非固定大小的,NetworkBufferPool也專門提供了一個Set來維護它們,這主要是為了在建立或銷燬BufferPool時對這些非固定大小的BufferPool裡的Buffer進行“重分佈”。

這裡對非固定大小的BufferPool裡的記憶體段進行重分佈值得我們重點關注一下。其實,所有BufferPool所申請的記憶體段都歸屬於NetworkBufferPool所維護的記憶體段池。只有NetworkBufferPool瞭解記憶體段池的所有資訊,包括剩餘可用的記憶體段數目。當createBufferPool方法或者destroyBufferPool方法被呼叫時,對應的可用的記憶體段數目也會相應得產生變化。這時,為了讓記憶體段被合理地分配並加以利用,所有非固定大小的BufferPool都需要根據最新的可用記憶體段數來重分佈其所包含的記憶體段數目。具體的重分佈的實現如下:

private void redistributeBuffers() throws IOException {
    //獲得非固定大小的BufferPool個數   
    int numManagedBufferPools = managedBufferPools.size();   
    //如果沒有,則直接返回,避免除零錯誤
    if (numManagedBufferPools == 0) {      
        return; 
    }   

    //當前總共可用的記憶體段數目(未實際分配)
    int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;   
    //每個BufferPool可額外附“贈”的記憶體段數目   
    int numExcessBuffersPerPool = numAvailableMemorySegment / numManagedBufferPools;   
    //當然可用的記憶體段不一定正好能完全分攤給所有的非固定大小的BufferPool,所以剩下的餘量以輪轉的方式分攤
    int numLeftoverBuffers = numAvailableMemorySegment % numManagedBufferPools;   
    int bufferPoolIndex = 0;   
    //遍歷BufferPool挨個擴充
    for (LocalBufferPool bufferPool : managedBufferPools) {      
        int leftoverBuffers = bufferPoolIndex++ < numLeftoverBuffers ? 1 : 0;      
        bufferPool.setNumBuffers(
            bufferPool.getNumberOfRequiredMemorySegments() + numExcessBuffersPerPool + leftoverBuffers
        );   
    }
}

該方法的呼叫環境必須處於同步塊中。

LocalBufferPool的setNumBuffers方法並不只是設定一下數目這麼簡單,具體的邏輯我們暫且按下不表。我們先來看一下LocalBufferPool的實現,它用於管理從NetworkBufferPool申請到的一組Buffer例項。LocalBufferPool中維護著的一些資訊:

//當前緩衝區池最少需要的記憶體段的數目
private final int numberOfRequiredMemorySegments;

//當前可用的記憶體段,這些記憶體段已從網路緩衝池中請求到本地,但當前沒有被當做緩衝區用於資料傳輸
private final Queue<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();

//註冊過的獲取Buffer可用性的偵聽器,當無Buffer可用時,才可註冊偵聽器
private final Queue<EventListener<Buffer>> registeredListeners = new ArrayDeque<EventListener<Buffer>>();

//緩衝池的當前大小
private int currentPoolSize;

//從網路緩衝池請求的以及以某種形式關聯著的所有的記憶體段數目
private int numberOfRequestedMemorySegments;

LocalBufferPool被例項化時,雖然指定了其所需要的記憶體段的最小數目,但是NetworkBufferPool並沒有將這些記憶體段例項分配給它,也就是說不是預先靜態分配的,而是呼叫方呼叫requestBuffer方法(來自BufferProvider介面),在內部觸發對NetworkBufferPool的例項方法requestMemorySegment的呼叫進而獲取到記憶體段。我們來看一下requestBuffer方法的實現:

private Buffer requestBuffer(boolean isBlocking) throws InterruptedException, IOException {   
    synchronized (availableMemorySegments) {
        //在請求之前,可能需要先返還多餘的,也就是超出currentPoolSize的記憶體段給NetworkBufferPool
        returnExcessMemorySegments();      
        boolean askToRecycle = owner != null;

        //當可用記憶體段佇列為空時,說明已沒有空閒的記憶體段,則可能需要從NetworkBufferPool獲取      
        while (availableMemorySegments.isEmpty()) {         
            if (isDestroyed) {            
                throw new IllegalStateException("Buffer pool is destroyed.");         
            }         
            //獲取的條件是:所請求的總記憶體段的數目小於當前池大小
            if (numberOfRequestedMemorySegments < currentPoolSize) {            
                //請求一個記憶體段
                final MemorySegment segment = networkBufferPool.requestMemorySegment();            
                if (segment != null) {               
                    //所請求的記憶體段總數目加一,並將請求的記憶體段加入到可用記憶體段佇列中,然後跳出本輪while迴圈
                    //注意這裡是continue而不是break,這裡還必須繼續判斷佇列中是否有元素可用,
                    //因為當前物件可能處於分散式的場景下
                    numberOfRequestedMemorySegments++;               
                    availableMemorySegments.add(segment);               
                    continue;            
                }         
            }

            //如果總記憶體段的數目已大於等於本地緩衝池大小,判斷是否需要釋放,如果需要,讓緩衝區池歸屬者釋放一個記憶體段         
            if (askToRecycle) {            
                owner.releaseMemory(1);         
            }
            //如果是阻塞式的請求模式,則對當前佇列阻塞等待兩秒鐘,接著仍然繼續while迴圈         
            if (isBlocking) {            
                availableMemorySegments.wait(2000);         
            } else {            
                //否則,直接返回空並退出迴圈
                return null;         
            }      
        }

        //當有可用記憶體段時,直接從佇列中獲取記憶體段並新建一個Buffer例項      
        return new Buffer(availableMemorySegments.poll(), this);   
    }
}

在上文我們介紹過,在NetworkBufferPool中建立或者銷燬BufferPool時,所有非固定大小的BufferPool會被重分佈。在分析其實現是,我們看到了它會呼叫LocalBufferPool的例項方法setNumBuffers,該方法會調整本地緩衝池的大小,並可能會對其所申請的記憶體段數目產生影響:

public void setNumBuffers(int numBuffers) throws IOException {   
    synchronized (availableMemorySegments) {
        //重分佈後新的Buffer數量不得小於最小要求的記憶體段數量      
        checkArgument(numBuffers >= numberOfRequiredMemorySegments, 
            "Buffer pool needs at least " + numberOfRequiredMemorySegments + 
            " buffers, but tried to set to " + numBuffers + ".");      
        //修改緩衝池容量
        currentPoolSize = numBuffers;      
        //如果當前保有的記憶體段數目大於新的緩衝池容量,則將超出部分歸還
        //注意這裡歸還並不是精確強制歸還的,當本地緩衝池中沒有多餘的記憶體段時,歸還動作將會終止
        returnExcessMemorySegments();      
        //這是第二重保險,如果Buffer存在歸屬者且此時本地緩衝區池中保有的記憶體段仍然大於緩衝池容量
        //則會對多餘的記憶體段進行釋放
        if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {         
            owner.releaseMemory(numberOfRequestedMemorySegments - numBuffers);      
        }   
    }
}

Buffer是資料交換的載體,在所有涉及到資料交換的地方都會用到它。因此理解其相關的實現對於,理解Flink的整個資料流交換體系非常有幫助。

事件

Flink的資料流中不僅僅只有使用者的資料,還包含了一些特殊的事件,這些事件都是由運算元注入到資料流中的。它們在每個流分割槽裡伴隨著其他的資料元素而被有序地分發。接收到這些事件的運算元會對這些事件給出響應,典型的事件型別有:

  • 檢查點屏障:用於隔離多個檢查點之間的資料,保障快照資料的一致性;
  • 迭代屏障:標識流分割槽已到達了一個超級步的結尾;
  • 子分割槽資料結束標記:當消費任務獲取到該事件時,表示其所消費的對應的分割槽中的資料已被全部消費完成;

事件假設一個流分割槽維持著元素順序。鑑於此,在Flink中一元運算元在消費單一流分割槽時,能夠保證FIFO(先進先出)的元素順序。而為了保證流處理的速率同時避免反壓,運算元有時會接收超過一個流分割槽的元素並將它們合併。綜合各種場景,Flink中的資料流在任何形式的重分割槽或廣播之後不提供順序保證。而對無序元素的處理任務交給運算元自行實現。

在Flink中所有事件的最終基類都是AbstractEvent。AbstractEvent這一抽象類又派生出另一個抽象類RuntimeEvent,幾乎所有預先內建的事件都直接派生於此。除了預定義的事件外,Flink還支援自定義的擴充套件事件,所有自定義的事件都繼承自派生於AbstractEvent的TaskEvent。總結一下,其類繼承關係圖如下:

Event-class-diagram

上圖中繼承自RuntimeEvent的三個事件類就是上文列舉的典型事件。其中只有CheckpointBarrier包含檢查點編號和時間戳這兩個屬性,其他兩個事件類主要起到標識作用。

微信掃碼關注公眾號:Apache_Flink

apache_flink_weichat

QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)

qrcode_for_apache_flink_qq_group

相關推薦

Flink執行統一資料交換物件

統一的資料交換物件 在Flink的執行引擎中,流動的元素主要有兩種:緩衝(Buffer)和事件(Event)。Buffer主要針對使用者資料交換,而Event則用於一些特殊的控制標識。但在實現時,為了在通訊層統一資料交換,Flink提供了資料交換物件——Buf

Flink執行基於Netty的網路通訊中

PartitionRequestClient 分割槽請求客戶端(PartitionRequestClient)用於發起遠端PartitionRequest請求,它也是RemoteChannel跟Netty通訊層之間進行銜接的物件。 對單一的TaskMan

Flink執行生成作業圖

生成作業圖 在分析完了流處理程式生成的流圖(StreamGraph)以及批處理程式生成的優化後的計劃(OptimizedPlan)之後,下一步就是生成它們面向Flink執行時執行引擎的共同抽象——作業圖(JobGraph)。 什麼是作業圖 作業圖(

Flink執行基於Netty的網路通訊(下)

客戶端核心處理器 這一篇,我們分析一下客戶端協議棧中的核心的處理器PartitionRequestClientHandler,該處理器用於處理服務端的響應訊息。 我們以客戶端獲取到響應之後回撥該處理器的channelRead方法為入口來進行分析:

ajaxjson資料交換格式學習

日期:2018—10—4 今日學習將Javabean物件轉換成json資料物件 1、學習ajax資料交換 2、學習如何將Javabean物件轉換成json資料物件 目前,學習了ajax響應的資料格式有3種(字串,xml,json),今晚學習ajax中最受

Objective-C Runtime 執行五:協議與分類

Objective-C中的分類允許我們通過給一個類新增方法來擴充它(但是通過category不能新增新的例項變數),並且我們不需要訪問類中的程式碼就可以做到。 Objective-C中的協議是普遍存在的介面定義方式,即在一個類中通過@protocol定義介面,在另外

Objective-C Runtime 執行六:拾遺

前面幾篇基本介紹了runtime中的大部分功能,包括對類與物件、成員變數與屬性、方法與訊息、分類與協議的處理。runtime大部分的功能都是圍繞這幾點來實現的。 本章的內容並不算重點,主要針對前文中對Objective-C Runtime Reference內容遺漏

Objective-C Runtime 執行之一:類與物件

Objective-C語言是一門動態語言,它將很多靜態語言在編譯和連結時期做的事放到了執行時來處理。這種動態語言的優勢在於:我們寫程式碼時更具靈活性,如我們可以把訊息轉發給我們想要的物件,或者隨意交換一個方法的實現等。 這種特性意味著Objective-C不僅需要一

Objective-C Runtime 執行二:成員變數與屬性

在前面一篇文章中,我們介紹了Runtime中與類和物件相關的內容,從這章開始,我們將討論類實現細節相關的內容,主要包括類中成員變數,屬性,方法,協議與分類的實現。 本章的主要內容將聚集在Runtime對成員變數與屬性的處理。在討論之前,我們先介紹一個重要的概念:型別

Objective-C Runtime 執行三:方法與訊息

前面我們討論了Runtime中對類和物件的處理,及對成員變數與屬性的處理。這一章,我們就要開始討論Runtime中最有意思的一部分:訊息處理機制。我們將詳細討論訊息的傳送及訊息的轉發。不過在討論訊息之前,我們先來了解一下與方法相關的一些內容。 基礎資料型別 SEL

Objective-C Runtime 執行四:Method Swizzling

理解Method Swizzling是學習runtime機制的一個很好的機會。在此不多做整理,僅翻譯由Mattt Thompson發表於nshipster的Method Swizzling一文。 Method Swizzling是改變一個selector的實際實現的

[ObjectC]Runtime執行三:方法與訊息

這一章,我們就要開始討論Runtime中最有意思的一部分:訊息處理機制。我們將詳細討論訊息的傳送及訊息的轉發。 基礎資料型別 SEL SEL又叫選擇器,是表示一個方法的selector的指標,其定義如下:typedef struct objc_selector *SEL;o

iOS Runtime 執行三:訊息處理機制

前面我們討論了Runtime中對類和物件的處理,及對成員變數與屬性的處理。這一章,我們就要開始討論Runtime中最有意思的一部分:訊息處理機制。我們將詳細討論訊息的傳送及訊息的轉發。不過在討論訊息之前,我們先來了解一下與方法相關的一些內容。 基礎資料型別

趣談iOS執行方法呼叫原理

導語 訊息轉發 OC的動態語言特性 1動態型別 2動態繫結 3動態載入 導語 一個成熟的計算機語言必然有豐富的體系,複雜的容錯機制,處理邏輯以及判斷邏輯。但這些複雜的邏輯都是圍繞一個主線豐富和展開的,所以在學習計算機語言的時候,先掌握核心

iOS學習筆記56(Runtime)-Objective-C Runtime 執行三:方法與訊息

前面我們討論了Runtime中對類和物件的處理,及對成員變數與屬性的處理。這一章,我們就要開始討論Runtime中最有意思的一部分:訊息處理機制。我們將詳細討論訊息的傳送及訊息的轉發。不過在討論訊息之前,我們先來了解一下與方法相關的一些內容。 基礎資料型別 SEL

統一資料交換平臺(服務匯流排)的三大特點

服務匯流排特點比較明顯,主要以下3點: 1.訊息管理:訊息訂閱與釋出(支援對等、非對等資料交換,跨平臺、不同資料庫、系統介面規範);可靠性(支援斷點續傳)。 2.流程管理:跨流程(節點),流程的自動監控。 3.叢集服務:資料平臺。多個服務實現統一發布。

iOS執行的用途一 -- 交換方法

前言 執行時的的交換方法也叫黑魔法,在許多的第三方框架都使用了,例如AFN等 步驟 一、獲得兩個需要交換的方法 Method aMethod = class_getClassMeth

JVM執行資料區域

1. 程式計數器 程式計數器(Program Counter Register)是一塊較小的記憶體空間,它可以看做是當前執行緒所執行的位元組碼的行號指示器。在虛擬機器的概念模型裡(僅是概念模型,各種虛擬機器可能會通過一些更高效的方式去實現),位元組碼直譯器工作時就是通

JAVA虛擬機器結構執行資料

jvm的執行時資料區根據用途一共可以分為這幾類:pc寄存機,java虛擬機器棧,java堆,方法區,執行時常量池,本地方法棧。其中java堆,方法區,執行時常量是公有的資料區,隨著虛擬機器的啟動而建立,隨著虛擬的退出而銷燬。而pc暫存器,java虛擬機器棧,本地方法棧則是執行緒私有的

JVM執行資料區域詳解

1、程式計數器                 程式計數器是一塊較小的記憶體空間,它可以看作是當前執行緒所執行的位元組碼的行號指示器。虛擬機器工作時就是通過改變改變計數器的值來選取下一條需要執行的位元組碼指