Spark2.2 (38): Investigating the high memory consumption of agg and dropDuplicates in Spark Structured Streaming versions before 2.4 (Memory issue with spark structured streaming)

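The article "Memory usage of state in Spark Structured Streaming" explains how Spark allocates memory for state and mentions the impact of HDFSBackedStateStoreProvider keeping multiple versions of state. On Stack Overflow, others have also run into memory problems with Structured Streaming and analyzed them ("Memory issue with spark structured streaming"). In addition, the following fixes can be found in Spark's official issue tracker: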

1) Split out min retain version of state for memory in HDFSBackedStateStoreProvider

Problem description:

HDFSBackedStateStoreProvider has only one configuration for the minimum number of state versions to retain, and it applies to both the memory cache and the files. The default value of "spark.sql.streaming.minBatchesToRetain" is high (100). That doesn't strictly require 100x memory, but I'm seeing 10x ~ 80x memory consumption across various workloads. In some cases even 2x memory is unacceptable, so we should split out a separate configuration for memory and let users trade off memory usage against cache misses.

In the normal case, the default value '2' covers both the success case and restoring after a failure, with memory usage of less than or around 2x; '1' covers only the success case but no longer requires more than 1x memory. In the extreme case, users can set the value to '0' to disable the map cache completely and maximize executor memory.
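The trade-off described above can be sketched with a toy model. This is not Spark code: `ToyVersionedStateCache`, `run_batches` and their parameters are hypothetical. Each committed version is modeled as a full key-to-value map, and a dict stands in for the delta/snapshot files. In Spark 2.4 the separate memory setting introduced by this change is `spark.sql.streaming.maxBatchesToRetainInMemory` (default 2); the `max_versions_in_memory` parameter below plays that role in the toy.

```python
from collections import OrderedDict


class ToyVersionedStateCache:
    """Hypothetical model of one state store partition: every committed
    version is a full key -> value map, cached in memory for at most
    `max_versions_in_memory` versions and always persisted to "files"."""

    def __init__(self, max_versions_in_memory):
        self.max_versions = max_versions_in_memory
        self.cache = OrderedDict()  # version -> map; versions arrive in order, oldest first
        self.files = {0: {}}        # stands in for delta/snapshot files (version 0 is empty)
        self.cache_misses = 0
        self._put_in_cache(0, {})

    def load(self, version):
        """Return the map for `version`, counting a miss if it must come from files."""
        if version in self.cache:
            return self.cache[version]
        self.cache_misses += 1
        return self.files[version]

    def commit(self, base_version, updates):
        """Apply one batch of updates on top of `base_version`; return the new version."""
        new_map = dict(self.load(base_version))  # copy the base version's map
        new_map.update(updates)
        new_version = base_version + 1
        self.files[new_version] = new_map
        self._put_in_cache(new_version, new_map)
        return new_version

    def _put_in_cache(self, version, state_map):
        if self.max_versions <= 0:
            self.cache.clear()  # 0 disables the in-memory cache entirely
            return
        self.cache[version] = state_map
        self.cache.move_to_end(version)
        while len(self.cache) > self.max_versions:
            self.cache.popitem(last=False)  # evict the earliest-cached version

    def entries_in_memory(self):
        return sum(len(m) for m in self.cache.values())


def run_batches(max_versions, batches=10, keys=1000):
    store = ToyVersionedStateCache(max_versions)
    version = 0
    for batch in range(batches):
        # every batch updates all keys, so each version's map holds `keys` entries
        version = store.commit(version, {k: batch for k in range(keys)})
    return store, version


for n in (100, 2, 1, 0):
    store, latest = run_batches(n)
    misses_before_retry = store.cache_misses
    store.load(latest - 1)  # re-running the latest batch needs the version before it
    print(f"retain={n}: entries in memory={store.entries_in_memory()}, "
          f"misses during run={misses_before_retry}, "
          f"retry hit cache={store.cache_misses == misses_before_retry}")
```

After ten batches, retaining 100 versions keeps ten full copies of the state (10000 entries), while '2' keeps two and still serves a retry of the latest batch from memory; '1' serves the normal path but misses on the retry, and '0' reads every version back from files. The toy ignores any sharing of unchanged rows between versions, so it shows the trend rather than real byte counts.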

Fix:

The official fix: "[SPARK-24717][SS] Split out max retain version of state for memory in HDFSBackedStateStoreProvider #21700", and the issue: "Split out min retain version of state for memory in HDFSBackedStateStoreProvider".

Related reading:

Spark Structured Streaming source code analysis (3): Aggregation state storage and updates

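That article describes the directory layout HDFSBackedStateStoreProvider uses to store state and includes the author's figure of that layout. The articles form a series that is worth reading in full.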
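To connect the directory layout with the cost of a cache miss, here is a small Python sketch. It assumes the Spark 2.x layout `<checkpoint>/state/<operatorId>/<partitionId>/<version>.delta` (plus `<version>.snapshot`); stream-stream joins add a further store-name level, which is ignored here. The helper names are hypothetical. When a version is not cached in memory, the provider walks back to the newest cached map or snapshot at or before that version and replays the delta files after it.

```python
def state_file_path(checkpoint, operator_id, partition_id, version, snapshot=False):
    """Hypothetical helper: path of one state file under the assumed layout."""
    suffix = "snapshot" if snapshot else "delta"
    # plain string join so URIs such as hdfs://nn/... keep their double slash
    return f"{checkpoint.rstrip('/')}/state/{operator_id}/{partition_id}/{version}.{suffix}"


def files_to_rebuild(checkpoint, operator_id, partition_id, version,
                     snapshot_versions, cached_versions=frozenset()):
    """Hypothetical helper: files read to rebuild `version` on a cache miss,
    walking back to the newest cached map or snapshot (version 0 = empty map)."""
    base = version
    while base > 0 and base not in cached_versions and base not in snapshot_versions:
        base -= 1
    files = []
    if base > 0 and base not in cached_versions:
        files.append(state_file_path(checkpoint, operator_id, partition_id, base, snapshot=True))
    # replay every delta after the starting point, up to the requested version
    files += [state_file_path(checkpoint, operator_id, partition_id, v)
              for v in range(base + 1, version + 1)]
    return files


for path in files_to_rebuild("hdfs://nn/ckpt", 0, 3, 12, snapshot_versions={10}):
    print(path)
```

Keeping a version in memory turns such a rebuild into a lookup; that is the memory-versus-cache-miss trade-off the issue above asks users to control.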

2) Remove redundant key data from value in streaming aggregation

Problem description:

Key/Value of state in streaming aggregation is formatted as below:

  • key: UnsafeRow containing group-by fields
  • value: UnsafeRow containing key fields and another fields for aggregation results

so the key data is stored in both the key and the value.

This avoids projecting the row into a value when storing, and joining key and value to restore the original row when reading, in order to boost performance. However, in a simple benchmark test I found it did not help much compared with "project and join". (I will paste the test results in a comment.)

So I propose a new option: remove the redundant key data in stateful aggregation. I'm avoiding modifying the default behavior of stateful aggregation, because state values would not be compatible between the current format and the format with the option enabled.
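The two layouts can be compared with a small Python sketch that models an UnsafeRow as a plain tuple. It is a toy, not Spark's implementation: `KEY_FIELDS`, the helper names and the sample rows are hypothetical. `split_v1` keeps the whole row as the value (the layout described above), while `split_v2` keeps only the aggregation fields and restores the row by joining key and value. In Spark 2.4 the merged change is selected by `spark.sql.streaming.aggregation.stateFormatVersion` (1 for the old layout, 2 for the new one).

```python
KEY_FIELDS = 2  # hypothetical: the first two fields are the group-by keys


def split_v1(row):
    """Old layout: key = group-by fields, value = the entire row (keys repeated)."""
    return row[:KEY_FIELDS], row


def restore_v1(key, value):
    return value  # the value already holds every field


def split_v2(row):
    """'Project and join' layout: the value keeps only the aggregation fields."""
    return row[:KEY_FIELDS], row[KEY_FIELDS:]


def restore_v2(key, value):
    return key + value  # join key and value to rebuild the original row


def stored_fields(split, rows):
    """Total number of field values written to the state store for `rows`."""
    return sum(len(key) + len(value) for key, value in map(split, rows))


# (region, day) are the group-by keys; (count, avg) are aggregation results
rows = [("us", "2018-07-01", 42, 3.5), ("eu", "2018-07-01", 7, 1.0)]

print(stored_fields(split_v1, rows))  # 12: 2 key + 4 value fields per row
print(stored_fields(split_v2, rows))  # 8: 2 key + 2 value fields per row

# Each layout round-trips on its own, but stored values are not interchangeable:
key, old_value = split_v1(rows[0])
print(restore_v2(key, old_value) == rows[0])  # False: old state misread by the new layout
```

The saving grows with the number and width of the group-by fields, and the last line illustrates why the proposal avoided changing the default behavior: a value written in one layout is misread by the other.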

Fix:

The official fix: "[SPARK-24763][SS] Remove redundant key data from value in streaming aggregation #21733", and the issue: "Remove redundant key data from value in streaming aggregation".