1. 程式人生 > >生產者-消費者模型在Hudi中的應用

生產者-消費者模型在Hudi中的應用

介紹

生產者-消費者模型用於解耦生產者與消費者,平衡兩者之間的能力不平衡,該模型廣泛應用於各個系統中,Hudi也使用了該模型控制對記錄的處理,即記錄會被生產者生產至佇列中,然後由消費者從佇列中消費,更具體一點,對於更新操作,生產者會將檔案中老的記錄放入佇列中等待消費者消費,消費後交由HoodieMergeHandle處理;對於插入操作,生產者會將新記錄放入佇列中等待消費者消費,消費後交由HandleCreateHandle處理。

入口

前面的文章中提到過無論是HoodieCopyOnWriteTable#handleUpdate處理更新時直接生成了一個SparkBoundedInMemoryExecutor

物件,還是HoodieCopyOnWriteTable#handleInsert處理插入時生成了一個CopyOnWriteLazyInsertIterable物件,再迭代時呼叫該物件的CopyOnWriteLazyInsertIterable#computeNext方法生成SparkBoundedInMemoryExecutor物件。最後兩者均會呼叫SparkBoundedInMemoryExecutor#execute開始記錄的處理,該方法核心程式碼如下

  public E execute() {
    try {
      ExecutorCompletionService<Boolean> producerService = startProducers();
      Future<E> future = startConsumer();
      // Wait for consumer to be done
      return future.get();
    } catch (Exception e) {
      throw new HoodieException(e);
    }
  }

該方法會啟動所有生產者和單個消費者進行處理。

Hudi定義了BoundedInMemoryQueueProducer介面表示生產者,其子類實現如下

  • FunctionBasedQueueProducer,基於Function來生產記錄,在合併日誌log檔案和資料parquet檔案時使用,以便提供RealTimeView
  • IteratorBasedQueueProducer,基於迭代器來生產記錄,在插入更新時使用。

定義了BoundedInMemoryQueueConsumer類表示消費者,其主要子類實現如下

  • CopyOnWriteLazyInsertIterable$CopyOnWriteInsertHandler,主要處理CopyOnWrite
    表型別時的插入。
    • MergeOnReadLazyInsertIterable$MergeOnReadInsertHandler,主要處理MergeOnRead

表型別時的插入,其為CopyOnWriteInsertHandler的子類。

  • CopyOnWriteLazyInsertIterable$UpdateHandler,主要處理CopyOnWrite表型別時的更新。

整個生產消費相關的類繼承結構非常清晰。

對於生產者的啟動,startProducers方法核心程式碼如下

  public ExecutorCompletionService<Boolean> startProducers() {
    // Latch to control when and which producer thread will close the queue
    final CountDownLatch latch = new CountDownLatch(producers.size());
    final ExecutorCompletionService<Boolean> completionService =
        new ExecutorCompletionService<Boolean>(executorService);
    producers.stream().map(producer -> {
      return completionService.submit(() -> {
        try {
          preExecute();
          producer.produce(queue);
        } catch (Exception e) {
          logger.error("error producing records", e);
          queue.markAsFailed(e);
          throw e;
        } finally {
          synchronized (latch) {
            latch.countDown();
            if (latch.getCount() == 0) {
              // Mark production as done so that consumer will be able to exit
              queue.close();
            }
          }
        }
        return true;
      });
    }).collect(Collectors.toList());
    return completionService;
  }

該方法使用CountDownLatch來協調生產者執行緒與消費者執行緒的退出動作,然後呼叫produce方法開始生產,對於插入更新時的IteratorBasedQueueProducer而言,其核心程式碼如下

  public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
    ...
    while (inputIterator.hasNext()) {
      queue.insertRecord(inputIterator.next());
    }
    ...
  }

可以看到只要迭代器還有記錄(可能為插入時的新記錄或者更新時的舊記錄),就會往佇列中不斷寫入。

對於消費者的啟動,startConsumer方法的核心程式碼如下

  private Future<E> startConsumer() {
    return consumer.map(consumer -> {
      return executorService.submit(() -> {
        ...
        preExecute();
        try {
          E result = consumer.consume(queue);
          return result;
        } catch (Exception e) {
          queue.markAsFailed(e);
          throw e;
        }
      });
    }).orElse(CompletableFuture.completedFuture(null));
  }

消費時會先進行執行前的準備,然後開始消費,其中consume方法的核心程式碼如下

  public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
    Iterator<I> iterator = queue.iterator();

    while (iterator.hasNext()) {
      consumeOneRecord(iterator.next());
    }

    // Notifies done
    finish();

    return getResult();
  }

可以看到只要佇列中還有記錄,就可以獲取該記錄,然後呼叫不同BoundedInMemoryQueueConsumer子類的consumeOneRecord進行更新插入處理。

值得一提的是Hudi對佇列進行了流控,生產者不能無限制地將記錄寫入佇列中,佇列快取的大小由使用者配置,佇列能放入記錄的條數由取樣的記錄大小和佇列快取大小控制。

在生產時,會呼叫BoundedInMemoryQueue#insertRecord將記錄寫入佇列,其核心程式碼如下

  public void insertRecord(I t) throws Exception {
    ...
    rateLimiter.acquire();
    // We are retrieving insert value in the record queueing thread to offload computation
    // around schema validation
    // and record creation to it.
    final O payload = transformFunction.apply(t);
    adjustBufferSizeIfNeeded(payload);
    queue.put(Option.of(payload));
  }

首先獲取一個許可(Semaphore),未成功獲取會被阻塞直至成功獲取,然後獲取記錄的負載以便調整佇列,然後放入內部佇列(LinkedBlockingQueue)中,其中adjustBufferSizeIfNeeded方法的核心程式碼如下

  private void adjustBufferSizeIfNeeded(final O payload) throws InterruptedException {
    if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) {
      return;
    }

    final long recordSizeInBytes = payloadSizeEstimator.sizeEstimate(payload);
    final long newAvgRecordSizeInBytes =
        Math.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1));
    final int newRateLimit =
        (int) Math.min(RECORD_CACHING_LIMIT, Math.max(1, this.memoryLimit / newAvgRecordSizeInBytes));

    // If there is any change in number of records to cache then we will either release (if it increased) or acquire
    // (if it decreased) to adjust rate limiting to newly computed value.
    if (newRateLimit > currentRateLimit) {
      rateLimiter.release(newRateLimit - currentRateLimit);
    } else if (newRateLimit < currentRateLimit) {
      rateLimiter.acquire(currentRateLimit - newRateLimit);
    }
    currentRateLimit = newRateLimit;
    avgRecordSizeInBytes = newAvgRecordSizeInBytes;
    numSamples++;
  }

首先看是否已經達到取樣頻率,然後計算新的記錄平均大小和限流速率,如果新的限流速率大於當前速率,則可釋放一些許可(供阻塞的生產者獲取後繼續生產),否則需要獲取(回收)一些許可(許可變少後生產速率自然就降低了)。該操作可根據取樣的記錄大小動態調節速率,不至於在記錄負載太大和記錄負載太小時,放入同等個數,從而起到動態調節作用。

在消費時,會呼叫BoundedInMemoryQueue#readNextRecord讀取記錄,其核心程式碼如下

  private Option<O> readNextRecord() {
    ...
    rateLimiter.release();
    Option<O> newRecord = Option.empty();
    while (expectMoreRecords()) {
      try {
        throwExceptionIfFailed();
        newRecord = queue.poll(RECORD_POLL_INTERVAL_SEC, TimeUnit.SECONDS);
        if (newRecord != null) {
          break;
        }
      } catch (InterruptedException e) {
        throw new HoodieException(e);
      }
    }
    ...

    if (newRecord != null && newRecord.isPresent()) {
      return newRecord;
    } else {
      // We are done reading all the records from internal iterator.
      this.isReadDone.set(true);
      return Option.empty();
    }
  }

可以看到首先會釋放一個許可,然後判斷是否還可以讀取記錄(還在生產或者停止生產但佇列不為空都可讀取),然後從內部佇列獲取記錄或返回。

上述便是生產者-消費者在Hudi中應用的分析。

總結

Hudi採用了生產者-消費者模型來控制記錄的處理,與傳統多生產者-多消費者模型不同的是,Hudi現在只支援多生產者-單消費者模型,單消費者意味著Hudi暫時不支援檔案的併發寫入。而對於生產消費的佇列的實現,Hudi並未僅僅只是基於LinkedBlockingQueue,而是採用了更精細化的速率控制,保證速率會隨著記錄負載大小的變化和配置的佇列快取大小而動態變化,這也降低了系統發生OOM的概率。

相關推薦

生產者-消費者模型Hudi應用

介紹 生產者-消費者模型用於解耦生產者與消費者,平衡兩者之間的能力不平衡,該模型廣泛應用於各個系統中,Hudi也使用了該模型控制對記錄的處理,即記錄會被生產者生產至佇列中,然後由消費者從佇列中消費,更具體一點,對於更新操作,生產者會將檔案中老的記錄放入佇列中等待消費者消費,消費後交由HoodieMergeHa

生產者消費者模型在專案的實際應用

前言   不知道大家有沒有遇到過這種情況,某個介面的響應時間會隨著請求量的變大而越來越慢,明明CPU已經馬力全開了怎麼還會越來越慢。又或者是客戶端呼叫http介面,對於客戶端來說只是一個入庫操作就可以直接返回結果繼續處理了,而你還要比較尷尬的拿著傳過來的資料做一堆比較耗時的操作,導致這個介面的整體吞吐量上

pygame應用——生產者消費者模型

init 註釋 def __name__ cap pac type 消費者 屏幕 因為操作系統的一個生產者-消費者拓展作業,以一個飛機大戰的模型修改來的 1 import pygame 2 import time 3 from pygame.locals im

Python生產者消費者模型

ssi png 問題 import odi rand com 共享數據 守護 了解知識點: 1、守護進程: ·什麽是守護進程: 守護進程其實就是一個‘子進程’,守護即伴隨,守護進程會伴隨主進程的代碼運行完畢後而死掉 ·為何用守護進程: 當該子進程內的代碼在父進程代碼運行完畢

分散式Web應用----基於Socket+動態代理實現簡單RPC 生產者消費者模型

寫在前面 前面一文主要簡單介紹了JAVA動態代理基本原理,這也是實現RPC的基本知識,這裡我們運用Socket簡單實現一個遠端過程呼叫,方便大家理解RPC的基本原理,希望對大家有所幫助。 新建People介面類與Man實現類 介面類 pu

多執行緒生產者消費者模型

為什麼要使用生產者和消費者模式 線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題

生產者消費者模型執行緒怎樣正常退出

生產者:不停地往佇列中放資料 消費者:不停地從佇列中拿資料 兩者通過兩個訊號量同步 當生產者不再生產資料時,消費者正好掛在一個訊號量上,處於睡眠狀態,這時候pthread_join也會一直掛著的。該

如何在 Java 正確使用 wait, notify 和 notifyAll – 以生產者消費者模型為例

wait, notify 和 notifyAll,這些在多執行緒中被經常用到的保留關鍵字,在實際開發的時候很多時候卻並沒有被大家重視。本文對這些關鍵字的使用進行了描述。 在 Java 中可以用 wait、notify 和 notifyAll 來實現執行緒間的通訊。。舉個例

進擊的Python【第九章】:paramiko模塊、線程與進程、各種線程鎖、queue隊列、生產者消費者模型

password locking form maxsize 廁所 sorted [0 hostname nbsp 一、paramiko模塊 他是什麽東西?   paramiko模塊是用python語言寫的一個模塊,遵循SSH2協議,支持以加密和認證的方式,進行遠程服務器的連

轉: 【Java並發編程】之十三:生產者消費者模型(含代碼)

tool boolean 通知 阻塞 上一個 [] ble 否則 線程 轉載請註明出處:http://blog.csdn.net/ns_code/article/details/17249321 生產者消費者問題是線程模型中的經典問題:生產者和消費者在同一時間段

綜合運用: C++11 多線程下生產者消費者模型詳解(轉)

並發 rep 生產 我會 交流 模型 操作 const ref 生產者消費者問題是多線程並發中一個非常經典的問題,相信學過操作系統課程的同學都清楚這個問題的根源。本文將就四種情況分析並介紹生產者和消費者問題,它們分別是:單生產者-單消費者模型,單生產者-多消費者模型,多生產

進程與生產者消費者模型

import lin sin master odin roc tip 進程 Coding #!/usr/src/python# -*- coding:utf-8 -*-# from multiprocessing import Process# import time###

#queue隊列 #生產者消費者模型

後進先出 item col con pri args lifo out done 1 #queue隊列 #生產者消費者模型 2 3 #queue隊列 #有順序的容器 4 #程序解耦 5 #提高運行效率 6 7 #clas

生產者消費者模型

世界 one 並發編程 而是 targe sta pre splay color 生產者消費者模型 在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。 為什麽要使用生產者和消費者

基於線程實現的生產者消費者模型(Object.wait(),Object.notify()方法)

生產者 nbsp 來看 spa res strong read 在哪裏 那一刻 需求背景 利用線程來模擬生產者和消費者模型 系統建模 這個系統涉及到三個角色,生產者,消費者,任務隊列,三個角色之間的關系非常簡單,生產者和消費者擁有一個任務隊列的引用,生產者負責往隊列中放置對

python學習——day9(ssh,線程和進程,信號量,隊列,生產者消費者模型) Alex地址:http://www.cnblogs.com/alex3714/articles/5230609.html

png 接口 count() day bound 共享 car 共享內存 top 一、python上模擬ssh 1.ssh,ssh_ftp pass 2.ssh 密鑰 pass 二、線程,進程 定義: 進程: 是對各種資源管理的集合,qq 要以一個整體的形式暴露給操

Queue 實現生產者消費者模型

pri join() imp 等待 tar 構造函數 nowait import 長度 Python中,隊列是線程間最常用的交換數據的形式。 Python Queue模塊有三種隊列及構造函數: 1、Python Queue模塊的FIFO隊列先進先出。 class Queue

Python生產者消費者模型

bsp 圖片 sch Coding print gpo con targe int 用多線程和隊列來實現生產者消費者模型 # -*- coding:utf-8 -*- __author__ = "MuT6 Sch01aR" import threading import

java並發之生產者消費者模型

isf tof on() acc sum sca span empty poll 生產者和消費者模型是操作系統中經典的同步問題。該問題最早由Dijkstra提出,用以

線程通信之生產者消費者模型

釋放 另一個 gpo 停止 product @override getname -s rup   線程通信,是指線程之間的消息傳遞。   多個線程在操作同一個資源時,它們對共享資源的操作動作可能不同;它們共享同一個資源,互為條件,相互依賴,相互通信,從而讓任務向前推進。