1. 程式人生 > 其它 >Caffeine 原始碼、架構、原理(史上最全,10W超級字長文)

Caffeine 原始碼、架構、原理(史上最全,10W超級字長文)

文章很長,而且持續更新,建議收藏起來,慢慢讀!瘋狂創客圈總目錄 部落格園版 為您奉上珍貴的學習資源 :

免費贈送 :《尼恩Java面試寶典》 持續更新+ 史上最全 + 面試必備 2000頁+ 面試必備 + 大廠必備 +漲薪必備
免費贈送 經典圖書:《Java高併發核心程式設計(卷1)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《Java高併發核心程式設計(卷2)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《Java高併發核心程式設計(卷3)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《尼恩Java面試寶典 V11》

面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 資源寶庫: Java 必備 百度網盤資源大合集 價值>10000元 加尼恩領取

本地快取的使用場景

場景1:突發性hotkey場景

突發性hotkey導致的分散式快取效能變差、快取擊穿的場景

什麼是熱Key

在某段時間內某個key收到的訪問次數,顯著高於其他key時,我們可以將其稱之為熱key。

例如,某redis的每秒訪問總量為10000,而其中某個key的每秒訪問量達到了7000,這種情況下,我們稱該key為熱key。

熱key帶來的問題

1.熱Key佔用大量的Redis CPU時間使其效能變差並影響其它請求;

2.Redis Cluster中各node流量不均衡,造成Redis Cluster的分散式優勢無法被Client利用,

一個分片負載很高,而其它分片十分空閒從而產生讀/寫熱點問題;

3.熱Key的請求壓力數量超出Redis的承受能力造成快取擊穿,此時大量強求將直接指向後端儲存將其打掛並影響到其它業務;

熱key出現的典型業務

預期外的訪問量抖增,如突然出現的爆款商品,訪問量暴漲的熱點新聞,直播間某大主播搞活動大量的刷屏點贊。

解決方案

通過分散式計算來探測熱點key

分散式計算元件,計算出來之後, 並通知叢集內其他機器。

其他機器, 本地快取HotKey,

場景2:常規性hotkey場景

部門組織機構資料

人員型別資料

解決方案

本地快取HotKey,

通過釋出訂閱解決資料一致性問題

本地快取的主要技術

Java快取技術可分為分散式快取和本地快取,分散式快取在後面的 100Wqps 三級快取元件中,再細緻介紹。

先看本地快取。

本地快取的代表技術主要有HashMap,Guava Cache,Caffeine和Encahche。

HashMap

通過Map的底層方式,直接將需要快取的物件放在記憶體中。

優點:簡單粗暴,不需要引入第三方包,比較適合一些比較簡單的場景。

缺點:沒有快取淘汰策略,定製化開發成本高。

Guava Cache

Guava Cache是由Google開源的基於LRU替換演算法的快取技術。

但Guava Cache由於被下面即將介紹的Caffeine全面超越而被取代。

優點:支援最大容量限制,兩種過期刪除策略(插入時間和訪問時間),支援簡單的統計功能。

缺點:springboot2和spring5都放棄了對Guava Cache的支援。

Caffeine

Caffeine採用了W-TinyLFU(LUR和LFU的優點結合)開源的快取技術。

快取效能接近理論最優,屬於是Guava Cache的增強版。

Encache

Ehcache是一個純java的程序內快取框架,具有快速、精幹的特點。

是hibernate預設的cacheprovider。

優點:

支援多種快取淘汰演算法,包括LFU,LRU和FIFO;

快取支援堆內快取,堆外快取和磁碟快取;

支援多種叢集方案,解決資料共享問題。

說明:本文會分析 Caffeine的原始碼,後面可以對照分析一下 Guava Cache的原始碼。

本地快取的優缺點

1. 快但是量少:訪問速度快,但無法進行大資料儲存

本地快取相對於分散式快取的好處是,由於資料不需要跨網路傳輸,故效能更好,

但是由於佔用了應用程序的記憶體空間,如 Java 程序的 JVM 記憶體空間,故不能進行大資料量的資料儲存。

2. 需要解決資料一致性問題:叢集的資料更新問題

與此同時,本地快取只支援被該應用程序訪問,一般無法被其他應用程序訪問,故在應用程序的叢集部署當中,

如果對應的資料庫資料,存在資料更新,則需要同步更新不同部署節點的本地快取的資料來包保證資料一致性,

複雜度較高並且容易出錯,如基於 rocketmq 的釋出訂閱機制來同步更新各個部署節點。

3.更新低可靠,容易丟失: 資料隨應用程序的重啟而丟失

由於本地快取的資料是儲存在應用程序的記憶體空間的,所以當應用程序重啟時,本地快取的資料會丟失。

所以對於需要更改然後持久化的資料,需要注意及時儲存,否則可能會造成資料丟失。

說明:本文會持續更新,最新PDF電子版本,請從下面的連接獲取: 語雀 或者 碼雲

和快取相關的幾個核心概念

快取汙染

快取汙染,指留存在快取中的資料,實際不會被再次訪問了,但又佔據了快取空間。

換句話說,由於快取空間有限,熱點資料被置換或者驅逐出去了,而一些後面不用到的資料卻反而被留下來,從而快取資料命中率急劇下降

要解決快取汙染的關鍵點是能識別出熱點資料,或者未來更有可能被訪問到的資料

換句話說: 是要提升 快取資料命中率

快取命中率

快取命中率是 一個快取元件是否好用的 核心指標之一,而命中率又和快取元件本身的快取資料淘汰演算法息息相關。

命中:可以直接通過快取獲取到需要的資料。

不命中:無法直接通過快取獲取到想要的資料,需要再次查詢資料庫或者執行其它的操作。原因可能是由於快取中根本不存在,或者快取已經過期。

通常來講,快取的命中率越高則表示使用快取的收益越高,應用的效能越好(響應時間越短、吞吐量越高),抗併發的能力越強。

由此可見,在高併發的網際網路系統中,快取的命中率是至關重要的指標。

而 快取的命中率 的提升,和 快取資料淘汰演算法 , 密切相關。

常見的快取資料淘汰演算法

主要的快取資料淘汰演算法(也叫做快取資料驅逐演算法),有三種:

  • FIFO (Fist in first out) 先進先出演算法

    如果一個數據最先進入快取中,則應該最早淘汰掉。

  • LRU (Least recently used) 最近最少使用演算法

    如果資料最近被訪問過,那麼將來被訪問的機率也更高。

  • LFU (Least frequently used) 最近很少使用演算法

    如果一個數據在最近一段時間內使用次數很少,使用頻率最低,那麼在將來一段時間內被使用的可能性也很小。

說明:本文會持續更新,最新PDF電子版本,請從下面的連接獲取: 語雀 或者 碼雲

參考一下:Redis的8種快取淘汰策略

從能否解決快取汙染這一維度來分析Redis的8種快取淘汰策略:

  • noeviction策略:不會淘汰資料,解決不了。
  • volatile-ttl策略:給資料設定合理的過期時間。當快取寫滿時,會淘汰剩餘存活時間最短的資料,避免滯留在快取中,造成汙染。
  • volatile-random策略:隨機選擇資料,無法把不再訪問的資料篩選出來,會造成快取汙染。
  • volatile-lru策略:LRU策略只考慮資料的訪問時效,對只訪問一次的資料,不能很快篩選出來。
  • volatile-lfu策略:LFU策略在LRU策略基礎上進行了優化,篩選資料時優先篩選並淘汰訪問次數少的資料。
  • allkeys-random策略:隨機選擇資料,無法把不再訪問的資料篩選出來,會造成快取汙染。
  • allkeys-lru策略:LRU策略只考慮資料的訪問時效,對只訪問一次的資料,不能很快篩選出來。
  • allkeys-lfu策略:LFU策略在LRU策略基礎上進行了優化,篩選資料時優先篩選並淘汰訪問次數少的資料。
快取淘汰策略 解決快取汙染
noeviction策略 不能
volatile-ttl策略
volatile-random策略 不能
volatile-lru策略 不能
volatile-lfu策略
allkeys-random策略 不能
allkeys-lru策略 不能
allkeys-lfu策略

說明:本文會持續更新,最新PDF電子版本,請從下面的連接獲取: 語雀 或者 碼雲

1 FIFO 先進先出演算法

FIFO(First in First out)先進先出。可以理解為是一種類似佇列的演算法實現

演算法:

如果一個數據最先進入快取中,則應該最早淘汰掉。

換句話說:最先進來的資料,被認為在未來被訪問的概率也是最低的

因此,當規定空間用盡且需要放入新資料的時候,會優先淘汰最早進來的資料

優點:

最簡單、最公平的一種資料淘汰演算法,邏輯簡單清晰,易於實現

缺點:

這種演算法邏輯設計所實現的快取的命中率是比較低的,因為沒有任何額外邏輯能夠儘可能的保證常用資料不被淘汰掉

演示:

下面簡單演示了FIFO的工作過程,假設存放元素尺寸是3,且佇列已滿,放置元素順序如下圖所示,當來了一個新的資料“ldy”後,因為元素數量到達了閾值,則首先要進行太淘汰置換操作,然後加入新元素,

操作如圖展示:

2 LRU —— 適用於 區域性突發流量場景

LRU(The Least Recently Used)最近最久未使用演算法。

演算法:

如果一個數據最近很少被訪問到,那麼被認為在未來被訪問的概率也是最低的,當規定空間用盡且需要放入新資料的時候,會優先淘汰最久未被訪問的資料

演示:

下圖展示了LRU簡單的工作過程,訪問時對資料的提前操作,以及資料滿且新增新資料的時候淘汰的過程的展示如下:

此處介紹的LRU是有明顯的缺點,

如上所述,對於偶發性、週期性的資料沒有良好的抵抗力,很容易就造成快取的汙染,影響命中率,

因此衍生出了很多的LRU演算法的變種,用以處理這種偶發冷資料突增的場景,

比如:LRU-K、Two Queues等,目的就是當判別資料為偶發或週期的冷資料時,不會存入空間內,從而降低熱資料的淘汰率。

優點:

LRU 實現簡單,在一般情況下能夠表現出很好的命中率,是一個“價效比”很高的演算法。

LRU可以有效的對訪問比較頻繁的資料進行保護,也就是針對熱點資料的命中率提高有明顯的效果。

LRU區域性突發流量場景,對突發性的稀疏流量(sparse bursts)表現很好。

缺點:

在存在 週期性的區域性熱點 資料場景,有大概率可能造成快取汙染

最近訪問的資料,並不一定是週期性資料,比如把全量的資料做一次迭代,那麼LRU 會產生較大的快取汙染,因為週期性的區域性熱點資料,可能會被淘汰。

演進一:LRU-K

下圖展示了LRU-K的簡單工作過程,

簡單理解,LRU中的K是指資料被訪問K次,傳統LRU與此對比則可以認為傳統LRU是LRU-1。

可以看到LRU-K有兩個佇列,新來的元素先進入到歷史訪問佇列中,該佇列用於記錄元素的訪問次數,採用的淘汰策略是LRU或者FIFO,當歷史佇列中的元素訪問次數達到K的時候,才會進入快取佇列。

演進二:Two Queues

下圖展示了Two Queues的工作過程,

Two Queues與LRU-K相比,他也同樣是兩個佇列,不同之處在於,他的佇列一個是快取佇列,一個是FIFO佇列,

當新元素進來的時候,首先進入FIFO佇列,當該佇列中的元素被訪問的時候,會進入LRU佇列,

過程如下:

實際案例

Guava Cache是Google Guava工具包中的一個非常方便易用的本地化快取實現,基於LRU演算法實現,支援多種快取過期策略。由於Guava的大量使用,Guava Cache也得到了大量的應用。

Guava的loading cache是使用LRU 的淘汰策略, 但是很多場景,最近的資料不一定熱,存在週期性的熱點資料,而LRU反而容易把稍舊的週期熱資料擠出去,

3 LFU —— 適用於 區域性週期性流量場景

LFU(The Least Frequently Used)最近很少使用演算法,

如果一個數據在最近一段時間內使用次數很少,使用頻率最低,那麼在將來一段時間內被使用的可能性也很小。

與LRU的區別在於LRU是以時間先後來衡量,LFU是以時間段內的使用次數衡量

演算法:

如果一個數據在一定時間內被訪問的次數很低,那麼被認為在未來被訪問的概率也是最低的,

當規定空間用盡且需要放入新資料的時候,會優先淘汰時間段內訪問次數最低的資料

演示:

下面描述了LFU的簡單工作過程,首先是訪問元素增加元素的訪問次數,從而提高元素在佇列中的位置,降低淘汰優先順序,

後面是插入新元素的時候,因為佇列已經滿了,所以優先淘汰在一定時間間隔內訪問頻率最低的元素

優點:

LFU適用於 區域性週期性流量場景,在這個場景下,比LRU有更好的快取命中率。

在 區域性週期性流量場景下, LFU是以次數為基準,所以更加準確,自然能有效的保證和提高命中率

缺點:

LFU 有幾個的缺點:

第一,因為LFU需要記錄資料的訪問頻率,因此需要額外的空間;

第二,它需要給每個記錄項維護頻率資訊,每次訪問都需要更新,這是個巨大的開銷;

第三,在存在 區域性突發流量場景下,有大概率可能造成快取汙染, 演算法命中率會急劇下降,這也是他最大弊端。 所以,LFU 對突發性的稀疏流量(sparse bursts)是無效的。

why:LFU 對突發性的稀疏流量無效呢?

總體來說,LFU 按照訪問次數或者訪問頻率取勝,這個次數有一個累計的長週期, 導致前期經常訪問的資料,訪問次數很大,或者說權重很高,

新來的快取資料, 哪怕他是突發熱點,但是,新資料的訪問次數累計的時間太短, 在老人面試,是個矮個子

LFU 就想一個企業,有點論資排輩,排斥性新人,新人進來,都需要吃苦頭,哪怕他是明日之星

所以,LFU 演算法中,老的記錄已經佔用了快取,過去的一些大量被訪問的記錄,在將來不一定會繼續是熱點資料,但是就一直把“坑”佔著了,而那些偶然的突破熱點資料,不太可能會被保留下來,而是被淘汰。

所以,在存在突發性的稀疏流量下,LFU中的偶然的、稀疏的突發流量在訪問頻率上,不佔優勢,很容易被淘汰,造成快取汙染和未來快取命中率下降。

LRU 和 LFU 的對比

LRU 實現簡單,在一般情況下能夠表現出很好的命中率,是一個“價效比”很高的演算法,平時也很常用。

LRU 的優點之一對突發性的稀疏流量(sparse bursts)表現很好。

但是,LRU 這個優點也帶來一個缺點:

對於週期性的區域性熱點資料,有大概率可能造成快取汙染

最近訪問的資料,並不一定是週期性資料,比如把全量的資料做一次迭代,那麼LRU 會產生較大的快取汙染,因為週期性的資料,可能會被淘汰。

如果是 週期性區域性熱點資料 ,那麼 LFU 可以達到最高的命中率。

但是 LFU 有仨個大的缺點:

第一,因為LFU需要記錄資料的訪問頻率,因此需要額外的空間;

第二,它需要給每個記錄項維護頻率資訊,每次訪問都需要更新,這是個巨大的時間開銷;

第三,對突發性的區域性熱點資料/稀疏流量(sparse bursts),演算法命中率會急劇下降,這也是他最大弊端。

無論 LRU 還是 LFU 都有其各自的缺點,不過,現在已經有很多針對其缺點而改良、優化出來的變種演算法。

說明:本文會持續更新,最新PDF電子版本,請從下面的連接獲取: 語雀 或者 碼雲

4、TinyLFU

TinyLFU 就是其中一個優化演算法,它是專門為了解決 LFU 上述提到的三個問題而被設計出來的。

第1:如何減少訪問頻率的儲存,所帶來的空間開銷

第2:如何減少訪問記錄的更新,所帶來的時間開銷

第3:如果提升對區域性熱點資料的 演算法命中率

解決第1個問題/第2個問題是採用了 Count–Min Sketch 演算法。

解決第二個問題是讓老的訪問記錄,儘量降低“新鮮度”(Freshness Mechanism)

首先:如何解決 訪問頻率 維護的時間開銷和空間開銷

解決措施:使用Count-Min Sketch演算法儲存訪問頻率,極大的節省空間;並且減少hash碰撞。

關於Count-Min Sketch演算法,可以看作是布隆過濾器的同源的演算法,

假如我們用一個hashmap來儲存每個元素的訪問次數,那這個量級是比較大的,並且hash衝突的時候需要做一定處理,否則資料會產生很大的誤差,

如果用hashmap的方式,相同的下標變成連結串列,這種方式會佔用很大的記憶體,而且速度也不是很快。

其實一個hash函式會衝突是比較低的,布隆過濾器 的優化之一,設定多個hash函式,多個hash函式,個個都衝突的概率就微乎其微了。

Count-Min Sketch演算法將一個hash操作,擴增為多個hash,這樣原來hash衝突的概率就降低了幾個等級,且當多個hash取得資料的時候,取最低值,也就是Count Min的含義所在。

Sketch 是草圖、速寫的意思。

將要介紹的 Count–Min Sketch 的原理跟 Bloom Filter 一樣,只不過 Bloom Filter 只有 0 和 1 的值,那麼你可以把 Count–Min Sketch 看作是“數值”版的 Bloom Filter。

布隆過濾器原理

布隆過濾器是由一個固定大小的二進位制向量或者點陣圖(bitmap)和一系列對映函式組成的。

在初始狀態時,對於長度為 m 的位陣列,它的所有位都被置為0,如下圖所示:

當有變數被加入集合時,通過 K 個對映函式將這個變數對映成點陣圖中的 K 個點,把它們置為 1。

查詢某個變數的時候我們只要看看這些點是不是都是 1 就可以大概率知道集合中有沒有它了

  • 如果這些點有任何一個 0,則被查詢變數一定不在;
  • 如果都是 1,則被查詢變數很可能存在。為什麼說是可能存在,而不是一定存在呢?那是因為對映函式本身就是雜湊函式,雜湊函式是會有碰撞的。

誤判率:布隆過濾器的誤判是指多個輸入經過雜湊之後在相同的bit位置1了,這樣就無法判斷究竟是哪個輸入產生的,因此誤判的根源在於相同的 bit 位被多次對映且置 1。這種情況也造成了布隆過濾器的刪除問題,因為布隆過濾器的每一個 bit 並不是獨佔的,很有可能多個元素共享了某一位。如果我們直接刪除這一位的話,會影響其他的元素。

特性

  • 一個元素如果判斷結果為存在的時候元素不一定存在,但是判斷結果為不存在的時候則一定不存在。
  • 布隆過濾器可以新增元素,但是不能刪除元素。因為刪掉元素會導致誤判率增加。

說明:本文會持續更新,最新PDF電子版本,請從下面的連接獲取: 語雀 或者 碼雲

Count-Min Sketch演算法原理

下圖展示了Count-Min Sketch演算法簡單的工作原理:

  1. 假設有四個hash函式,每當元素被訪問時,將進行次數加1;
  2. 此時會按照約定好的四個hash函式進行hash計算找到對應的位置,相應的位置進行+1操作;
  3. 當獲取元素的頻率時,同樣根據hash計算找到4個索引位置;
  4. 取得四個位置的頻率資訊,然後根據Count Min取得最低值作為本次元素的頻率值返回,即Min(Count);

Count-Min Sketch演算法詳細實現方案如下:

如何進行Count-Min Sketch訪問次數的空間開銷?

用4個hash函式會存訪問次數,那空間就是4倍了。怎麼優化呢

解決辦法是:

訪問次數超過15次其實是很熱的資料了,沒必要存太大的數字。所以我們用4位就可以存到15了。

一個long有64位,可以存16個4位。

一個訪問次數佔4個位,一個long有64位,可以存 16個訪問次數, 4個訪問一次一組的話, 一個long 可以分為4組。

一個 key 對應到 4個hash 值, 也就是 4個 訪問次數,那麼,一個long 可以分為儲存 4個Key的 訪問 次數。

最終, 一個long對應的陣列大小其實是容量的4倍了。

其次,如果提升對區域性熱點資料的 演算法命中率

答案是,降鮮機制

為了讓快取降低“新鮮度”,剔除掉過往頻率很高,但之後不經常的快取,Caffeine 有一個 Freshness Mechanism。

做法很簡答,就是當整體的統計計數(當前所有記錄的頻率統計之和,這個數值內部維護)達到某一個值時,那麼所有記錄的頻率統計除以 2。

說明:本文會持續更新,最新PDF電子版本,請從下面的連接獲取: 語雀 或者 碼雲

TinyLFU 的演算法流程

TinyLFU’s architecture is illustrated in Figure 1.

Here, the cache eviction policy picks a cache victim, while TinyLFU decides if replacing the cache victim with the new item is expected to increase the hit-ratio.

當快取空間不夠的時候,TinyLFU 找到 要淘汰的元素 (the cache victim),也就是使用頻率最小的元素 ,

然後 TinyLFU 決定 將新元素放入快取,替代 將 要淘汰的元素 (the cache victim)

具體的流程如下:

5 W-TinyLFU

Caffeine 通過測試發現 TinyLFU 在面對突發性的稀疏流量(sparse bursts)時表現很差,

why?

因為新的記錄(new items)還沒來得及建立足夠的頻率就被剔除出去了,這就使得命中率下降。

W-TinyLFU是如何演進過來的呢?

首先 W-TinyLFU 看名字就能大概猜出來,它是 LFU 的變種,也是TinyLFU的變種, 當然, 也是一種快取淘汰演算法。

W-TinyLFU是如何演進過來的呢?

前面講到:

  • LRU能很好的 處理 區域性突發流量

  • LFU能很好的 處理 區域性週期流量

so, 取其精華去其糟粕,結合二者的優點

W-TinyLFU = LRU + LFU

當然,總是有個是大股東,這就是 LFU, 或者說是 TinyLFU

so: W-TinyLFU(Window Tiny Least Frequently Used)是對TinyLFU的的優化和加強,加入 LRU 以應對區域性突發流量, 從而實現快取命中率的最優。

W-TinyLFU的資料架構

W-TinyLFU 是怎麼引入 LRU 的呢?他增加了一個 W-LRU視窗佇列 的元件。

當一個數據進來的時候,會進行篩選比較,進入W-LRU視窗佇列,經過淘汰後進入Count-Min Sketch演算法過濾器,通過訪問訪問頻率判決, 是否進入快取。

W-TinyLFU 的設計如下所示:

  • W-LRU視窗佇列 用於應 對 區域性突發流量

  • TinyLFU 用於 應對 區域性週期流量

如果一個數據最近被訪問的次數很低,那麼被認為在未來被訪問的概率也是最低的,當規定空間用盡的時候,會優先淘汰最近訪問次數很低的資料;

進一步的分治和解耦

W-TinyLFU將快取儲存空間分為兩個大的區域:Window Cache和Main Cache,

Window Cache是一個標準的LRU Cache,Main Cache則是一個SLRU(Segmemted LRU)cache,

Main Cache進一步劃分為Protected Cache(保護區)和Probation Cache(考察區)兩個區域,這兩個區域都是基於LRU的Cache。

Protected 是一個受保護的區域,該區域中的快取項不會被淘汰。

而且經過實驗發現當 window 區配置為總容量的 1%,剩餘的 99%當中的 80%分給 protected 區,20%分給 probation 區時,這時整體效能和命中率表現得最好,所以 Caffeine 預設的比例設定就是這個。

不過這個比例 Caffeine 會在執行時根據統計資料(statistics)去動態調整,如果你的應用程式的快取隨著時間變化比較快的話,或者說具備的突發特點資料多,那麼增加 window 區的比例可以提高命中率,

如果週期性熱地資料多,快取都是比較固定不變的話,增加 Main Cache 區(protected 區 +probation 區)的比例會有較好的效果。

說明:本文會持續更新,最新PDF電子版本,請從下面的連接獲取: 語雀 或者 碼雲

W-TinyLFU的演算法流程

當 window 區滿了,就會根據 LRU 把 candidate(即淘汰出來的元素)放到 Probation區域,

Probation區域則是一個觀察區,當有新的快取項需要進入Probation區時,

如果Probation區空間已滿,則會將新進入的快取項與Probation區中根據LRU規則需要被淘汰(victim)的快取項進行比較,兩個進行“PK”,勝者留在 probation,輸者就要被淘汰了。

TinyLFU寫入機制為:

當有新的快取項寫入快取時,會先寫入Window Cache區域,當Window Cache空間滿時,最舊的快取項會被移出Window Cache。

如果Probation Cache未滿,從Window Cache移出的快取項會直接寫入Probation Cache;

如果Probation Cache已滿,則會根據TinyLFU演算法確定從Window Cache移出的快取項是丟棄(淘汰)還是寫入Probation Cache。

Probation Cache中的快取項如果訪問頻率達到一定次數,會提升到Protected Cache;

如果Protected Cache也滿了,最舊的快取項也會移出Protected Cache,然後根據TinyLFU演算法確定是丟棄(淘汰)還是寫入Probation Cache。

TinyLFU淘汰機制為:

從Window Cache或Protected Cache移出的快取項稱為Candidate,Probation Cache中最舊的快取項稱為Victim。

如果Candidate快取項的訪問頻率大於Victim快取項的訪問頻率,則淘汰掉Victim。

如果Candidate小於或等於Victim的頻率,那麼如果Candidate的頻率小於5,則淘汰掉Candidate;否則,則在Candidate和Victim兩者之中隨機地淘汰一個。

從上面對W-TinyLFU的原理描述可知,caffeine綜合了LFU和LRU的優勢,將不同特性的快取項存入不同的快取區域,最近剛產生的快取項進入Window區,不會被淘汰;訪問頻率高的快取項進入Protected區,也不會淘汰;介於這兩者之間的快取項存在Probation區,當快取空間滿了時,Probation區的快取項會根據訪問頻率判斷是保留還是淘汰;通過這種機制,很好的平衡了訪問頻率和訪問時間新鮮程度兩個維度因素,儘量將新鮮的訪問頻率高的快取項保留在快取中。同時在維護快取項訪問頻率時,引入計數器飽和和衰減機制,即節省了儲存資源,也能較好的處理稀疏流量、短時超熱點流量等傳統LRU和LFU無法很好處理的場景。

W-TinyLFU的優點:

使用Count-Min Sketch演算法儲存訪問頻率,極大的節省空間;

TinyLFU會 定期進行新鮮度 衰減操作,應對訪問模式變化;

並且使用W-LRU機制能夠儘可能避免快取汙染的發生,在過濾器內部會進行篩選處理,避免低頻資料置換高頻資料。

W-TinyLFU的缺點:

目前已知應用於Caffeine Cache元件裡,應用不是很多。

W-TinyLFU與JVM分代記憶體關聯的想通之處

在caffeine所有的資料都在ConcurrentHashMap中,這個和guava cache不同,guava cache是自己實現了個類似ConcurrentHashMap的結構。

與JVM分代記憶體類似,在caffeine中有三個記錄引用的LRU佇列:

Eden佇列=window :

在caffeine中規定只能為快取容量的%1,

假如: size=100, 那Eden佇列的有效大小就等於1。

Eden的作用:記錄的是新到的資料,防止突發流量由於之前沒有訪問頻率,而導致被淘汰。

比如有一部新劇上線,在最開始其實是沒有訪問頻率的,

防止上線之後被其他快取淘汰出去,而加入這個區域。

可以理解為,伊甸園區,保留的剛剛誕生的未年輕人,防止沒有長大,直接被大人乾死了。

Eden佇列滿了之後, 淘汰的叫做 candidate候選人,進入Probation佇列

Probation佇列:

可以叫做考察佇列,類似於surviver區

在這個佇列就代表壽命開始延長,如果Eden佇列佇列已經滿了, 那種最近沒有被訪問過的元素,資料相對比較冷,第一輪淘汰,進行Probation佇列做臨時的考察。

如果經常了一段時間的積累,訪問的頻率還是不高,將被進入到 Protected佇列。

Probation佇列滿了之後, 淘汰的叫做 victim 犧牲品,進入Protected 佇列

Protected佇列:

可以叫做保護佇列,類似於老年代

到了這個佇列中,意味著已經取,你暫時不會被淘汰,

但是別急,如果Probation佇列沒有資料,或者Protected資料滿了,你也將會被面臨淘汰的尷尬局面。

當然想要變成這個佇列,需要把Probation訪問一次之後,就會提升為Protected佇列。

這個有效大小為(size減去eden) X 80% 如果size =100,就會是79。

這三個佇列關係如下:

所有的新資料都會進入Eden。
Eden滿了,淘汰進入Probation。

如果在Probation中訪問了其中某個資料,如果考察區空間不夠,則這個資料升級為Protected。

如果Protected滿了,又會繼續降級為Probation。

對於發生資料淘汰的時候,會從Eden中選擇候選人,和Probation中victim進行淘汰pk。

會把Probation佇列中的資料隊頭稱為受害者,這個隊頭肯定是最早進入Probation的,按照LRU佇列的演算法的話那他其實他就應該被淘汰,但是在這裡只能叫他受害者,Probation 是考察佇列,代表馬上要給他行刑了。

Eden中選擇候選人,也是Probation中隊尾元素,也叫攻擊者。這裡受害者會和攻擊者皇城PK決出我們應該被淘汰的。

通過我們的Count-Min Sketch中的記錄的頻率資料有以下幾個判斷:

如果 candidateFreq 大於victimFreq,那麼受害者就直接被淘汰。

如果 candidateFreq 大於 >=6,那麼隨機淘汰。

如果 candidateFreq 大於 < 6,淘汰 candidate,留下victimKey

Java本地快取使用實操

基於HashMap實現LRU

通過Map的底層方式,直接將需要快取的物件放在記憶體中。

  • 優點:簡單粗暴,不需要引入第三方包,比較適合一些比較簡單的場景。
  • 缺點:沒有快取淘汰策略,定製化開發成本高。
package com.crazymakercircle.cache;

import com.crazymakercircle.util.Logger;
import org.junit.Test;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class LruDemo {


    @Test
    public  void testSimpleLRUCache() {

        SimpleLRUCache cache = new SimpleLRUCache( 2 /* 快取容量 */ );
        cache.put(1, 1);
        cache.put(2, 2);
        Logger.cfo(cache.get(1));       // 返回  1
        cache.put(3, 3);    // 該操作會使得 2 淘汰
        Logger.cfo(cache.get(2));        // 返回 -1 (未找到)
        cache.put(4, 4);    // 該操作會使得 1 淘汰
        Logger.cfo(cache.get(1));        // 返回 -1 (未找到)
        Logger.cfo(cache.get(3));        // 返回  3
        Logger.cfo(cache.get(4));        // 返回  4
    }

    @Test
    public  void testLRUCache() {

        LRUCache cache = new LRUCache( 2 /* 快取容量 */ );
        cache.put(1, 1);
        cache.put(2, 2);
        Logger.cfo(cache.get(1));       // 返回  1
        cache.put(3, 3);    // 該操作會使得 2 淘汰
        Logger.cfo(cache.get(2));        // 返回 -1 (未找到)
        cache.put(4, 4);    // 該操作會使得 1 淘汰
        Logger.cfo(cache.get(1));        // 返回 -1 (未找到)
        Logger.cfo(cache.get(3));        // 返回  3
        Logger.cfo(cache.get(4));        // 返回  4
    }

    

    static class SimpleLRUCache extends LinkedHashMap<Integer, Integer> {
        private int capacity;

        public SimpleLRUCache(int capacity) {
            super(capacity, 0.75F, true);
            this.capacity = capacity;
        }

        public int get(int key) {
            return super.getOrDefault(key, -1);
        }

        public void put(int key, int value) {
            super.put(key, value);
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<Integer, Integer> eldest) {
            return size() > capacity;
        }
    }

    static  private class Entry {
        private int key;
        private int value;
        private Entry before;
        private Entry after;

        public Entry() {
        }

        public Entry(int key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    static  class LRUCache {
        //map容器 ,空間換時間,儲存key對應的CacheNode,保證用O(1) 的時間獲取到value
        private Map<Integer, Entry> cacheMap = new HashMap<Integer, Entry>();
        // 最大容量
        private int capacity;
        /**
         * 通過雙向指標來保證資料的插入更新順序,以及隊尾淘汰機制
         */
        //頭指標
        private Entry head;
        //尾指標
        private Entry tail;

        //容器大小
        private int size;


        /**
         * 初始化雙向連結串列,容器大小
         */
        public LRUCache(int capacity) {
            this.capacity = capacity;
            head = new Entry();
            tail = new Entry();
            head.after = tail;
            tail.before = head;
        }

        public int get(int key) {
            Entry node = cacheMap.get(key);
            if (node == null) {
                return -1;
            }
            // node != null,返回node後需要把訪問的node移動到雙向連結串列頭部
            moveToHead(node);
            return node.value;
        }

        public void put(int key, int value) {
            Entry node = cacheMap.get(key);
            if (node == null) {
                //快取不存在就新建一個節點,放入Map以及雙向連結串列的頭部
                Entry newNode = new Entry(key, value);
                cacheMap.put(key, newNode);
                addToHead(newNode);
                size++;
                //如果超出快取容器大小,就移除隊尾元素
                if (size > capacity) {
                    Entry removeNode = removeTail();
                    cacheMap.remove(removeNode.key);
                    size--;
                }
            } else {
                //如果已經存在,就把node移動到頭部。
                node.value = value;
                moveToHead(node);
            }
        }

        /**
         * 移動節點到頭部:
         * 1、刪除節點
         * 2、把節點新增到頭部
         */
        private void moveToHead(Entry node) {
            removeNode(node);
            addToHead(node);
        }

        /**
         * 移除隊尾元素
         */
        private Entry removeTail() {
            Entry node = tail.before;
            removeNode(node);
            return node;
        }

        private void removeNode(Entry node) {

            node.before.after = node.after;
            node.after.before = node.before;
        }

        /**
         * 把節點新增到頭部
         */
        private void addToHead(Entry node) {
            head.after.before = node;
            node.after = head.after;
            head.after = node;
            node.before = head;
        }
    }
}

Guava Cache使用案例

Guava Cache是由Google開源的基於LRU替換演算法的快取技術。

但Guava Cache由於被下面即將介紹的Caffeine全面超越而被取代,因此不特意編寫示例程式碼了,有興趣的讀者可以訪問Guava Cache主頁。

  • 優點:支援最大容量限制,兩種過期刪除策略(插入時間和訪問時間),支援簡單的統計功能。
  • 缺點:springboot2和spring5都放棄了對Guava Cache的支援。

Caffeine使用案例

Caffeine採用了W-TinyLFU(LUR和LFU的優點結合)開源的快取技術。快取效能接近理論最優,屬於是Guava Cache的增強版。

package com.github.benmanes.caffeine.demo;

import com.github.benmanes.caffeine.cache.*;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.concurrent.TimeUnit;

public class Demo1 {
    static System.Logger logger = System.getLogger(Demo1.class.getName());

    public static void hello(String[] args) {
        System.out.println("args = " + args);
    }


    public static void main(String... args) throws Exception {
        Cache<String, String> cache =  Caffeine.newBuilder()
                //最大個數限制
                //最大容量1024個,超過會自動清理空間
                .maximumSize(1024)
                //初始化容量
                .initialCapacity(1)
                //訪問後過期(包括讀和寫)
                //5秒沒有讀寫自動刪除
                .expireAfterAccess(5, TimeUnit.SECONDS)
                //寫後過期
                .expireAfterWrite(2, TimeUnit.HOURS)
                //寫後自動非同步重新整理
                .refreshAfterWrite(1, TimeUnit.HOURS)
                //記錄下快取的一些統計資料,例如命中率等
                .recordStats()
                .removalListener(((key, value, cause) -> {
                    //清理通知 key,value ==> 鍵值對   cause ==> 清理原因
                  System.out.println("removed key="+ key);
                }))
                //使用CacheLoader建立一個LoadingCache
                .build(new CacheLoader<String, String>() {
                    //同步載入資料
                    @Nullable
                    @Override
                    public String load(@NonNull String key) throws Exception {
                        System.out.println("loading  key="+ key);
                        return "value_" + key;
                    }

                    //非同步載入資料
                    @Nullable
                    @Override
                    public String reload(@NonNull String key, @NonNull String oldValue) throws Exception {
                        System.out.println("reloading  key="+ key);
                        return "value_" + key;
                    }
                });

        //新增值
        cache.put("name", "瘋狂創客圈");
        cache.put("key", "一個高併發 研究社群");

        //獲取值
        @Nullable String value = cache.getIfPresent("name");
        System.out.println("value = " + value);
        //remove
        cache.invalidate("name");
        value = cache.getIfPresent("name");
        System.out.println("value = " + value);
    }

}

說明:本文會持續更新,最新PDF電子版本,請從下面的連接獲取: 語雀 或者 碼雲

Encache使用案例

Ehcache是一個純java的程序內快取框架,具有快速、精幹的特點。是hibernate預設的cacheprovider。

  • 優點:支援多種快取淘汰演算法,包括LFU,LRU和FIFO;快取支援堆內快取,堆外快取和磁碟快取;支援多種叢集方案,解決資料共享問題。
  • 缺點:效能比Caffeine差
package com.crazymakercircle.cache;


import com.crazymakercircle.im.common.bean.User;
import com.crazymakercircle.util.IOUtil;
import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;
import net.sf.ehcache.config.CacheConfiguration;

import java.io.InputStream;

import static com.crazymakercircle.util.IOUtil.getResourcePath;

public class EhcacheDemo {
    public static void main(String[] args) {


        // 1. 建立快取管理器
        String inputStream= getResourcePath( "ehcache.xml");
        CacheManager cacheManager = CacheManager.create(inputStream);



        // 2. 獲取快取物件
        Cache cache = cacheManager.getCache("HelloWorldCache");
        CacheConfiguration config = cache.getCacheConfiguration();
        config.setTimeToIdleSeconds(60);
        config.setTimeToLiveSeconds(120);



        // 3. 建立元素
        Element element = new Element("key1", "value1");
         
        // 4. 將元素新增到快取
        cache.put(element);
         
        // 5. 獲取快取
        Element value = cache.get("key1");
        System.out.println("value: " + value);
        System.out.println(value.getObjectValue());
         
        // 6. 刪除元素
        cache.remove("key1");
         
        User user = new User("1000", "Javaer1");
        Element element2 = new Element("user", user);
        cache.put(element2);
        Element value2 = cache.get("user");
        System.out.println("value2: "  + value2);
        User user2 = (User) value2.getObjectValue();
        System.out.println(user2);
         
        System.out.println(cache.getSize());
         
        // 7. 重新整理快取
        cache.flush();
         
        // 8. 關閉快取管理器
        cacheManager.shutdown();
 
    }
}public class EncacheTest {    public static void main(String[] args) throws Exception {        // 宣告一個cacheBuilder        CacheManager cacheManager = CacheManagerBuilder.newCacheManagerBuilder()                .withCache("encacheInstance", CacheConfigurationBuilder                        //宣告一個容量為20的堆內快取                        .newCacheConfigurationBuilder(String.class,String.class, ResourcePoolsBuilder.heap(20)))                .build(true);        // 獲取Cache例項        Cache<String,String> myCache =  cacheManager.getCache("encacheInstance", String.class, String.class);        // 寫快取        myCache.put("key","v");        // 讀快取        String value = myCache.get("key");        // 移除換粗        cacheManager.removeCache("myCache");        cacheManager.close();    }}

caffeine 的使用實操

在 pom.xml 中新增 caffeine 依賴:

<dependency>
    <groupId>com.github.ben-manes.caffeine</groupId>
    <artifactId>caffeine</artifactId>
    <version>2.5.5</version>
</dependency>

建立一個 Caffeine 快取(類似一個map):

Cache<String, Object> manualCache = Caffeine.newBuilder()
        .expireAfterWrite(10, TimeUnit.MINUTES)
        .maximumSize(10_000)
        .build();

常見用法:

public static void main(String... args) throws Exception {
        Cache<String, String> cache =  Caffeine.newBuilder()
                //最大個數限制
                //最大容量1024個,超過會自動清理空間
                .maximumSize(1024)
                //初始化容量
                .initialCapacity(1)
                //訪問後過期(包括讀和寫)
                //5秒沒有讀寫自動刪除
                .expireAfterAccess(5, TimeUnit.SECONDS)
                //寫後過期
                .expireAfterWrite(2, TimeUnit.HOURS)
                //寫後自動非同步重新整理
                .refreshAfterWrite(1, TimeUnit.HOURS)
                //記錄下快取的一些統計資料,例如命中率等
                .recordStats()
                .removalListener(((key, value, cause) -> {
                    //清理通知 key,value ==> 鍵值對   cause ==> 清理原因
                  System.out.println("removed key="+ key);
                }))
                //使用CacheLoader建立一個LoadingCache
                .build(new CacheLoader<String, String>() {
                    //同步載入資料
                    @Nullable
                    @Override
                    public String load(@NonNull String key) throws Exception {
                        System.out.println("loading  key="+ key);
                        return "value_" + key;
                    }

                    //非同步載入資料
                    @Nullable
                    @Override
                    public String reload(@NonNull String key, @NonNull String oldValue) throws Exception {
                        System.out.println("reloading  key="+ key);
                        return "value_" + key;
                    }
                });

        //新增值
        cache.put("name", "瘋狂創客圈");
        cache.put("key", "一個高併發 研究社群");

        //獲取值
        @Nullable String value = cache.getIfPresent("name");
        System.out.println("value = " + value);
        //remove
        cache.invalidate("name");
        value = cache.getIfPresent("name");
        System.out.println("value = " + value);
    }

引數方法:

  • initialCapacity(1) 初始快取長度為1;
  • maximumSize(100) 最大長度為100;
  • expireAfterWrite(1, TimeUnit.DAYS) 設定快取策略在1天未寫入過期快取。

過期策略

在Caffeine中分為兩種快取,一個是有界快取,一個是無界快取,無界快取不需要過期並且沒有界限。

在有界快取中提供了三個過期API:

  • expireAfterWrite:代表著寫了之後多久過期。(上面列子就是這種方式)
  • expireAfterAccess:代表著最後一次訪問了之後多久過期。
  • expireAfter:在expireAfter中需要自己實現Expiry介面,這個介面支援create、update、以及access了之後多久過期。

注意這個API和前面兩個API是互斥的。這裡和前面兩個API不同的是,需要你告訴快取框架,它應該在具體的某個時間過期,也就是通過前面的重寫create、update、以及access的方法,獲取具體的過期時間。

填充(Population)特性

填充特性是指如何在key不存在的情況下,如何建立一個物件進行返回,主要分為下面四種

1 手動(Manual)

public static void main(String... args) throws Exception {
        Cache<String, Integer> cache = Caffeine.newBuilder().build();

        Integer age1 = cache.getIfPresent("張三");
        System.out.println(age1);

        //當key不存在時,會立即創建出物件來返回,age2不會為空
        Integer age2 = cache.get("張三", k -> {
            System.out.println("k:" + k);
            return 18;
        });
        System.out.println(age2);
}
null
k:張三
18

Cache介面允許顯式的去控制快取的檢索,更新和刪除。

我們可以通過cache.getIfPresent(key) 方法來獲取一個key的值,通過cache.put(key, value)方法顯示的將數控放入快取,但是這樣子會覆蓋緩原來key的資料。更加建議使用cache.get(key,k - > value) 的方式,get 方法將一個引數為 key 的 Function (createExpensiveGraph) 作為引數傳入。如果快取中不存在該鍵,則呼叫這個 Function 函式,並將返回值作為該快取的值插入快取中。get 方法是以阻塞方式執行呼叫,即使多個執行緒同時請求該值也只會呼叫一次Function方法。這樣可以避免與其他執行緒的寫入競爭,這也是為什麼使用 get 優於 getIfPresent 的原因。

注意:如果呼叫該方法返回NULL(如上面的 createExpensiveGraph 方法),則cache.get返回null,如果呼叫該方法丟擲異常,則get方法也會丟擲異常。

可以使用Cache.asMap() 方法獲取ConcurrentMap進而對快取進行一些更改。

2 自動(Loading)

public static void main(String... args) throws Exception {

    //此時的型別是 LoadingCache 不是 Cache
    LoadingCache<String, Integer> cache = Caffeine.newBuilder().build(key -> {
        System.out.println("自動填充:" + key);
        return 18;
    });

    Integer age1 = cache.getIfPresent("張三");
    System.out.println(age1);

    // key 不存在時 會根據給定的CacheLoader自動裝載進去
    Integer age2 = cache.get("張三");
    System.out.println(age2);
}
null
自動填充:張三
18

3 非同步手動(Asynchronous Manual)

public static void main(String... args) throws Exception {
    AsyncCache<String, Integer> cache = Caffeine.newBuilder().buildAsync();

    //會返回一個 future物件, 呼叫future物件的get方法會一直卡住直到得到返回,和多執行緒的submit一樣
    CompletableFuture<Integer> ageFuture = cache.get("張三", name -> {
        System.out.println("name:" + name);
        return 18;
    });

    Integer age = ageFuture.get();
    System.out.println("age:" + age);
}
name:張三
age:18

4 非同步自動(Asynchronously Loading)

public static void main(String... args) throws Exception {
    //和1.4基本差不多
    AsyncLoadingCache<String, Integer> cache = Caffeine.newBuilder().buildAsync(name -> {
        System.out.println("name:" + name);
        return 18;
    });
    CompletableFuture<Integer> ageFuture = cache.get("張三");

    Integer age = ageFuture.get();
    System.out.println("age:" + age);
}

refresh重新整理策略

何為更新策略?就是在設定多長時間後會自動重新整理快取。

Caffeine提供了refreshAfterWrite()方法,來讓我們進行寫後多久更新策略:

LoadingCache<String, String> build = CacheBuilder.newBuilder().refreshAfterWrite(1, TimeUnit.DAYS)
   .build(new CacheLoader<String, String>() {
          @Override
          public String load(String key)  {
             return "";
          }
    });
}

上面的程式碼我們需要建立一個CacheLodaer來進行重新整理,這裡是同步進行的,可以通過buildAsync方法進行非同步構建。

在實際業務中這裡可以把我們程式碼中的mapper傳入進去,進行資料來源的重新整理。

但是實際使用中,你設定了一天重新整理,但是一天後你發現快取並沒有重新整理。

這是因為只有在1天后這個快取再次訪問後才能重新整理,如果沒人訪問,那麼永遠也不會重新整理。

我們來看看自動重新整理是怎麼做的呢?

自動重新整理只存在讀操作之後,也就是我們的afterRead()這個方法,其中有個方法叫refreshIfNeeded,它會根據你是同步還是非同步然後進行重新整理處理。

說明:本文會持續更新,最新PDF電子版本,請從下面的連接獲取: 語雀 或者 碼雲

驅逐策略(eviction)

Caffeine提供三類驅逐策略:基於大小(size-based),基於時間(time-based)和基於引用(reference-based)。

基於大小(size-based)

基於大小驅逐,有兩種方式:一種是基於快取大小,一種是基於權重。

// Evict based on the number of entries in the cache
// 根據快取的計數進行驅逐
LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .maximumSize(10_000)
    .build(key -> createExpensiveGraph(key));

// Evict based on the number of vertices in the cache
// 根據快取的權重來進行驅逐(權重只是用於確定快取大小,不會用於決定該快取是否被驅逐)
LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .maximumWeight(10_000)
    .weigher((Key key, Graph graph) -> graph.vertices().size())
    .build(key -> createExpensiveGraph(key));

我們可以使用Caffeine.maximumSize(long)方法來指定快取的最大容量。當快取超出這個容量的時候,會使用Window TinyLfu策略來刪除快取。

我們也可以使用權重的策略來進行驅逐,可以使用Caffeine.weigher(Weigher) 函式來指定權重,使用Caffeine.maximumWeight(long) 函式來指定快取最大權重值。

maximumWeight與maximumSize不可以同時使用。

基於時間(Time-based)

// Evict based on a fixed expiration policy
// 基於固定的到期策略進行退出
LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .expireAfterAccess(5, TimeUnit.MINUTES)
    .build(key -> createExpensiveGraph(key));
LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .expireAfterWrite(10, TimeUnit.MINUTES)
    .build(key -> createExpensiveGraph(key));

// Evict based on a varying expiration policy
// 基於不同的到期策略進行退出
LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .expireAfter(new Expiry<Key, Graph>() {
      @Override
      public long expireAfterCreate(Key key, Graph graph, long currentTime) {
        // Use wall clock time, rather than nanotime, if from an external resource
        long seconds = graph.creationDate().plusHours(5)
            .minus(System.currentTimeMillis(), MILLIS)
            .toEpochSecond();
        return TimeUnit.SECONDS.toNanos(seconds);
      }
      
      @Override
      public long expireAfterUpdate(Key key, Graph graph, 
          long currentTime, long currentDuration) {
        return currentDuration;
      }
      
      @Override
      public long expireAfterRead(Key key, Graph graph,
          long currentTime, long currentDuration) {
        return currentDuration;
      }
    })
    .build(key -> createExpensiveGraph(key));

Caffeine提供了三種定時驅逐策略:

  • expireAfterAccess(long, TimeUnit):在最後一次訪問或者寫入後開始計時,在指定的時間後過期。假如一直有請求訪問該key,那麼這個快取將一直不會過期。
  • expireAfterWrite(long, TimeUnit): 在最後一次寫入快取後開始計時,在指定的時間後過期。
  • expireAfter(Expiry): 自定義策略,過期時間由Expiry實現獨自計算。

快取的刪除策略使用的是惰性刪除和定時刪除。這兩個刪除策略的時間複雜度都是O(1)。

測試定時驅逐不需要等到時間結束。我們可以使用Ticker介面和Caffeine.ticker(Ticker)方法在快取生成器中指定時間源,而不必等待系統時鐘。如:

FakeTicker ticker = new FakeTicker(); // Guava's testlib
Cache<Key, Graph> cache = Caffeine.newBuilder()
    .expireAfterWrite(10, TimeUnit.MINUTES)
    .executor(Runnable::run)
    .ticker(ticker::read)
    .maximumSize(10)
    .build();

cache.put(key, graph);
ticker.advance(30, TimeUnit.MINUTES)
assertThat(cache.getIfPresent(key), is(nullValue());

基於引用(reference-based)

Java4種引用的級別由高到低依次為:強引用 > 軟引用 > 弱引用 > 虛引用

引用型別 被垃圾回收時間 用途 生存時間
強引用 從來不會 物件的一般狀態 JVM停止執行時終止
軟引用 在記憶體不足時 物件快取 記憶體不足時終止
弱引用 在垃圾回收時 物件快取 gc執行後終止
虛引用 在垃圾回收時 堆外記憶體 虛引用的通知特性來管理的堆外記憶體

1、強引用

以前我們使用的大部分引用實際上都是強引用,這是使用最普遍的引用。如果一個物件具有強引用,那就類似於必不可少的生活用品,垃圾回收器絕不會回收它。當記憶體空間不足,Java虛擬機器寧願丟擲OutOfMemoryError錯誤,使程式異常終止,也不會靠隨意回收具有強引用的物件來解決記憶體不足問題。

 String str = "abc";
 List<String> list = new Arraylist<String>();
 list.add(str)
  在list集合裡的資料不會釋放,即使記憶體不足也不會

在ArrayList類中定義了一個私有的變數elementData陣列,在呼叫方法清空陣列時可以看到為每個陣列內容賦值為null。不同於elementData=null,強引用仍然存在,避免在後續呼叫 add()等方法新增元素時進行重新的記憶體分配。使用如clear()方法中釋放記憶體的方法對陣列中存放的引用型別特別適用,這樣就可以及時釋放記憶體。

2、軟引用(SoftReference)

特色:

  • 記憶體溢位之前進行回收,GC時記憶體不足時回收,如果記憶體足夠就不回收

  • 使用場景:在記憶體足夠的情況下進行快取,提升速度,記憶體不足時JVM自動回收

如果一個物件只具有軟引用,那就類似於可有可物的生活用品。如果記憶體空間足夠,垃圾回收器就不會回收它,如果記憶體空間不足了,就會回收這些物件的記憶體。只要垃圾回收器沒有回收它,該物件就可以被程式使用。軟引用可用來實現記憶體敏感的快取記憶體。

軟引用可以和一個引用佇列(ReferenceQueue)聯合使用,如果軟引用所引用的物件被垃圾回收,JAVA虛擬機器就會把這個軟引用加入到與之關聯的引用佇列中。
如:

public class Test {  

    public static void main(String[] args){  
        System.out.println("開始");            
        A a = new A();            
        SoftReference<A> sr = new SoftReference<A>(a);  
        a = null;  
        if(sr!=null){  
            a = sr.get();  
        }  
        else{  
            a = new A();  
            sr = new SoftReference<A>(a);  
        }            
        System.out.println("結束");     
    }       

}  

class A{  
    int[] a ;  
    public A(){  
        a = new int[100000000];  
    }  
}  

當記憶體足夠大時可以把陣列存入軟引用,取資料時就可從記憶體裡取資料,提高執行效率

3.弱引用(WeakReference)

特色:

  • 每次GC時回收,無論記憶體是否足夠

  • 使用場景:a. ThreadLocalMap防止記憶體洩漏 b. 監控物件是否將要被回收

如果一個物件只具有弱引用,那就類似於可有可物的生活用品。

弱引用與軟引用的區別在於:只具有弱引用的物件擁有更短暫的生命週期。在垃圾回收器執行緒掃描它 所管轄的記憶體區域的過程中,一旦發現了只具有弱引用的物件,不管當前記憶體空間足夠與否,都會回收它的記憶體。不過,由於垃圾回收器是一個優先順序很低的執行緒, 因此不一定會很快發現那些只具有弱引用的物件。

弱引用可以和一個引用佇列(ReferenceQueue)聯合使用,如果弱引用所引用的物件被垃圾回收,Java虛擬機器就會把這個弱引用加入到與之關聯的引用佇列中。
如:

Object c = new Car(); //只要c還指向car object, car object就不會被回收
WeakReference<Car> weakCar = new WeakReference(Car)(car);

當要獲得weak reference引用的object時, 首先需要判斷它是否已經被回收:

weakCar.get();

如果此方法為空, 那麼說明weakCar指向的物件已經被回收了.

如果這個物件是偶爾的使用,並且希望在使用時隨時就能獲取到,但又不想影響此物件的垃圾收集,那麼你應該用 Weak Reference 來記住此物件。

當你想引用一個物件,但是這個物件有自己的生命週期,你不想介入這個物件的生命週期,這時候你就是用弱引用。

這個引用不會在物件的垃圾回收判斷中產生任何附加的影響。

4.虛引用(PhantomReference)

“虛引用”顧名思義,就是形同虛設,與其他幾種引用都不同,虛引用並不會決定物件的生命週期。

如果一個物件僅持有虛引用,那麼它就和沒有任何引用一樣,在任何時候都可能被垃圾回收。

虛引用主要用來跟蹤物件被垃圾回收的活動。

虛引用與軟引用和弱引用的一個區別在於:虛引用必須和引用佇列(ReferenceQueue)聯合使用

當垃圾回收器準備回收一個物件時,如果發現它還有虛引用,就會在回收物件的記憶體之前,把這個虛引用加入到與之關聯的引用佇列中。

程式可以通過判斷引用佇列中是否已經加入了虛引用,來了解被引用的物件是否將要被垃圾回收。程式如果發現某個虛引用已經被加入到引用佇列,那麼就可以在所引用的物件的記憶體被回收之前採取必要的行動。

特別注意,在實際程式設計中一般很少使用弱引用與虛引用,使用軟用的情況較多,這是因為軟引用可以加速JVM對垃圾記憶體的回收速度,可以維護系統的執行安全,防止記憶體溢位(OutOfMemory)等問題的產生。

說明:本文會持續更新,最新PDF電子版本,請從下面的連接獲取: 語雀 或者 碼雲

byteBuffer回收對外記憶體的流程

兩種使用堆外記憶體的方法,一種是依靠unsafe物件,另一種是NIO中的ByteBuffer,直接使用unsafe物件來操作記憶體,對於一般開發者來說難度很大,並且如果記憶體管理不當,容易造成記憶體洩漏。所以不推薦。

推薦使用的是ByteBuffer來操作堆外記憶體。

在上面的ByteBuffer如何 觸發堆外記憶體的回收呢?是通過 虛引用的 關聯執行緒是實現的。

  • 當byteBuffer被回收後,在進行GC垃圾回收的時候,發現虛引用物件CleanerPhantomReference型別的物件,並且被該物件引用的物件(ByteBuffer物件)已經被回收了
  • 那麼他就將將這個物件放入到(ReferenceQueue)佇列中
  • JVM中會有一個優先順序很低的執行緒會去將該佇列中的虛引用物件取出來,然後回撥clean()方法
  • clean()方法裡做的工作其實就是根據記憶體地址去釋放這塊記憶體(內部還是通過unsafe物件去釋放的記憶體)。

可以看到被虛引用引用的物件其實就是這個byteBuffer物件。

所以說需要重點關注的是這個byteBuffer物件被回收了以後會觸發什麼操作。

說明:本文會持續更新,最新PDF電子版本,請從下面的連接獲取: 語雀 或者 碼雲

快取的驅逐配置成基於垃圾回收器

// Evict when neither the key nor value are strongly reachable
// 當key和value都沒有引用時驅逐快取
LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .weakKeys()
    .weakValues()
    .build(key -> createExpensiveGraph(key));

// Evict when the garbage collector needs to free memory
// 當垃圾收集器需要釋放記憶體時驅逐
LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .softValues()
    .build(key -> createExpensiveGraph(key));

我們可以將快取的驅逐配置成基於垃圾回收器。

為此,我們可以將key 和 value 配置為弱引用或只將值配置成軟引用。

注意:AsyncLoadingCache不支援弱引用和軟引用。

Caffeine.weakKeys() 使用弱引用儲存key。如果沒有其他地方對該key有強引用,那麼該快取就會被垃圾回收器回收。由於垃圾回收器只依賴於身份(identity)相等,因此這會導致整個快取使用身份 (==) 相等來比較 key,而不是使用 equals()。

Caffeine.weakValues() 使用弱引用儲存value。如果沒有其他地方對該value有強引用,那麼該快取就會被垃圾回收器回收。由於垃圾回收器只依賴於身份(identity)相等,因此這會導致整個快取使用身份 (==) 相等來比較 key,而不是使用 equals()。

Caffeine.softValues() 使用軟引用儲存value。當記憶體滿了過後,軟引用的物件以將使用最近最少使用(least-recently-used ) 的方式進行垃圾回收。由於使用軟引用是需要等到記憶體滿了才進行回收,所以我們通常建議給快取配置一個使用記憶體的最大值。 softValues() 將使用身份相等(identity) (==) 而不是equals() 來比較值。

注意:Caffeine.weakValues()和Caffeine.softValues()不可以一起使用。

Removal移除特性

概念:

  • 驅逐(eviction):由於滿足了某種驅逐策略,後臺自動進行的刪除操作
  • 無效(invalidation):表示由呼叫方手動刪除快取
  • 移除(removal):監聽驅逐或無效操作的監聽器

手動刪除快取:

在任何時候,您都可能明確地使快取無效,而不用等待快取被驅逐。

// individual key
cache.invalidate(key)
// bulk keys
cache.invalidateAll(keys)
// all keys
cache.invalidateAll()

Removal 監聽器:

Cache<Key, Graph> graphs = Caffeine.newBuilder()
    .removalListener((Key key, Graph graph, RemovalCause cause) ->
        System.out.printf("Key %s was removed (%s)%n", key, cause))
    .build();

您可以通過Caffeine.removalListener(RemovalListener) 為快取指定一個刪除偵聽器,以便在刪除資料時執行某些操作。 RemovalListener可以獲取到key、value和RemovalCause(刪除的原因)。

刪除偵聽器的裡面的操作是使用Executor來非同步執行的。預設執行程式是ForkJoinPool.commonPool(),可以通過Caffeine.executor(Executor)覆蓋。當操作必須與刪除同步執行時,請改為使用CacheWrite,CacheWrite將在下面說明。

注意:由RemovalListener丟擲的任何異常都會被記錄(使用Logger)並不會丟擲。

重新整理(Refresh)

LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .maximumSize(10_000)
    // 指定在建立快取或者最近一次更新快取後經過固定的時間間隔,重新整理快取
    .refreshAfterWrite(1, TimeUnit.MINUTES)
    .build(key -> createExpensiveGraph(key));

重新整理和驅逐是不一樣的。重新整理的是通過LoadingCache.refresh(key)方法來指定,並通過呼叫CacheLoader.reload方法來執行,重新整理key會非同步地為這個key載入新的value,並返回舊的值(如果有的話)。驅逐會阻塞查詢操作直到驅逐作完成才會進行其他操作。

與expireAfterWrite不同的是,refreshAfterWrite將在查詢資料的時候判斷該資料是不是符合查詢條件,如果符合條件該快取就會去執行重新整理操作。例如,您可以在同一個快取中同時指定refreshAfterWrite和expireAfterWrite,只有當資料具備重新整理條件的時候才會去重新整理資料,不會盲目去執行重新整理操作。如果資料在重新整理後就一直沒有被再次查詢,那麼該資料也會過期。

重新整理操作是使用Executor非同步執行的。預設執行程式是ForkJoinPool.commonPool(),可以通過Caffeine.executor(Executor)覆蓋。

如果重新整理時引發異常,則使用log記錄日誌,並不會丟擲。

Writer直接寫(write-through )

LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
  .writer(new CacheWriter<Key, Graph>() {
    @Override public void write(Key key, Graph graph) {
      // write to storage or secondary cache
    }
    @Override public void delete(Key key, Graph graph, RemovalCause cause) {
      // delete from storage or secondary cache
    }
  })
  .build(key -> createExpensiveGraph(key));

CacheWriter允許快取充當一個底層資源的代理,當與CacheLoader結合使用時,所有對快取的讀寫操作都可以通過Writer進行傳遞。Writer可以把操作快取和操作外部資源擴充套件成一個同步的原子性操作。並且在快取寫入完成之前,它將會阻塞後續的更新快取操作,但是讀取(get)將直接返回原有的值。如果寫入程式失敗,那麼原有的key和value的對映將保持不變,如果出現異常將直接拋給呼叫者。

CacheWriter可以同步的監聽到快取的建立、變更和刪除操作。載入(例如,LoadingCache.get)、重新載入(例如,LoadingCache.refresh)和計算(例如Map.computeIfPresent)的操作不被CacheWriter監聽到。

注意:CacheWriter不能與weakKeys或AsyncLoadingCache結合使用。

寫模式(Write Modes)

CacheWriter可以用來實現一個直接寫(write-through )或回寫(write-back )快取的操作。

write-through式快取中,寫操作是一個同步的過程,只有寫成功了才會去更新快取。這避免了同時去更新資源和快取的條件競爭。

write-back式快取中,對外部資源的操作是在快取更新後非同步執行的。這樣可以提高寫入的吞吐量,避免資料不一致的風險,比如如果寫入失敗,則在快取中保留無效的狀態。這種方法可能有助於延遲寫操作,直到指定的時間,限制寫速率或批寫操作。

通過對write-back進行擴充套件,我們可以實現以下特性:

  • 批處理和合並操作
  • 延遲操作併到一個特定的時間執行
  • 如果超過閾值大小,則在定期重新整理之前執行批處理
  • 如果操作尚未重新整理,則從寫入後緩衝器(write-behind)載入
  • 根據外部資源的特點,處理重審,速率限制和併發

可以參考一個簡單的例子,使用RxJava實現。

分層(Layering)

CacheWriter可能用來整合多個快取進而實現多級快取。

多級快取的載入和寫入可以使用系統外部快取記憶體。這允許快取使用一個小並且快速的快取去呼叫一個大的並且速度相對慢一點的快取。典型的off-heap、file-based和remote 快取。

受害者快取(Victim Cache)是一個多級快取的變體,其中被刪除的資料被寫入二級快取。

這個delete(K, V, RemovalCause) 方法允許檢查為什麼該資料被刪除,並作出相應的操作。

同步監聽器(Synchronous Listeners)

同步監聽器會接收一個key在快取中的進行了那些操作的通知。

監聽器可以阻止快取操作,也可以將事件排隊以非同步的方式執行。

這種型別的監聽器最常用於複製或構建分散式快取。

統計(Statistics)特性

Cache<Key, Graph> graphs = Caffeine.newBuilder()
    .maximumSize(10_000)
    .recordStats()
    .build();

使用Caffeine.recordStats(),您可以開啟統計資訊收集。Cache.stats() 方法返回提供統計資訊的CacheStats,如:

  • hitRate():返回命中與請求的比率
  • hitCount(): 返回命中快取的總數
  • evictionCount():快取逐出的數量
  • averageLoadPenalty():載入新值所花費的平均時間

Cleanup 清理特性

快取的刪除策略使用的是惰性刪除和定時刪除,但是我也可以自己呼叫cache.cleanUp()方法手動觸發一次回收操作。

cache.cleanUp()是一個同步方法。

策略(Policy)特性

在建立快取的時候,快取的策略就指定好了。

但是我們可以在執行時可以獲得和修改該策略。這些策略可以通過一些選項來獲得,以此來確定快取是否支援該功能。

Size-based

cache.policy().eviction().ifPresent(eviction -> {
  eviction.setMaximum(2 * eviction.getMaximum());
});

如果快取配置的時基於權重來驅逐,那麼我們可以使用weightedSize() 來獲取當前權重。這與獲取快取中的記錄數的Cache.estimatedSize() 方法有所不同。

快取的最大值(maximum)或最大權重(weight)可以通過getMaximum()方法來讀取,並使用setMaximum(long)進行調整。當快取量達到新的閥值的時候快取才會去驅逐快取。

如果有需用我們可以通過hottest(int) 和 coldest(int)方法來獲取最有可能命中的資料和最有可能驅逐的資料快照。

Time-based

cache.policy().expireAfterAccess().ifPresent(expiration -> ...);
cache.policy().expireAfterWrite().ifPresent(expiration -> ...);
cache.policy().expireVariably().ifPresent(expiration -> ...);
cache.policy().refreshAfterWrite().ifPresent(expiration -> ...);

ageOf(key,TimeUnit) 提供了從expireAfterAccess,expireAfterWrite或refreshAfterWrite策略的角度來看條目已經空閒的時間。

最大持續時間可以從getExpiresAfter(TimeUnit)讀取,並使用setExpiresAfter(long,TimeUnit)進行調整。

如果有需用我們可以通過hottest(int) 和 coldest(int)方法來獲取最有可能命中的資料和最有可能驅逐的資料快照。

說明:本文會持續更新,最新PDF電子版本,請從下面的連接獲取: 語雀 或者 碼雲

Caffeine 架構分析

caffeine底層架構圖

caffeine的巨集觀結構圖

說明:本文會持續更新,最新PDF電子版本,請從下面的連接獲取: 語雀 或者 碼雲

Cache的內部資料容器

包含著一個ConcurrentHashMap,

這也是存放我們所有快取資料的地方,眾所周知,ConcurrentHashMap是一個併發安全的容器,這點很重要,可以說Caffeine其實就是一個被強化過的ConcurrentHashMap;

Scheduler(定時器)

定期清空資料的一個機制,可以不設定,如果不設定則不會主動的清空過期資料;

Executor 非同步執行緒池

指定執行非同步任務時要使用的執行緒池。

可以不設定,如果不設定則會使用預設的執行緒池,也就是ForkJoinPool.commonPool();

快取操作執行流程

  1. 通過put操作將資料放入data屬性中(ConcurrentHashMap)
  2. 建立AddTask任務,放入(offer)寫快取:writeBuffer
  3. 從writeBuffer中獲取任務,並執行其run方法,追加記錄頻率:frequencySketch().increment(key)
  4. 往window區寫入資料
  5. 如果資料超過window區大小,將資料移到probation區
  6. 比較從window區晉升的資料和probation區的老資料的頻率,輸者被淘汰,從data中刪除

訪問順序佇列

雙向連結串列對雜湊表中的所有條目進行排序。

通過在雜湊,我們可以在 O(1) 時間內找到條目並操作其相鄰元素。

這裡的訪問包括條目的建立、更新和讀取(CUR)。

最近使用最少的條目在頭部,最多的在末尾。

這為基於大小的淘汰(maximumSize)和 time-to-idle 淘汰(expireAfterAccess)提供了支援。

問題的關鍵在於,每次訪問都需要對該列表進行更改,這很難實現的並行且高效。

寫入順序佇列

寫入順序指的是條目的建立或更新(CU)。

與訪問順序佇列類似,寫順序佇列操作的時間複雜度是 O(1) 的。

這個佇列用於 time-to-live 過期(expireAfterWrite)。

讀緩衝區

典型的快取鎖定每個操作,以安全地對 訪問佇列中的條目進行重新排序

一種替代方法是將每個重新排序操作儲存在緩衝區中,然後分批應用更改。

可以將其視為頁面替換策略的預寫日誌。

當緩衝區滿的時候,將會立即嘗試獲取鎖並掛起,直到緩衝區內的操作被處理完畢後將會立即返回。

讀緩衝區被實現為條帶狀環形緩衝區。

條帶用於減少競爭,執行緒通過特定的雜湊選擇條帶。

環形緩衝區是一個固定大小的陣列,使其高效並最大程度地減少了 GC 開銷。條帶數可以根據競爭檢測演算法動態增長。

寫緩衝區

與讀取緩衝區類似,此緩衝區用於重放寫入事件

讀緩衝區是允許丟記錄的,因為這隻用於用於 優化驅逐策略的命中率

但是寫入不允許丟記錄的,因此必須用 有效的有界佇列實現。

由於每次填充(populate)時都要優先清空寫緩衝區,因此它通常保持為空或很小。

緩衝區被實現為可擴充套件的迴圈陣列,該陣列可調整為最大大小。

調整大小時,將分配併產生一個新陣列。先前的陣列包括供消費者遍歷的轉發連結,然後允許將舊的陣列釋放。

通過使用這種分塊機制,緩衝區的初始大小較小,讀取和寫入的成本較低,並且產生的垃圾最少。

當緩衝區已滿且無法增長時,生產者會不停自旋重試並觸發維護操作,然後 yield 一小段時間。

這樣可以使消費者執行緒根據執行緒優先順序來清空快取區重放寫操作。

鎖定攤銷

傳統的快取會鎖定每個操作,而 Caffeine 會批量處理工作、並將成本分散到許多執行緒中。

這將對鎖定的代價進行攤銷,而不會建立鎖競爭。

維護的開銷一般會委託給配置的執行器,不過如果任務被拒絕或使用了呼叫者執行策略,還是會由使用者執行緒執行。

批處理的一個優點是,由於鎖的排他性,緩衝區僅在給定的時間被單個執行緒清空。

這允許使用更有效的基於多生產者-單消費者的緩衝區實現。

通過利用 CPU 快取效率的優勢,它也可以更好地與硬體特性保持一致。

Entry 狀態切換

當快取不受排他鎖保護時,會存在操作可能以錯誤的順序記錄和重放的問題。

由於競爭的原因,建立-讀取-更新-刪除序列可能無法以相同順序儲存在緩衝區中。

這樣做將需要粗粒度的鎖,會導致降低效能。

與併發資料結構中的典型情況一樣,Caffeine 使用原子狀態轉換解決這個問題。

Entry 有三種狀態——活躍(alive)、已退休(retired)、已失效(dead)。

活躍狀態意味著它在雜湊表和訪問/寫入佇列中都存在。

從雜湊表中刪除 entry 時,該 entry 被標記為已退休,需要從佇列中刪除。

刪除完成後,該 entry 將被標記已失效並且可以進行垃圾回收。

寬鬆讀寫

Caffeine 對充分利用 volatile 操作花費了很多精力。

記憶體屏障提供了一種從硬體角度出發的視角,來代替從語言層面思考 volatile 的讀寫。

通過了解具體哪些屏障被建立以及它們對硬體和資料視覺化的影響,將具有實現更好效能的潛力。

當在鎖內保證獨佔訪問時,由於鎖獲取的記憶體屏障提供了資料可見性,因此 Caffeine 使用寬鬆讀。

在資料競爭無法避免的情況下,比如在讀取元素時校驗是否過期來模擬快取丟失,是可以接受的。

和寬鬆讀類似,Caffeine 還使用寬鬆寫。

當一個元素在鎖定狀態進行排他寫,那麼寫入可以在解鎖時釋放的記憶體屏障返回。

這也可以用來支援解決寫偏序問題,比如在讀取一個數據的時候更新其時間戳。

驅逐策略

Caffeine 使用 Window TinyLfu 策略來提供幾乎最佳的命中率。

訪問佇列分為兩個空間:主空間和准入視窗。

如果 TinyLfu 策略接受,則退出准入視窗並進入主空間。

TinyLfu 會比較視窗中的受害者和主空間的受害者之間的訪問頻率,選擇保留兩者之間之前被訪問頻率更高的 entry。

頻率在 CountMinSketch 中通過 4 bit 儲存,每個 entry 要求 8 bytes 以保證準確。

此配置使快取能夠以 O(1) 時間根據頻率和新近度進行淘汰,同時佔用空間也很小。

適應性

准入視窗和主空間的大小是根據工作負載特徵動態確定的。

如果偏向新近度,則傾向於使用大視窗,而偏向頻率傾向使用較小的視窗。

Caffeine 使用 hill climbing 演算法來取樣命中率,進行調整並將其配置為最佳平衡。

快速處理

當快取低於其最大容量的 50% 時,驅逐策略並不完全啟用。

暫不初始化記錄頻率的 sketch 可以減少記憶體佔用,因為快取可能會被配置了一個極其高的閾值。

除非其他特性要求,否則不記錄訪問,以避免讀取緩衝區操作和重放訪問並清空操作的競爭。

Hash DoS 保護

DoS是Denial of Service的簡稱,即拒絕服務,造成DoS的攻擊行為被稱為DoS攻擊,其目的是使計算機或網路無法提供正常的服務。最常見的DoS攻擊有計算機網路頻寬攻擊和連通性攻擊。

DoS攻擊是指惡意的攻擊網路協議實現的缺陷或直接通過野蠻手段殘忍地耗盡被攻擊物件的資源,目的是讓目標計算機或網路無法提供正常的服務或資源訪問,使目標系統服務系統停止響應甚至崩潰,而在此攻擊中並不包括侵入目標伺服器或目標網路裝置。這些服務資源包括網路頻寬,檔案系統空間容量,開放的程序或者允許的連線。這種攻擊會導致資源的匱乏,無論計算機的處理速度多快、記憶體容量多大、網路頻寬的速度多快都無法避免這種攻擊帶來的後果。

雜湊碰撞攻擊

採用鏈地址法的情況下發生衝突時會在雜湊衝突處構造一個連結串列,但是在極端情況下,有些惡意的攻擊者,可能會通過精心構造的資料,使得所有的資料經過雜湊函式之後,都對映到到同一個位置,這時候雜湊表就會退化為連結串列,查詢的時間複雜度就從 O(1) 急劇退化為 O(n)。

這樣就有可能發生因為查詢操作消耗大量 CPU 或者執行緒資源,而導致系統無法響應其他請求的情況,從而達到拒絕服務攻擊(DoS)的目的。

要防止雜湊碰撞攻擊,就要求我們在設計一個雜湊表的時候要考慮到在極端情況下效能也不不能退化到無法接受的情況,

防止雜湊碰撞 Dos 攻擊

一般我們為了防止雜湊碰撞攻擊需要從以下幾個方面著手:

雜湊函式需要設計好,即使只有細微改動,經過雜湊函式後得到的雜湊值也要大不相同,這樣可以增加偽造資料的難度。

設計負載因子,支援動態擴容。也就是不要等到雜湊表滿了才開始擴容,而要達到一定百分比之後就要開始擴容。
選擇合適的方法來解決雜湊衝突,

如果選擇鏈地址法,可以引入紅黑樹或者跳錶等資料結構來避免出現過長的連結串列,從而導致效能急劇下降。

當 key 之間的 hash 值相同,或者 hash 到了同一個位置,這類的 hash 衝突可能會導致效能降低。

hash 表採用將連結串列降級為紅黑樹來解決這一問題。

TinyLFU的Dos攻擊

一種針對 TinyLFU 的攻擊行為是人為地提高驅逐策略下的元素的預估頻率。

這將導致所有後續進入的元素被頻率過濾器所拒絕,導致快取失效。

一種解決方案是在比較過程中,加入少量抖動使得最後的結果具有一定的不確定性。

這通過 1% 以下的概率選擇保留一個將要被驅逐的視窗中的新進度更高的中等訪問頻率元素來實現。

程式碼生成

Cache 有許多不同的配置,只有使用特定功能的子集的時候,相關欄位才有意義。

如果預設情況下所有欄位都被存在,將會導致快取和每個快取中的元素的記憶體開銷的浪費。

而通過程式碼生成的最合理實現,將會減少執行時的記憶體開銷,但是會需要磁碟上更大的二進位制檔案。

這項技術有通過演算法優化的潛力。

也許在構造的時候使用者可以根據用法指定最適合的特性。

一個移動應用可能更需要更高的併發率,而伺服器可能需要在一定記憶體開銷下更高的命中率。

也許不需要通過不斷嘗試在所有用法中選擇最佳的平衡,而可以通過驅動演算法進行選擇。

封裝 hash map

快取通過在 ConcurrentHashMap 之上進行封裝來新增所需要的特性。

快取和 hash 表的併發演算法非常複雜。通過將兩者分開,可以更便利地應用 hash 表的設計的優秀之處,也可以避免更粗粒度的鎖覆蓋全表和驅逐所引發的問題。

這種方式的成本是額外的執行時開銷。

這些欄位可以直接內聯到表中的元素上,而不是通過包裝容納額外的元資料。缺少包裝可以提供單次表操作的快速路徑(比如 lambdas)而不是多次 map 呼叫和短暫存活的物件例項。

之前的專案中探索了兩種途徑:基於 ConcurrentLinkedHashMap 的封裝和 Guava 中 hash 表的分支開發。在最後的設計裡,分支開發的想法最後沒有實施,因為工程實在是太複雜了。

分層 TimerWheel

一個時間感知(time-aware)的優先順序佇列,該佇列使用雜湊和雙向連結列表在 O(1) 時間內執行操作。

此佇列用於變數到期(expireAfter(Expiry))。

Caffeine 的空間優化與時間優化

說明:本文會持續更新,最新PDF電子版本,請從下面的連接獲取: 語雀 或者 碼雲

Caffeine 的空間優化

Caffeine 如何對一個 key如何可以節省空間呢?

Caffeine 如何對一個 key 進行統計,但又可以節省空間呢?

在 TinyLFU 中,近似頻率的統計如下圖所示:

Caffeine 對這個演算法的實現在FrequencySketch類。

但 Caffeine 對此有進一步的優化,例如 :

  • Count–Min Sketch 使用了二維陣列,Caffeine 只是用了一個一維的陣列;
  • Caffeine 認為快取的訪問頻率不需要用到那麼大, 訪問頻率又是數值型別,這個數需要用 int 或 long 來儲存,但是 ,只需要 15 就足夠,一般認為達到 15 次的頻率算是很高的了,而且 Caffeine 還有另外一個機制來使得這個頻率進行衰退減半。

如果最大是 15 的話,那麼只需要 4 個 bit 就可以滿足了,一個 long 有 64bit,可以儲存 16 個這樣的統計數,Caffeine 就是這樣的設計,使得儲存效率提高了 16 倍。

Caffeine 對快取的讀寫(afterReadafterWrite方法)都會呼叫onAccesss 方法,而onAccess方法裡有一句:

frequencySketch().increment(key);

通過程式碼和註釋或者讀者可能難以理解,下圖是我畫出來幫助大家理解的結構圖。

注意紫色虛線框,其中藍色小格就是需要計算的位置:

Count-Min Sketch演算法詳細實現方案如下:

Count-Min Sketch原理

Count-Min Sketch維護了一個long[] table陣列,陣列的大小為快取空間容量(快取項最大數量)向上取整為2的n次方。Count-Min Sketch的計數器是4bit,table陣列的每個元素大小是64bit,相當於table元素包含了16個計數器,這16個計數器進一步分為4個group,那麼每個group包含4個計數器,正好等於bloom hash函式的個數,同一個key的四個計數器分別使用group內相應位置的計數器。每個table元素包含16個計數器,4個hash計數器在相應table元素內計數器的偏移不一樣,可以有效降低hash衝突。

快取項計數統計過程為:先計算快取項key的hash值,然後使用4個不同的種子值分別計算得到table陣列四個元素的下標。然後根據hash值的低2bit確定table陣列元素中的group,那麼第一個計數器位置為第一個table陣列元素相應group中的第一個計數器,第二個計數器位置為第二個table陣列元素相應group中的第二個計數器,第三個計數器位置為第三個table陣列元素相應group中的第三個計數器,第四個計數器位置為第四個table陣列元素相應group中的第四個計數器。

從Count-Min Sketch頻率統計演算法描述可知,由於計數器大小隻有4bit,極大地降低了LFU頻率統計對儲存空間的要求。同時,計數器統計上限是15,並在計數總和達到閾值時所有計數器值減半,相當於引入了計數飽和和衰減機制,可以有效解決短時間內突發大流量不能有效淘汰的問題。比如出現了一個突發熱點事件,它的訪問量是其他事件的成百上千倍,但是該熱點事件很快冷卻下去,傳統的LFU淘汰機制會讓該事件的快取長時間地保留在快取中而無法淘汰掉,雖然該型別事件已經訪問量非常小或無人問津了。

Caffeine 的降鮮機制

傳統LFU一般使用key-value形式來記錄每個key的頻率,優點是資料結構非常簡單,並且能跟快取本身的資料結構複用,增加一個屬性記錄頻率就行了,它的缺點也比較明顯就是頻率這個屬性會佔用很大的空間,但如果改用壓縮方式儲存頻率呢?

頻率佔用空間肯定可以減少,但會引出另外一個問題:怎麼從壓縮後的資料裡獲得對應key的頻率呢?

TinyLFU的解決方案是類似點陣圖的方法,將key取hash值獲得它的位下標,然後用這個下標來找頻率,但點陣圖只有0、1兩個值,那頻率明顯可能會非常大,這要怎麼處理呢? 另外使用點陣圖需要預佔非常大的空間,這個問題怎麼解決呢?

TinyLFU根據最大資料量設定生成一個long陣列,然後將頻率值儲存在其中的四個long的4個bit位中(4個bit位不會大於15),取頻率值時則取四個中的最小一個。

Caffeine認為頻率大於15已經很高了,是屬於熱資料,所以它只需要4個bit位來儲存,long有8個位元組64位,這樣可以儲存16個頻率。取hash值的後左移兩位,然後加上hash四次,這樣可以利用到16箇中的13個,利用率挺高的,或許有更好的演算法能將16個都利用到。

為了讓快取降低“新鮮度”,剔除掉過往頻率很高,但之後不經常的快取,Caffeine 有一個 Freshness Mechanism。

做法很簡答,就是當整體的統計計數(當前所有記錄的頻率統計之和,這個數值內部維護)達到某一個值時,那麼所有記錄的頻率統計除以 2。

 */
  @SuppressWarnings("ShortCircuitBoolean")
  public void increment(E e) {
    if (isNotInitialized()) {
      return;
    }
    //統計 tinyLFU 的計數
    int[] index = new int[8];
    int blockHash = spread(e.hashCode());
    int counterHash = rehash(blockHash);
    int block = (blockHash & blockMask) << 3;
    for (int i = 0; i < 4; i++) {
      int h = counterHash >>> (i << 3);  //0 8 16  24
      index[i] = (h >>> 1) & 15;
      int offset = h & 1;
      index[i + 4] = block + offset + (i << 1);  //i << 1: 0,2,4,6
    }
    boolean added =
          incrementAt(index[4], index[0])
        | incrementAt(index[5], index[1])
        | incrementAt(index[6], index[2])
        | incrementAt(index[7], index[3]);


    //當資料寫入次數達到資料長度時就重置
    if (added && (++size == sampleSize)) {
      reset();
    }
  }


看到reset方法就是做這個事情


  /** Reduces every counter by half of its original value. */
  void reset() {
    int count = 0;
    for (int i = 0; i < table.length; i++) {
      count += Long.bitCount(table[i] & ONE_MASK);
      table[i] = (table[i] >>> 1) & RESET_MASK;
    }
    size = (size - (count >>> 2)) >>> 1;
  }
}

關於這個 reset 方法,為什麼是除以 2,而不是其他,及其正確性,在最下面的參考資料的 TinyLFU 論文中 3.3 章節給出了數學證明,大家有興趣可以看看。

W-TinyLFU 整體設計

上面說到淘汰策略是影響快取命中率的因素之一,

一般比較簡單的快取就會直接用到 LFU(Least Frequently Used,即最不經常使用) 或者LRU(Least Recently Used,即最近最少使用) ,而 Caffeine 就是使用了 W-TinyLFU 演算法。

W-TinyLFU 看名字就能大概猜出來,它是 LFU 的變種,也是一種快取淘汰演算法。

那為什麼要使用 W-TinyLFU 呢?

淘汰策略(eviction policy)

當 window 區滿了,就會根據 LRU 把 candidate(即淘汰出來的元素)放到 probation 區,

如果 probation 區也滿了,就把 candidate 和 probation 將要淘汰的元素 victim,兩個進行“PK”,勝者留在 probation,輸者就要被淘汰了。

而且經過實驗發現當 window 區配置為總容量的 1%,剩餘的 99%當中的 80%分給 protected 區,20%分給 probation 區時,這時整體效能和命中率表現得最好,所以 Caffeine 預設的比例設定就是這個。

不過這個比例 Caffeine 會在執行時根據統計資料(statistics)去動態調整,

如果你的應用程式的快取隨著時間變化比較快的話,那麼增加 window 區的比例可以提高命中率,相反快取都是比較固定不變的話,增加 Main Cache 區(protected 區 +probation 區)的比例會有較好的效果。

下面我們看看上面說到的淘汰策略是怎麼實現的:

一般快取對讀寫操作後都有後續的一系列“維護”操作,Caffeine 也不例外,

這些“維護”操作都在maintenance方法,我們將要說到的淘汰策略也在裡面。

Caffeine的TinyLFU資料模式

Caffeine的TinyLFU資料模式,來說它使用了三個雙端佇列:

  • accessOrderEdenDeque,
  • accessOrderProbationDeque,
  • accessOrderProtectedDeque,

使用雙端佇列的原因是支援LRU演算法比較方便。

accessOrderEdenDeque屬於eden區,快取1%的資料,其餘的99%快取在main區。

accessOrderProbationDeque屬於main區,快取main內資料的20%,這部分是屬於冷資料,即將補淘汰。

accessOrderProtectedDeque屬於main區,快取main內資料的80%,這部分是屬於熱資料,是整個快取的主存區。

我們先看一下淘汰方法入口:


    /**
     * Evicts entries if the cache exceeds the maximum.
     */
    @GuardedBy("evictionLock")
    void evictEntries() {
        if (!evicts()) {
            return;
        }
        //todo 高併發非同步刪除 4.1 伊甸園的候選人,和考察區的犧牲者PK之戰 by nien  at 2022/11/30
        // 淘汰window區的記錄
        // candidate  第一個 準備晉升的元素
        var candidate = evictFromWindow();

        //淘汰Main區的記錄
        evictFromMain(candidate);
    }

accessOrderEdenDeque對應W-TinyLFU的W(window),這裡儲存的是最新寫入資料的引用,它使用LRU淘汰,

這裡面的資料主要是應對突發流量的問題,淘汰後的資料進入accessOrderProbationDeque.

程式碼如下:

 @GuardedBy("evictionLock")
    @Nullable Node<K, V> evictFromWindow() {
        Node<K, V> first = null;
        //todo 高併發非同步處理 5.2 從伊甸園升級到考察區 by nien  at 2022/11/30

        //node = window queue的頭部節點
        Node<K, V> node = accessOrderWindowDeque().peekFirst();

        //一直迴圈: 如果window區超過了最大的限制,那麼就要把“多出來”的記錄做處理
        while (windowWeightedSize() > windowMaximum()) {
            // The pending operations will adjust the size to reflect the correct weight
            if (node == null) {
                break;
            }
            //下一個節點
            Node<K, V> next = node.getNextInAccessOrder();
            if (node.getPolicyWeight() != 0) {
                //設定 node 的型別: 為觀察型別 probation
                node.makeMainProbation();

                //todo 高併發寫的關鍵程式碼    5.3 的呼應程式碼:節點從 lru 佇列 移除   by nien  at 2022/11/28
                // 從window區去掉
                //      node  = accessOrderWindowDeque().peekFirst()
                accessOrderWindowDeque().remove(node);


                //加入到probation queue,相當於把節點移動到probation區(開始準備晉升)
                accessOrderProbationDeque().offerLast(node);

                // 記錄一下第一個 準備晉升的元素
                if (first == null) {
                    first = node;
                }
                //因為window移除了一個節點,所以需要調整 size
                setWindowWeightedSize(windowWeightedSize() - node.getPolicyWeight());
            }
            node = next;
        }
//第一個 準備晉升的元素
        return first;
    }

資料進入probation佇列後,繼續執行以下程式碼:

  @GuardedBy("evictionLock")
    void evictFromMain(@Nullable Node<K, V> candidate) {
        int victimQueue = PROBATION;
        int candidateQueue = PROBATION;

        // 迭代處理 考察區 probation queue
        // victim 是probation queue的頭部
        Node<K, V> victim = accessOrderProbationDeque().peekFirst();
        while (weightedSize() > maximum()) {
            // Search the admission window for additional candidates
            // candidate 剛從window晉升來的, 最先晉升的那個 元素
            if ((candidate == null) && (candidateQueue == PROBATION)) {
                candidate = accessOrderWindowDeque().peekFirst();
                candidateQueue = WINDOW;
            }

            // Try evicting from the protected and window queues
            if ((candidate == null) && (victim == null)) {
                if (victimQueue == PROBATION) {

                    //todo 高併發非同步刪除 4.2 考察區的犧牲者  by nien  at 2022/11/30

                    victim = accessOrderProtectedDeque().peekFirst();
                    victimQueue = PROTECTED;
                    continue;
                } else if (victimQueue == PROTECTED) {
                    victim = accessOrderWindowDeque().peekFirst();
                    victimQueue = WINDOW;
                    continue;
                }

                // The pending operations will adjust the size to reflect the correct weight
                break;
            }

            // Skip over entries with zero weight
            if ((victim != null) && (victim.getPolicyWeight() == 0)) {
                victim = victim.getNextInAccessOrder();
                continue;
            } else if ((candidate != null) && (candidate.getPolicyWeight() == 0)) {
                candidate = candidate.getNextInAccessOrder();
                continue;
            }

            // Evict immediately if only one of the entries is present
            if (victim == null) {
                @SuppressWarnings("NullAway")
                Node<K, V> previous = candidate.getNextInAccessOrder();
                Node<K, V> evict = candidate;
                candidate = previous;
                evictEntry(evict, RemovalCause.SIZE, 0L);
                continue;
            } else if (candidate == null) {
                Node<K, V> evict = victim;
                victim = victim.getNextInAccessOrder();
                evictEntry(evict, RemovalCause.SIZE, 0L);
                continue;
            }

            // Evict immediately if both selected the same entry
            if (candidate == victim) {
                victim = victim.getNextInAccessOrder();
                evictEntry(candidate, RemovalCause.SIZE, 0L);
                candidate = null;
                continue;
            }

            // Evict immediately if an entry was collected
            K victimKey = victim.getKey();
            K candidateKey = candidate.getKey();
            if (victimKey == null) {
                Node<K, V> evict = victim;
                victim = victim.getNextInAccessOrder();
                evictEntry(evict, RemovalCause.COLLECTED, 0L);
                continue;
            } else if (candidateKey == null) {
                Node<K, V> evict = candidate;
                candidate = candidate.getNextInAccessOrder();
                evictEntry(evict, RemovalCause.COLLECTED, 0L);
                continue;
            }

            // Evict immediately if an entry was removed
            if (!victim.isAlive()) {
                Node<K, V> evict = victim;
                victim = victim.getNextInAccessOrder();
                evictEntry(evict, RemovalCause.SIZE, 0L);
                continue;
            } else if (!candidate.isAlive()) {
                Node<K, V> evict = candidate;
                candidate = candidate.getNextInAccessOrder();
                evictEntry(evict, RemovalCause.SIZE, 0L);
                continue;
            }

            // Evict immediately if the candidate's weight exceeds the maximum
            if (candidate.getPolicyWeight() > maximum()) {
                Node<K, V> evict = candidate;
                candidate = candidate.getNextInAccessOrder();
                evictEntry(evict, RemovalCause.SIZE, 0L);
                continue;
            }
          //todo 高併發非同步刪除 4.3 candidate和 victim的頻率PK之戰 by nien  at 2022/11/30

            // Evict the entry with the lowest frequency
            //根據節點的統計頻率frequency來做比較,看看要處理掉victim還是candidate
            // admit = ture : 准許 candidate 加入 ;false:淘汰 candidate

            if (admit(candidateKey, victimKey)) {
                Node<K, V> evict = victim;
                victim = victim.getNextInAccessOrder();
                //  刪除  victim ,從而留下 candidate
                evictEntry(evict, RemovalCause.SIZE, 0L);
                candidate = candidate.getNextInAccessOrder();
            } else {
                Node<K, V> evict = candidate;
                candidate = candidate.getNextInAccessOrder();
                //  刪除  candidate  ,從而留下 victim
                evictEntry(evict, RemovalCause.SIZE, 0L);
            }
        }
    }

上面的程式碼邏輯是從probation的頭尾取出兩個node進行比較頻率,頻率更小者將被remove,其中尾部元素就是上一部分從eden中淘汰出來的元素,

如果將兩步邏輯合併起來講是這樣的:

在eden佇列通過lru淘汰出來的”候選者“與probation佇列通過lru淘汰出來的“被驅逐者“進行頻率比較,失敗者將被從cache中真正移除。

下面看一下它的比較邏輯admit:

    //准許 candidate 加入:
    boolean admit(K candidateKey, K victimKey) {
        //分別獲取victim和candidate的統計頻率
        int victimFreq = frequencySketch().frequency(victimKey);
        int candidateFreq = frequencySketch().frequency(candidateKey);
        // 伊甸園的末位 > 考察區的末位
        if (candidateFreq > victimFreq) {
            return true;    //ture : 准許 candidate 加入,淘汰 victimKey ;           false:淘汰 candidate,留下victimKey
        } else if (candidateFreq >= ADMIT_HASHDOS_THRESHOLD) {
            // The maximum frequency is 15 and halved to 7 after a reset to age the history. An attack
            // exploits that a hot candidate is rejected in favor of a hot victim. The threshold of a warm
            // candidate reduces the number of random acceptances to minimize the impact on the hit rate.
            int random = ThreadLocalRandom.current().nextInt();
            return ((random & 127) == 0);
        }
        //   伊甸園的末位 太小   false:淘汰 candidate,留下victimKey
        return false;
    }

從frequencySketch取出候選者與被驅逐者的頻率,如果候選者的頻率高就淘汰被驅逐者,如果被驅逐者比候選者的頻率高,並且候選者頻率小於等於5則淘汰者,如果前面兩個條件都不滿足則隨機淘汰。

probation中這個資料, 如何移動到protected中去的呢?

 @GuardedBy("evictionLock")
    void reorderProbation(Node<K, V> node) {
        if (!accessOrderProbationDeque().contains(node)) {
            // Ignore stale accesses for an entry that is no longer present
            return;
        } else if (node.getPolicyWeight() > mainProtectedMaximum()) {
            reorder(accessOrderProbationDeque(), node);
            return;
        }

        // If the protected space exceeds its maximum, the LRU items are demoted to the probation space.
        // This is deferred to the adaption phase at the end of the maintenance cycle.
        setMainProtectedWeightedSize(mainProtectedWeightedSize() + node.getPolicyWeight());
        accessOrderProbationDeque().remove(node);
        accessOrderProtectedDeque().offerLast(node);
        node.makeMainProtected();
    }

當資料被訪問時並且該資料在probation中,這個資料就會移動到protected中去,同時通過lru從protected中淘汰一個數據進入到probation中。

這樣資料流轉的邏輯全部通了:

新資料都會進入到eden中,通過lru淘汰到probation,並與probation中通過lru淘汰的資料進行使用頻率pk,如果勝利了就繼續留在probation中,如果失敗了就會被直接淘汰,當這條資料被訪問了,則移動到protected。當其它資料被訪問了,則它可能會從protected中通過lru淘汰到probation中。

讀寫操作後都有後續的一系列“維護”操作

一般快取對讀寫操作後都有後續的一系列“維護”操作,Caffeine 也不例外,

這些“維護”操作都在maintenance方法,我們將要說到的淘汰策略也在裡面。

maintenance 過程

這方法比較重要,下面也會提到,

所以這裡只先說跟“淘汰策略”有關的evictEntriesclimb


  /**
   * Performs the pending maintenance work and sets the state flags during processing to avoid
   * excess scheduling attempts.
   * 
   * 排空:
   * 1 The read buffer 
   * 2 write buffer 
   * 3. reference queues
   * 
   * The read buffer, write buffer, and reference queues are drained,
   * followed by expiration, and size-based eviction.
   *
   * @param task an additional pending task to run, or {@code null} if not present
   */
  @GuardedBy("evictionLock")
  void maintenance(@Nullable Runnable task) {
    setDrainStatusRelease(PROCESSING_TO_IDLE);

    try {
      drainReadBuffer();

      drainWriteBuffer();
      if (task != null) {
        task.run();
      }

      drainKeyReferences();
      drainValueReferences();
      //過期符合條件的記錄
      expireEntries();
      //淘汰符合條件的記錄
      evictEntries();

     //動態調整window區和protected區的大小
      climb();
    } finally {
      if ((drainStatusOpaque() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
        setDrainStatusOpaque(REQUIRED);
      }
    }
  }
  1. 設定狀態位為 PROCESSING_TO_IDLE
  2. 清空讀快取
  3. 清空寫快取
  4. 一般只有 afterWrite 的情況有正在執行的 task(比如新增快取項時發現已達到最大上限,此時 task 就是正在進行的新增快取的操作),如果有則執行 task
  5. 清空 key 引用和 value 引用佇列
  6. 處理過期項
  7. 淘汰項
  8. 調整視窗比例(climbing hill 演算法)
  9. 嘗試將狀態從 PROCESSING_TO_IDLE 改成 IDLE,否則記為 REQUIRED

這裡有一個小設計:BLCHeader.DrainStatusRef<K, V> 包含一個 volatile 的 drainStatus 狀態位 + 15 個 long 的填充位。

注:lazySetDrainStatus 本質呼叫的是 unsafe 的 putOrderedInt 方法,可以 lazy set 一個 volatile 的變數

W-TinyLFU 策略的實現用到的資料結構

先說一下 Caffeine 對上面說到的 W-TinyLFU 策略的實現用到的資料結構:

//最大的個數限制
long maximum;
//當前的個數
long weightedSize;
//window區的最大限制
long windowMaximum;
//window區當前的個數
long windowWeightedSize;
//protected區的最大限制
long mainProtectedMaximum;
//protected區當前的個數
long mainProtectedWeightedSize;
//下一次需要調整的大小(還需要進一步計算)
double stepSize;
//window區需要調整的大小
long adjustment;
//命中計數
int hitsInSample;
//不命中的計數
int missesInSample;
//上一次的快取命中率
double previousSampleHitRate;
 
final FrequencySketch<K> sketch;
//window區的LRU queue(FIFO)
final AccessOrderDeque<Node<K, V>> accessOrderWindowDeque;
//probation區的LRU queue(FIFO)
final AccessOrderDeque<Node<K, V>> accessOrderProbationDeque;
//protected區的LRU queue(FIFO)
final AccessOrderDeque<Node<K, V>> accessOrderProtectedDeque;

以及預設比例設定(意思看註釋)

/** The initial percent of the maximum weighted capacity dedicated to the main space. */
static final double PERCENT_MAIN = 0.99d;
/** The percent of the maximum weighted capacity dedicated to the main's protected space. */
static final double PERCENT_MAIN_PROTECTED = 0.80d;
/** The difference in hit rates that restarts the climber. */
static final double HILL_CLIMBER_RESTART_THRESHOLD = 0.05d;
/** The percent of the total size to adapt the window by. */
static final double HILL_CLIMBER_STEP_PERCENT = 0.0625d;
/** The rate to decrease the step size to adapt by. */
static final double HILL_CLIMBER_STEP_DECAY_RATE = 0.98d;
/** The maximum number of entries that can be transfered between queues. */

/** The initial percent of the maximum weighted capacity dedicated to the main space. */

static final double PERCENT_MAIN = 0.99d;

/** The percent of the maximum weighted capacity dedicated to the main's protected space. */
static final double PERCENT_MAIN_PROTECTED = 0.80d;

/** The difference in hit rates that restarts the climber. */

static final double HILL_CLIMBER_RESTART_THRESHOLD = 0.05d;
/** The percent of the total size to adapt the window by. */

static final double HILL_CLIMBER_STEP_PERCENT = 0.0625d;
/** The rate to decrease the step size to adapt by. */

static final double HILL_CLIMBER_STEP_DECAY_RATE = 0.98d;

/** The maximum number of entries that can be transfered between queues. */

重點來了,evictEntriesclimb方法:

/** Evicts entries if the cache exceeds the maximum. */
@GuardedBy("evictionLock")
void evictEntries() {
  if (!evicts()) {
    return;
  }
  //淘汰window區的記錄
  int candidates = evictFromWindow();
  //淘汰Main區的記錄
  evictFromMain(candidates);
}

evictFromWindow方法

 
/**
 * Evicts entries from the window space into the main space while the window size exceeds a
 * maximum.
 *
 * @return the number of candidate entries evicted from the window space
 */
//根據W-TinyLFU,新的資料都會無條件的加到admission window
//但是window是有大小限制,所以要“定期”做一下“維護”
@GuardedBy("evictionLock")
int evictFromWindow() {
  int candidates = 0;
  //檢視window queue的頭部節點
  Node<K, V> node = accessOrderWindowDeque().peek();
  //如果window區超過了最大的限制,那麼就要把“多出來”的記錄做處理
  while (windowWeightedSize() > windowMaximum()) {
    // The pending operations will adjust the size to reflect the correct weight
    if (node == null) {
      break;
    }
    //下一個節點
    Node<K, V> next = node.getNextInAccessOrder();
    if (node.getWeight() != 0) {
      //把node定位在probation區
      node.makeMainProbation();
      //從window區去掉
      accessOrderWindowDeque().remove(node);
      //加入到probation queue,相當於把節點移動到probation區(晉升了)
      accessOrderProbationDeque().add(node);
      candidates++;
      //因為移除了一個節點,所以需要調整window的size
      setWindowWeightedSize(windowWeightedSize() - node.getPolicyWeight());
    }
    //處理下一個節點
    node = next;
  }
 
  return candidates;
}

evictFromMain方法:

 //根據W-TinyLFU,從window晉升過來的要跟probation區的進行“PK”,勝者才能留下

  @GuardedBy("evictionLock")
  void evictFromMain(@Nullable Node<K, V> candidate) {
    int victimQueue = PROBATION;
    int candidateQueue = PROBATION;

    // 迭代處理 考察區 probation queue
    // victim 是probation queue的頭部
    Node<K, V> victim = accessOrderProbationDeque().peekFirst();
    while (weightedSize() > maximum()) {
      // Search the admission window for additional candidates
      // candidate 剛從window晉升來的, 最先晉升的那個 元素
      if ((candidate == null) && (candidateQueue == PROBATION)) {
        candidate = accessOrderWindowDeque().peekFirst();
        candidateQueue = WINDOW;
      }

      // Try evicting from the protected and window queues
      if ((candidate == null) && (victim == null)) {
        if (victimQueue == PROBATION) {
          victim = accessOrderProtectedDeque().peekFirst();
          victimQueue = PROTECTED;
          continue;
        } else if (victimQueue == PROTECTED) {
          victim = accessOrderWindowDeque().peekFirst();
          victimQueue = WINDOW;
          continue;
        }

        // The pending operations will adjust the size to reflect the correct weight
        break;
      }

      // Skip over entries with zero weight
      if ((victim != null) && (victim.getPolicyWeight() == 0)) {
        victim = victim.getNextInAccessOrder();
        continue;
      } else if ((candidate != null) && (candidate.getPolicyWeight() == 0)) {
        candidate = candidate.getNextInAccessOrder();
        continue;
      }

      // Evict immediately if only one of the entries is present
      if (victim == null) {
        @SuppressWarnings("NullAway")
        Node<K, V> previous = candidate.getNextInAccessOrder();
        Node<K, V> evict = candidate;
        candidate = previous;
        evictEntry(evict, RemovalCause.SIZE, 0L);
        continue;
      } else if (candidate == null) {
        Node<K, V> evict = victim;
        victim = victim.getNextInAccessOrder();
        evictEntry(evict, RemovalCause.SIZE, 0L);
        continue;
      }

      // Evict immediately if both selected the same entry
      if (candidate == victim) {
        victim = victim.getNextInAccessOrder();
        evictEntry(candidate, RemovalCause.SIZE, 0L);
        candidate = null;
        continue;
      }

      // Evict immediately if an entry was collected
      K victimKey = victim.getKey();
      K candidateKey = candidate.getKey();
      if (victimKey == null) {
        Node<K, V> evict = victim;
        victim = victim.getNextInAccessOrder();
        evictEntry(evict, RemovalCause.COLLECTED, 0L);
        continue;
      } else if (candidateKey == null) {
        Node<K, V> evict = candidate;
        candidate = candidate.getNextInAccessOrder();
        evictEntry(evict, RemovalCause.COLLECTED, 0L);
        continue;
      }

      // Evict immediately if an entry was removed
      if (!victim.isAlive()) {
        Node<K, V> evict = victim;
        victim = victim.getNextInAccessOrder();
        evictEntry(evict, RemovalCause.SIZE, 0L);
        continue;
      } else if (!candidate.isAlive()) {
        Node<K, V> evict = candidate;
        candidate = candidate.getNextInAccessOrder();
        evictEntry(evict, RemovalCause.SIZE, 0L);
        continue;
      }

      // Evict immediately if the candidate's weight exceeds the maximum
      if (candidate.getPolicyWeight() > maximum()) {
        Node<K, V> evict = candidate;
        candidate = candidate.getNextInAccessOrder();
        evictEntry(evict, RemovalCause.SIZE, 0L);
        continue;
      }

      // Evict the entry with the lowest frequency
      //根據節點的統計頻率frequency來做比較,看看要處理掉victim還是candidate
      // admit = ture : 准許 candidate 加入 ;false:淘汰 candidate

      if (admit(candidateKey, victimKey)) {
        Node<K, V> evict = victim;
         victim = victim.getNextInAccessOrder();
        //  刪除  victim ,從而留下 candidate
        evictEntry(evict, RemovalCause.SIZE, 0L);
        candidate = candidate.getNextInAccessOrder();
      } else {
        Node<K, V> evict = candidate;
        candidate = candidate.getNextInAccessOrder();
        //  刪除  candidate  ,從而留下 victim
        evictEntry(evict, RemovalCause.SIZE, 0L);
      }
    }
  }

climb方法主要是用來調整 window size 的,使得 Caffeine 可以適應你的應用型別(如 OLAP 或 OLTP)表現出最佳的命中率。

下圖是官方測試的資料:

調整時用到的預設比例資料:

//與上次命中率之差的閾值
static final double HILL_CLIMBER_RESTART_THRESHOLD = 0.05d;
//步長(調整)的大小(跟最大值maximum的比例)
static final double HILL_CLIMBER_STEP_PERCENT = 0.0625d;
//步長的衰減比例
static final double HILL_CLIMBER_STEP_DECAY_RATE = 0.98d;
  /** Adapts the eviction policy to towards the optimal recency / frequency configuration. */
//climb方法的主要作用就是動態調整window區的大小(相應的,main區的大小也會發生變化,兩個之和為100%)。
//因為區域的大小發生了變化,那麼區域內的資料也可能需要發生相應的移動。
@GuardedBy("evictionLock")
void climb() {
  if (!evicts()) {
    return;
  }
  //確定window需要調整的大小
  determineAdjustment();
  //如果protected區有溢位,把溢位部分移動到probation區。因為下面的操作有可能需要調整到protected區。
  demoteFromMainProtected();
  long amount = adjustment();
  if (amount == 0) {
    return;
  } else if (amount > 0) {
    //增加window的大小
    increaseWindow();
  } else {
    //減少window的大小
    decreaseWindow();
  }
}

下面分別展開每個方法來解釋:

/** Calculates the amount to adapt the window by and sets {@link #adjustment()} accordingly. */
@GuardedBy("evictionLock")
void determineAdjustment() {
  //如果frequencySketch還沒初始化,則返回
  if (frequencySketch().isNotInitialized()) {
    setPreviousSampleHitRate(0.0);
    setMissesInSample(0);
    setHitsInSample(0);
    return;
  }
  //總請求量 = 命中 + miss
  int requestCount = hitsInSample() + missesInSample();
  //沒達到sampleSize則返回
  //預設下sampleSize = 10 * maximum。用sampleSize來判斷快取是否足夠”熱“。
  if (requestCount < frequencySketch().sampleSize) {
    return;
  }

  //命中率的公式 = 命中 / 總請求
  double hitRate = (double) hitsInSample() / requestCount;
  //命中率的差值
  double hitRateChange = hitRate - previousSampleHitRate();
  //本次調整的大小,是由命中率的差值和上次的stepSize決定的
  double amount = (hitRateChange >= 0) ? stepSize() : -stepSize();
  //下次的調整大小:如果命中率的之差大於0.05,則重置為0.065 * maximum,否則按照0.98來進行衰減
  double nextStepSize = (Math.abs(hitRateChange) >= HILL_CLIMBER_RESTART_THRESHOLD)
      ? HILL_CLIMBER_STEP_PERCENT * maximum() * (amount >= 0 ? 1 : -1)
      : HILL_CLIMBER_STEP_DECAY_RATE * amount;
  setPreviousSampleHitRate(hitRate);
  setAdjustment((long) amount);
  setStepSize(nextStepSize);
  setMissesInSample(0);
  setHitsInSample(0);
}

/** Transfers the nodes from the protected to the probation region if it exceeds the maximum. */

//這個方法比較簡單,減少protected區溢位的部分
@GuardedBy("evictionLock")
void demoteFromMainProtected() {
  long mainProtectedMaximum = mainProtectedMaximum();
  long mainProtectedWeightedSize = mainProtectedWeightedSize();
  if (mainProtectedWeightedSize <= mainProtectedMaximum) {
    return;
  }

  for (int i = 0; i < QUEUE_TRANSFER_THRESHOLD; i++) {
    if (mainProtectedWeightedSize <= mainProtectedMaximum) {
      break;
    }

    Node<K, V> demoted = accessOrderProtectedDeque().poll();
    if (demoted == null) {
      break;
    }
    demoted.makeMainProbation();
    accessOrderProbationDeque().add(demoted);
    mainProtectedWeightedSize -= demoted.getPolicyWeight();
  }
  setMainProtectedWeightedSize(mainProtectedWeightedSize);
}

/**
 * Increases the size of the admission window by shrinking the portion allocated to the main
 * space. As the main space is partitioned into probation and protected regions (80% / 20%), for
 * simplicity only the protected is reduced. If the regions exceed their maximums, this may cause
 * protected items to be demoted to the probation region and probation items to be demoted to the
 * admission window.
 */

//增加window區的大小,這個方法比較簡單,思路就像我上面說的
@GuardedBy("evictionLock")
void increaseWindow() {
  if (mainProtectedMaximum() == 0) {
    return;
  }

  long quota = Math.min(adjustment(), mainProtectedMaximum());
  setMainProtectedMaximum(mainProtectedMaximum() - quota);
  setWindowMaximum(windowMaximum() + quota);
  demoteFromMainProtected();

  for (int i = 0; i < QUEUE_TRANSFER_THRESHOLD; i++) {
    Node<K, V> candidate = accessOrderProbationDeque().peek();
    boolean probation = true;
    if ((candidate == null) || (quota < candidate.getPolicyWeight())) {
      candidate = accessOrderProtectedDeque().peek();
      probation = false;
    }
    if (candidate == null) {
      break;
    }

    int weight = candidate.getPolicyWeight();
    if (quota < weight) {
      break;
    }

    quota -= weight;
    if (probation) {
      accessOrderProbationDeque().remove(candidate);
    } else {
      setMainProtectedWeightedSize(mainProtectedWeightedSize() - weight);
      accessOrderProtectedDeque().remove(candidate);
    }
    setWindowWeightedSize(windowWeightedSize() + weight);
    accessOrderWindowDeque().add(candidate);
    candidate.makeWindow();
  }

  setMainProtectedMaximum(mainProtectedMaximum() + quota);
  setWindowMaximum(windowMaximum() - quota);
  setAdjustment(quota);
}

/** Decreases the size of the admission window and increases the main's protected region. */
//同上increaseWindow差不多,反操作
@GuardedBy("evictionLock")
void decreaseWindow() {
  if (windowMaximum() <= 1) {
    return;
  }

  long quota = Math.min(-adjustment(), Math.max(0, windowMaximum() - 1));
  setMainProtectedMaximum(mainProtectedMaximum() + quota);
  setWindowMaximum(windowMaximum() - quota);

  for (int i = 0; i < QUEUE_TRANSFER_THRESHOLD; i++) {
    Node<K, V> candidate = accessOrderWindowDeque().peek();
    if (candidate == null) {
      break;
    }

    int weight = candidate.getPolicyWeight();
    if (quota < weight) {
      break;
    }

    quota -= weight;
    setMainProtectedWeightedSize(mainProtectedWeightedSize() + weight);
    setWindowWeightedSize(windowWeightedSize() - weight);
    accessOrderWindowDeque().remove(candidate);
    accessOrderProbationDeque().add(candidate);
    candidate.makeMainProbation();
  }

  setMainProtectedMaximum(mainProtectedMaximum() - quota);
  setWindowMaximum(windowMaximum() + quota);
  setAdjustment(-quota);
}

以上,是 Caffeine 的 W-TinyLFU 策略的設計原理及程式碼實現解析。

說明:本文會持續更新,最新PDF電子版本,請從下面的連接獲取: 語雀 或者 碼雲

清空讀緩衝

清空就是將所有的 readBuffer 使用 accessPolicy 清空。


  /** Drains the read buffer. */
  @GuardedBy("evictionLock")
  void drainReadBuffer() {
    if (!skipReadBuffer()) {
      readBuffer.drainTo(accessPolicy);
    }
  }

accessPolicy 在前面設定了的

accessPolicy = (evicts() || expiresAfterAccess()) ? this::onAccess : e -> {};

onAccess的程式碼如下:

 
  /** Updates the node's location in the page replacement policy. */
  @GuardedBy("evictionLock")
  void onAccess(Node<K, V> node) {
    if (evicts()) {
      K key = node.getKey();
      if (key == null) {
        return;
      }
      frequencySketch().increment(key);
      if (node.inWindow()) {
        reorder(accessOrderWindowDeque(), node);
      } else if (node.inMainProbation()) {
        reorderProbation(node);
      } else {
        reorder(accessOrderProtectedDeque(), node);
      }
      setHitsInSample(hitsInSample() + 1);
    } else if (expiresAfterAccess()) {
      reorder(accessOrderWindowDeque(), node);
    }
    if (expiresVariable()) {
      timerWheel().reschedule(node);
    }
  }

這個 onAccess 主要是:

  • 統計 tinyLFU 的計數
  • 將節點在佇列中重排序,
  • 以及更新統計資訊。

Caffeine 的效能比較

吞吐量 PK

可以清楚的看到Caffeine效率明顯的高於其他快取。

Read (100%)

In this benchmark 8 threads concurrently read from a cache configured with a maximum size.

Read (75%) / Write (25%)

In this benchmark 6 threads concurrently read from and 2 threads write to a cache configured with a maximum size.

Write (100%)

In this benchmark 8 threads concurrently write to a cache configured with a maximum size.

Server-class

The benchmarks were run on an Azure G4 instance, the largest available during a free trial from the major cloud providers. The machine was reported as a single socket Xeon E5-2698B v3 @ 2.00GHz (16 core, hyperthreading disabled), 224 GB, Ubuntu 15.04.

Compute

Cache same key spread
ConcurrentHashMap 29,679,839 65,726,864
Caffeine 1,581,524,763 530,182,873
Guava 25,132,366 114,608,951

Read (100%)

Unbounded ops/s (8 threads) ops/s (16 threads)
ConcurrentHashMap (v8) 560,367,163 1,171,389,095
ConcurrentHashMap (v7) 301,331,240 542,304,172
Bounded
Caffeine 181,703,298 382,355,194
ConcurrentLinkedHashMap 154,771,582 313,892,223
LinkedHashMap_Lru 9,209,065 13,598,576
Guava (default) 12,434,655 10,647,238
Guava (64) 24,533,922 43,101,468
Ehcache2_Lru 11,252,172 20,750,543
Ehcache3_Lru 11,415,248 17,611,169
Infinispan_Old_Lru 29,073,439 49,719,833
Infinispan_New_Lru 4,888,027 4,749,506

Read (75%) / Write (25%)

Unbounded ops/s (8 threads) ops/s (16 threads)
ConcurrentHashMap (v8) 441,965,711 790,602,730
ConcurrentHashMap (v7) 196,215,481 346,479,582
Bounded
Caffeine 144,193,725 279,440,749
ConcurrentLinkedHashMap 63,968,369 122,342,605
LinkedHashMap_Lru 8,668,785 12,779,625
Guava (default) 11,782,063 11,886,673
Guava (64) 22,782,431 37,332,090
Ehcache2_Lru 9,472,810 8,471,016
Ehcache3_Lru 10,958,697 17,302,523
Infinispan_Old_Lru 22,663,359 37,270,102
Infinispan_New_Lru 4,753,313 4,885,061

Write (100%)

Unbounded ops/s (8 threads) ops/s (16 threads)
ConcurrentHashMap (v8) 60,477,550 50,591,346
ConcurrentHashMap (v7) 46,204,091 36,659,485
Bounded
Caffeine 55,281,751 48,295,360
ConcurrentLinkedHashMap 23,819,597 39,797,969
LinkedHashMap_Lru 10,179,891 10,859,549
Guava (default) 4,764,056 5,446,282
Guava (64) 8,128,024 7,483,986
Ehcache2_Lru 4,205,936 4,697,745
Ehcache3_Lru 10,051,020 13,939,317
Infinispan_Old_Lru 7,538,859 7,332,973
Infinispan_New_Lru 4,797,502 5,086,305

非同步的高效能讀寫

解決CAS惡性空自旋的有效方式之一是以空間換時間,較為常見的方案有兩種:分散操作熱
點、使用佇列削峰。

一般的快取每次對資料處理完之後(讀的話,已經存在則直接返回,不存在則 load 資料,儲存,再返回;寫的話,則直接插入或更新)

,但是因為要維護一些淘汰策略,則需要一些額外的操作,諸如:

  • 計算和比較資料的是否過期
  • 統計頻率(像 LFU 或其變種)
  • 維護 read queue 和 write queue
  • 淘汰符合條件的資料
  • 等等。。。

這種資料的讀寫伴隨著快取狀態的變更,Guava Cache 的做法是把這些操作和讀寫操作放在一起,在一個同步加鎖的操作中完成,

雖然 Guava Cache 巧妙地利用了 JDK 的 ConcurrentHashMap(分段鎖或者無鎖 CAS)來降低鎖的密度,達到提高併發度的目的。

但是,對於一些熱點資料,這種做法還是避免不了頻繁的鎖競爭。

Caffeine 借鑑了資料庫系統的 WAL(Write-Ahead Logging)思想,即:先寫日誌,再執行操作

這種思想同樣適合快取的,

執行讀寫操作時,先把操作記錄在緩衝區,然後在合適的時機非同步、批量地執行緩衝區中的內容。

但在執行緩衝區的內容時,也是需要在緩衝區加上同步鎖的,不然存在併發問題,

只不過這樣就可以把對鎖的競爭從快取資料轉移到對緩衝區上。

ReadBuffer 讀緩衝

在 Caffeine 的內部實現中,為了很好的支援不同的 Features(如 Eviction,Removal,Refresh,Statistics,Cleanup,Policy 等等),擴充套件了很多子類,

它們共同的父類是BoundedLocalCache,而readBuffer就是作為它們共有的屬性,即都是用一樣的 readBuffer,

ReadBuffer 讀緩衝定義

final Buffer<Node<K, V>> readBuffer;


ReadBuffer 讀緩衝初始化:


readBuffer = evicts() || collectKeys() || collectValues() || expiresAfterAccess()
        ? new BoundedBuffer<>()
        : Buffer.disabled();
        

readBuffer 的型別是 BoundedBuffer,它的實現是一個 Striped Ring (條帶隔離的 環形) 的 buffer

首先考慮到 readBuffer 的特點是多生產者-單消費者(MPSC),所以只需要考慮寫入端的併發問題。

生產者並行(可能存在競爭)讀取計數,檢查是否有可用的容量,如果可用,則嘗試一下 CAS 寫入計數的操作。

如果增量成功,則生產者會懶釋出這個元素。

由於 CAS 失敗或緩衝區已滿而失敗時,生產方不會重試或阻塞。

消費者讀取計數並獲取可用元素,然後清除元素的並懶設定讀取計數。

緩衝區分成很多條帶(這就是 Striped 的含義)。

如果檢測到競爭,則重新雜湊、並動態新增新緩衝區來進一步提高併發性,直到一個內部最大值。

當重新雜湊發現了可用的緩衝區時,生產者可以重試新增元素以確定是否找到了滿足的緩衝區,或者是否有必要調整大小。

具體程式碼不再列舉,一些關鍵引數:

  • 每條 ring buffer 允許 16 個元素(每個元素一個 4 位元組的引用)
  • 最大允許 4 * ceilingNextPowerOfTwo(CPU 數) 條 ring buffer

ceilingNextPowerOfTwo 表示向上取 2 的整數冪

觸發afterRead

Caffeine 對每次快取的讀操作都會觸發afterRead

/**
 * Performs the post-processing work required after a read.
 *
 * @param node the entry in the page replacement policy
 * @param now the current time, in nanoseconds
 * @param recordHit if the hit count should be incremented
 */
void afterRead(Node<K, V> node, long now, boolean recordHit) {
  if (recordHit) {
    statsCounter().recordHits(1);
  }
  //把記錄加入到readBuffer
  //判斷是否需要立即處理readBuffer
  //注意這裡無論offer是否成功都可以走下去的,即允許寫入readBuffer丟失,因為這個
  
  boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL);
  if (shouldDrainBuffers(delayable)) {
    scheduleDrainBuffers();
  }
  refreshIfNeeded(node, now);
}

 /**
   * Returns whether maintenance work is needed.
   *
   * @param delayable if draining the read buffer can be delayed
   */

  //caffeine用了一組狀態來定義和管理“維護”的過程
  boolean shouldDrainBuffers(boolean delayable) {
    switch (drainStatus()) {
      case IDLE:
        return !delayable;
      case REQUIRED:
        return true;
      case PROCESSING_TO_IDLE:
      case PROCESSING_TO_REQUIRED:
        return false;
      default:
        throw new IllegalStateException();
    }
  }

重點看BoundedBuffer

/**
 * A striped, non-blocking, bounded buffer.
 *
 * @author [email protected] (Ben Manes)
 * @param <E> the type of elements maintained by this buffer
 */
final class BoundedBuffer<E> extends StripedBuffer<E>

它是一個 striped、非阻塞、有界限的 buffer,繼承於StripedBuffer類。

下面看看StripedBuffer的實現:

/**
 * A base class providing the mechanics for supporting dynamic striping of bounded buffers. This
 * implementation is an adaption of the numeric 64-bit {@link java.util.concurrent.atomic.Striped64}
 * class, which is used by atomic counters. The approach was modified to lazily grow an array of
 * buffers in order to minimize memory usage for caches that are not heavily contended on.
 *
 * @author [email protected] (Doug Lea)
 * @author [email protected] (Ben Manes)
 */

abstract class StripedBuffer<E> implements Buffer<E>

分散操作熱點

解決CAS惡性空自旋的有效方式之一是以空間換時間,

較為常見的方案有兩種:

  • 分散操作熱點、
  • 使用佇列削峰。

這個StripedBuffer設計的思想是跟Striped64類似的,通過擴充套件結構把分散操作熱點(/競爭熱點分離)

具體實現是這樣的,StripedBuffer維護一個Buffer[]陣列,每個元素就是一個RingBuffer,

每個執行緒用自己threadLocalRandomProbe屬性作為 hash 值,這樣就相當於每個執行緒都有自己“專屬”的RingBuffer,就不會產生競爭啦,

而不是用 key 的hashCode作為 hash 值,因為會產生熱點資料問題。

看看StripedBuffer的屬性

/** Table of buffers. When non-null, size is a power of 2. */
//RingBuffer陣列
transient volatile Buffer<E> @Nullable[] table;

//當進行resize時,需要整個table鎖住。tableBusy作為CAS的標記。
static final long TABLE_BUSY = UnsafeAccess.objectFieldOffset(StripedBuffer.class, "tableBusy");
static final long PROBE = UnsafeAccess.objectFieldOffset(Thread.class, "threadLocalRandomProbe");

/** Number of CPUS. */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/** The bound on the table size. */
//table最大size
static final int MAXIMUM_TABLE_SIZE = 4 * ceilingNextPowerOfTwo(NCPU);

/** The maximum number of attempts when trying to expand the table. */
//如果發生競爭時(CAS失敗)的嘗試次數
static final int ATTEMPTS = 3;

/** Table of buffers. When non-null, size is a power of 2. */
//核心資料結構
transient volatile Buffer<E> @Nullable[] table;

/** Spinlock (locked via CAS) used when resizing and/or creating Buffers. */
transient volatile int tableBusy;

/** CASes the tableBusy field from 0 to 1 to acquire lock. */
final boolean casTableBusy() {
  return UnsafeAccess.UNSAFE.compareAndSwapInt(this, TABLE_BUSY, 0, 1);
}

/**
 * Returns the probe value for the current thread. Duplicated from ThreadLocalRandom because of
 * packaging restrictions.
 */
static final int getProbe() {
  return UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE);
}
/** Table of buffers. When non-null, size is a power of 2. *///RingBuffer陣列transient volatile Buffer<E> @Nullable[] table; //當進行resize時,需要整個table鎖住。tableBusy作為CAS的標記。static final long TABLE_BUSY = UnsafeAccess.objectFieldOffset(StripedBuffer.class, "tableBusy");static final long PROBE = UnsafeAccess.objectFieldOffset(Thread.class, "threadLocalRandomProbe"); /** Number of CPUS. */static final int NCPU = Runtime.getRuntime().availableProcessors(); /** The bound on the table size. *///table最大sizestatic final int MAXIMUM_TABLE_SIZE = 4 * ceilingNextPowerOfTwo(NCPU); /** The maximum number of attempts when trying to expand the table. *///如果發生競爭時(CAS失敗)的嘗試次數static final int ATTEMPTS = 3; /** Table of buffers. When non-null, size is a power of 2. *///核心資料結構transient volatile Buffer<E> @Nullable[] table; /** Spinlock (locked via CAS) used when resizing and/or creating Buffers. */transient volatile int tableBusy; /** CASes the tableBusy field from 0 to 1 to acquire lock. */final boolean casTableBusy() {  return UnsafeAccess.UNSAFE.compareAndSwapInt(this, TABLE_BUSY, 0, 1);} /** * Returns the probe value for the current thread. Duplicated from ThreadLocalRandom because of * packaging restrictions. */static final int getProbe() {  return UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE);}

offer方法,當沒初始化或存在競爭時,則擴容為 2 倍。

實際是呼叫RingBuffer的 offer 方法,把資料追加到RingBuffer後面。

@Override
public int offer(E e) {
  int mask;
  int result = 0;
  Buffer<E> buffer;
  //是否不存在競爭
  boolean uncontended = true;
  Buffer<E>[] buffers = table
  //是否已經初始化
  if ((buffers == null)
      || (mask = buffers.length - 1) < 0
      //用thread的隨機值作為hash值,得到對應位置的RingBuffer
      || (buffer = buffers[getProbe() & mask]) == null
      //檢查追加到RingBuffer是否成功
      || !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) {
    //其中一個符合條件則進行擴容
    expandOrRetry(e, uncontended);
  }
  return result;
}

/**
 * Handles cases of updates involving initialization, resizing, creating new Buffers, and/or
 * contention. See above for explanation. This method suffers the usual non-modularity problems of
 * optimistic retry code, relying on rechecked sets of reads.
 *
 * @param e the element to add
 * @param wasUncontended false if CAS failed before call
 */

//這個方法比較長,但思路還是相對清晰的。
@SuppressWarnings("PMD.ConfusingTernary")
final void expandOrRetry(E e, boolean wasUncontended) {
  int h;
  if ((h = getProbe()) == 0) {
    ThreadLocalRandom.current(); // force initialization
    h = getProbe();
    wasUncontended = true;
  }
  boolean collide = false; // True if last slot nonempty
  for (int attempt = 0; attempt < ATTEMPTS; attempt++) {
    Buffer<E>[] buffers;
    Buffer<E> buffer;
    int n;
    if (((buffers = table) != null) && ((n = buffers.length) > 0)) {
      if ((buffer = buffers[(n - 1) & h]) == null) {
        if ((tableBusy == 0) && casTableBusy()) { // Try to attach new Buffer
          boolean created = false;
          try { // Recheck under lock
            Buffer<E>[] rs;
            int mask, j;
            if (((rs = table) != null) && ((mask = rs.length) > 0)
                && (rs[j = (mask - 1) & h] == null)) {
              rs[j] = create(e);
              created = true;
            }
          } finally {
            tableBusy = 0;
          }
          if (created) {
            break;
          }
          continue; // Slot is now non-empty
        }
        collide = false;
      } else if (!wasUncontended) { // CAS already known to fail
        wasUncontended = true;      // Continue after rehash
      } else if (buffer.offer(e) != Buffer.FAILED) {
        break;
      } else if (n >= MAXIMUM_TABLE_SIZE || table != buffers) {
        collide = false; // At max size or stale
      } else if (!collide) {
        collide = true;
      } else if (tableBusy == 0 && casTableBusy()) {
        try {
          if (table == buffers) { // Expand table unless stale
            table = Arrays.copyOf(buffers, n << 1);
          }
        } finally {
          tableBusy = 0;
        }
        collide = false;
        continue; // Retry with expanded table
      }
      h = advanceProbe(h);
    } else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) {
      boolean init = false;
      try { // Initialize table
        if (table == buffers) {
          @SuppressWarnings({"unchecked", "rawtypes"})
          Buffer<E>[] rs = new Buffer[1];
          rs[0] = create(e);
          table = rs;
          init = true;
        }
      } finally {
        tableBusy = 0;
      }
      if (init) {
        break;
      }
    }
  }
}

最後看看RingBuffer,注意RingBuffer是BoundedBuffer的內部類。

/** The maximum number of elements per buffer. */
static final int BUFFER_SIZE = 16;

// Assume 4-byte references and 64-byte cache line (16 elements per line)
//256長度,但是是以16為單位,所以最多存放16個元素
static final int SPACED_SIZE = BUFFER_SIZE << 4;
static final int SPACED_MASK = SPACED_SIZE - 1;
static final int OFFSET = 16;
//RingBuffer陣列
final AtomicReferenceArray<E> buffer;

 //插入方法
 @Override
 public int offer(E e) {
   long head = readCounter;
   long tail = relaxedWriteCounter();
   //用head和tail來限制個數
   long size = (tail - head);
   if (size >= SPACED_SIZE) {
     return Buffer.FULL;
   }
   //tail追加16
   if (casWriteCounter(tail, tail + OFFSET)) {
     //用tail“取餘”得到下標
     int index = (int) (tail & SPACED_MASK);
     //用unsafe.putOrderedObject設值
     buffer.lazySet(index, e);
     return Buffer.SUCCESS;
   }
   //如果CAS失敗則返回失敗
   return Buffer.FAILED;
 }

 //用consumer來處理buffer的資料
 @Override
 public void drainTo(Consumer<E> consumer) {
   long head = readCounter;
   long tail = relaxedWriteCounter();
   //判斷資料多少
   long size = (tail - head);
   if (size == 0) {
     return;
   }
   do {
     int index = (int) (head & SPACED_MASK);
     E e = buffer.get(index);
     if (e == null) {
       // not published yet
       break;
     }
     buffer.lazySet(index, null);
     consumer.accept(e);
     //head也跟tail一樣,每次遞增16
     head += OFFSET;
   } while (head != tail);
   lazySetReadCounter(head);
 }

注意,ring buffer 的 size(固定是 16 個)是不變的,變的是 head 和 tail 而已。

總的來說ReadBuffer有如下特點:

  • 使用 Striped-RingBuffer來提升對 buffer 的讀寫
  • 用 thread 的 hash 來避開熱點 key 的競爭
  • 允許寫入的丟失

WriteBuffer

本來快取的一般場景是讀多寫少的,讀的併發會更高,且 afterRead 顯得沒那麼重要,允許延遲甚至丟失。

writeBuffer跟readBuffer不一樣,主要體現在使用場景的不一樣。

寫不一樣,寫afterWrite不允許丟失,且要求儘量馬上執行

Caffeine 使用MPSC(Multiple Producer / Single Consumer)作為 buffer 陣列,

實現在MpscGrowableArrayQueue類,它是仿照JCTools的MpscGrowableArrayQueue來寫的。

MPSC 允許無鎖的高併發寫入,但只允許一個消費者,同時也犧牲了部分操作。

TimerWheel

除了支援expireAfterAccess和expireAfterWrite之外(Guava Cache 也支援這兩個特性),Caffeine 還支援expireAfter。

因為expireAfterAccess和expireAfterWrite都只能是固定的過期時間,這可能滿足不了某些場景,譬如記錄的過期時間是需要根據某些條件而不一樣的,這就需要使用者自定義過期時間。

先看看expireAfter的用法

package com.github.benmanes.caffeine.demo;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import org.checkerframework.checker.index.qual.NonNegative;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.concurrent.TimeUnit;

public class ExpireAfterDemo {
    static System.Logger logger = System.getLogger(ExpireAfterDemo.class.getName());

    public static void hello(String[] args) {
        System.out.println("args = " + args);
    }


    public static void main(String... args) throws Exception {
        Cache<String, String> cache =  Caffeine.newBuilder()
                //最大個數限制
                //最大容量1024個,超過會自動清理空間
                .maximumSize(1024)
                //初始化容量
                .initialCapacity(1)
                //訪問後過期(包括讀和寫)
                //5秒沒有讀寫自動刪除
//                .expireAfterAccess(5, TimeUnit.SECONDS)
                //寫後過期
//                .expireAfterWrite(2, TimeUnit.HOURS)
                //寫後自動非同步重新整理
//                .refreshAfterWrite(1, TimeUnit.HOURS)
                //記錄下快取的一些統計資料,例如命中率等
                .recordStats()
                .removalListener(((key, value, cause) -> {
                    //清理通知 key,value ==> 鍵值對   cause ==> 清理原因
                  System.out.println("removed key="+ key);
                }))
                .expireAfter(new Expiry<String, String>() {
                    //返回建立後的過期時間
                    @Override
                    public long expireAfterCreate(@NonNull String key, @NonNull String value, long currentTime) {
                        System.out.println("1. expireAfterCreate key="+ key);
                        return 0;
                    }

                    //返回更新後的過期時間
                    @Override
                    public long expireAfterUpdate(@NonNull String key, @NonNull String value, long currentTime, @NonNegative long currentDuration) {
                        System.out.println("2. expireAfterUpdate key="+ key);
                        return 0;
                    }

                    //返回讀取後的過期時間
                    @Override
                    public long expireAfterRead(@NonNull String key, @NonNull String value, long currentTime, @NonNegative long currentDuration) {
                        System.out.println("3. expireAfterRead key="+ key);
                        return 0;
                    }
                })
                .recordStats()
                //使用CacheLoader建立一個LoadingCache
                .build(new CacheLoader<String, String>() {
                    //同步載入資料
                    @Nullable
                    @Override
                    public String load(@NonNull String key) throws Exception {
                        System.out.println("loading  key="+ key);
                        return "value_" + key;
                    }

                    //非同步載入資料
                    @Nullable
                    @Override
                    public String reload(@NonNull String key, @NonNull String oldValue) throws Exception {
                        System.out.println("reloading  key="+ key);
                        return "value_" + key;
                    }
                });

        //新增值
        cache.put("name", "瘋狂創客圈");
        cache.put("key", "一個高併發 研究社群");

        //獲取值
        @Nullable String value = cache.getIfPresent("name");
        System.out.println("value = " + value);
        //remove
        cache.invalidate("name");
        value = cache.getIfPresent("name");
        System.out.println("value = " + value);
    }

}

通過自定義過期時間,使得不同的 key 可以動態的得到不同的過期時間。

注意,我把expireAfterAccess和expireAfterWrite註釋了,因為這兩個特性不能跟expireAfter一起使用。

而當使用了expireAfter特性後,Caffeine 會啟用一種叫“時間輪”的演算法來實現這個功能。

為什麼要用時間輪

好,重點來了,為什麼要用時間輪?

對expireAfterAccess和expireAfterWrite的實現是用一個AccessOrderDeque雙端佇列,它是 FIFO 的

因為它們的過期時間是固定的,所以在佇列頭的資料肯定是最早過期的,要處理過期資料時,只需要首先看看頭部是否過期,然後再挨個檢查就可以了。

但是,如果過期時間不一樣的話,這需要對accessOrderQueue進行排序&插入,這個代價太大了。

於是,Caffeine 用了一種更加高效、優雅的演算法-時間輪。

時間輪的結構:

Caffeine 對時間輪的實現在TimerWheel,它是一種多層時間輪(hierarchical timing wheels )。

看看元素加入到時間輪的schedule方法:

/**
 * Schedules a timer event for the node.
 *
 * @param node the entry in the cache
 */
public void schedule(@NonNull Node<K, V> node) {
  Node<K, V> sentinel = findBucket(node.getVariableTime());
  link(sentinel, node);
}

/**
 * Determines the bucket that the timer event should be added to.
 *
 * @param time the time when the event fires
 * @return the sentinel at the head of the bucket
 */
Node<K, V> findBucket(long time) {
  long duration = time - nanos;
  int length = wheel.length - 1;
  for (int i = 0; i < length; i++) {
    if (duration < SPANS[i + 1]) {
      long ticks = (time >>> SHIFT[i]);
      int index = (int) (ticks & (wheel[i].length - 1));
      return wheel[i][index];
    }
  }
  return wheel[length][0];
}

/** Adds the entry at the tail of the bucket's list. */
void link(Node<K, V> sentinel, Node<K, V> node) {
  node.setPreviousInVariableOrder(sentinel.getPreviousInVariableOrder());
  node.setNextInVariableOrder(sentinel);

  sentinel.getPreviousInVariableOrder().setNextInVariableOrder(node);
  sentinel.setPreviousInVariableOrder(node);
}

分層時間輪演算法

分層時間輪演算法是為了更高效的實現定時器而設計的一種資料格式,

像 Netty 、ZooKeepr、Dubbo 這樣的開源專案都有使用到時間輪的實現,其中kafka更進一步使用的是分層時間輪演算法。

定時器的核心需求

  1. 新增(初始化一個定時任務)
  2. 移除(過期任務)
  3. 任務到期檢測

定時器迭代歷史

1,連結串列實現的定時器

直接在一個連結串列中加入一個定時任務節點,每隔一個最小時間單位,開始從頭向尾部檢測,並將任務節點中的倒計時-1

  • 如果倒計時變為0,那麼說明該定時任務已經到期,就直接觸發它的執行操作,並將它從連結串列中刪除
  • 如果倒計時還不為0,那麼就繼續往尾部遍歷

時間複雜度:新增O(1),移除O(N),檢測O(N)

缺點:時間複雜度高

2,排序連結串列實現的定時器

還是一個連結串列的資料格式,但是它這個是將各個定時任務的執行時間做了一個排序,然後每個最小時間間隔檢測頭節點

  • 如果頭結點的執行時間與當前時間一致,那麼就開始執行該定時任務操作,並將頭節點移動到next節點,同時也檢測一下next節點
  • 如果頭節點的執行時間與當前時間不一致,那麼就等待下一個時間節點再次檢測

時間複雜度:新增O(N)需要額外排序操作,移除O(N),檢測O(1)只用檢測頭結點

如果使用最小堆新增和移除的時間複雜度都為O(logN)

缺點:時間複雜度高

3,普通時間輪實現的定時器

時間輪的本質就是一個數組,它的長度就是一個時間迴圈

以上圖為例,該時間輪的時間迴圈週期為8個最小時間間隔,時鐘輪詢從0>8>0~>8開始每一個最小時間間隔步進一個單位,然後檢查當前時間輪節點上是否有任務

  • 如果有任務,就直接執行
  • 沒有任務就等待下一個時間間隔步進1重複進行檢測

同時,它原版的會維護一個溢位列表(overflow list有序),因為定時任務有可能沒有在這個時間週期內,那麼就將這些未來需要執行的任務放在溢位列表中,每次時鐘輪詢的時候,檢測一下是否可以新增到時間輪上

時間複雜度:純粹的時間輪-新增O(1),移除O(1),檢測O(1)

但是維護溢位列表需要額外資源,時間複雜度O(N)

缺點:時間複雜度高

分層時間輪實現定時器

本質就是多個時間輪共同一起作用,分時間層級!

以上述圖片為樣例,當前時間為2時59分1秒,新建一個3時0分2秒的定時任務,先將定時任務儲存在小時單位的時間輪上,存放位置為3時;

然後分層時間輪以秒進行驅動步進,秒驅動到59向0切換時,分鐘時間輪也隨之步進1,同理小時時間輪;

如果小時時間輪步進到3時,發現該節點上有一個定時任務,那麼就將該任務轉移到對應的分鐘時間輪上,存放位置為0分;同理如果分鐘時間輪發現當前的節點中有定時任務,那麼就將其轉移到秒時間輪上,存放位置為1;秒時間輪發現當前節點有任務,那麼就直接執行!

時間複雜度:新增O(1),移除O(1),檢測O(1)

請參見視訊《第25章:穿透Caffeine 的架構和原始碼分析》

如使用軟引用和弱引用、

請參見視訊《第25章:穿透Caffeine 的架構和原始碼分析》

消除偽共享、

請參見視訊《第25章:穿透Caffeine 的架構和原始碼分析》

CompletableFuture非同步

請參見視訊《第25章:穿透Caffeine 的架構和原始碼分析》

說明:本文會持續更新,最新PDF電子版本,請從下面的連接獲取: 語雀 或者 碼雲

寬鬆讀寫相關的基礎知識:VarHandle變數控制代碼(指標)技術

在JDK9引入了VarHandle,

變數控制代碼(VarHandle)是對於一個變數的強型別引用,或者是一組引數化定義的變數族,包括了靜態欄位、非靜態欄位、陣列元素等,

VarHandle支援不同訪問模型下對於變數的訪問,包括簡單的read/write訪問,volatile read/write訪問,以及CAS訪問。

VarHandle相比於傳統的對於變數的併發操作具有巨大的優勢,

在JDK9引入了VarHandle之後,JUC包中對於變數的訪問基本上都使用VarHandle,比如AQS中的CLH佇列中使用到的變數等。

VarHandle的作用與優勢

在開始討論VarHandle之前,我們先來回憶一下併發操作裡面的三大特性:原子性、可見性、有序性

volatile變數可以保證可見性、有序性(防止指令重拍),

加鎖或者原子操作、CAS等可以保證原子性,

只有同時滿足這三個特性才能夠保證對於一個變數的併發操作是執行緒安全的、合乎預期的。

加鎖的話,需要對執行緒進行同步,而執行緒上下文切換之間帶來的開銷是很大的,所以這裡不予考慮,考慮一下幾種方式:

  • 使用Atomic包下的原子類進行間接管理,但增加了開銷,也可能導致額外的問題如ABA問題;
  • 使用原子性的FieldUpdaters,利用反射機制,開銷也會增大;
  • 使用sun.misc.Unsafe提供的JVM內建函式,但直接操作JVM可能會損害安全性和可移植性;

針對以上的問題,VarHandle就是用來替代上述方式的一種方案,它提供了一系列標準的記憶體屏障操作,用於更細粒度的控制指令排序,

在安全性、可用性、效能等方面都要優於現有的AIP,且基本上能夠和任何型別的變數相關聯。

建立VarHandle

VarHandle通過MethodHandles類中的內部類Lookup來建立:

package com.github.benmanes.caffeine.demo;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

public class VarHandleTest {
    private String plainStr;    //普通變數
    private static String staticStr;    //靜態變數
    private String reflectStr;    //通過反射生成控制代碼的變數
    private String[] arrayStr = new String[10];    //陣列變數
 
    private static final VarHandle plainVar;    //普通變數控制代碼
    private static final VarHandle staticVar;    //靜態變數控制代碼
    private static final VarHandle reflectVar;    //反射欄位控制代碼
    private static final VarHandle arrayVar;    //陣列控制代碼
 
    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            plainVar = lookup.findVarHandle(VarHandleTest.class, "plainStr", String.class);
            staticVar = lookup.findStaticVarHandle(VarHandleTest.class, "staticStr", String.class);
            reflectVar = lookup.unreflectVarHandle(VarHandleTest.class.getDeclaredField("reflectStr"));
            arrayVar = MethodHandles.arrayElementVarHandle(String[].class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
 
}

來分析一下上述建立VarHandle的程式碼:

1、通過MethodHandles類裡面的lookup()函式建立一個Lookup類,

這個Lookup類是MethodHandles的內部類,作用就是用於建立方法控制代碼和變數控制代碼的一個工廠類,原始碼如下:

@CallerSensitive
    @ForceInline // to ensure Reflection.getCallerClass optimization
    public static Lookup lookup() {
        return new Lookup(Reflection.getCallerClass());
    }

2、通過Lookup裡面的工廠方法生成不同型別的VarHandle,

拿生成普通變數的findVarHandle方法來看:

          /**
         * Produces a VarHandle giving access to a non-static field {@code name}
         * of type {@code type} declared in a class of type {@code recv}.
         * The VarHandle's variable type is {@code type} and it has one
         * coordinate type, {@code recv}.
         * <p>
         * Access checking is performed immediately on behalf of the lookup
         * class.
         * <p>
         * Certain access modes of the returned VarHandle are unsupported under
         * the following conditions:
         * <ul>
         * <li>if the field is declared {@code final}, then the write, atomic
         *     update, numeric atomic update, and bitwise atomic update access
         *     modes are unsupported.
         * <li>if the field type is anything other than {@code byte},
         *     {@code short}, {@code char}, {@code int}, {@code long},
         *     {@code float}, or {@code double} then numeric atomic update
         *     access modes are unsupported.
         * <li>if the field type is anything other than {@code boolean},
         *     {@code byte}, {@code short}, {@code char}, {@code int} or
         *     {@code long} then bitwise atomic update access modes are
         *     unsupported.
         * </ul>
         * <p>
         * If the field is declared {@code volatile} then the returned VarHandle
         * will override access to the field (effectively ignore the
         * {@code volatile} declaration) in accordance to its specified
         * access modes.
         * <p>
         * If the field type is {@code float} or {@code double} then numeric
         * and atomic update access modes compare values using their bitwise
         * representation (see {@link Float#floatToRawIntBits} and
         * {@link Double#doubleToRawLongBits}, respectively).
         * @apiNote
         * Bitwise comparison of {@code float} values or {@code double} values,
         * as performed by the numeric and atomic update access modes, differ
         * from the primitive {@code ==} operator and the {@link Float#equals}
         * and {@link Double#equals} methods, specifically with respect to
         * comparing NaN values or comparing {@code -0.0} with {@code +0.0}.
         * Care should be taken when performing a compare and set or a compare
         * and exchange operation with such values since the operation may
         * unexpectedly fail.
         * There are many possible NaN values that are considered to be
         * {@code NaN} in Java, although no IEEE 754 floating-point operation
         * provided by Java can distinguish between them.  Operation failure can
         * occur if the expected or witness value is a NaN value and it is
         * transformed (perhaps in a platform specific manner) into another NaN
         * value, and thus has a different bitwise representation (see
         * {@link Float#intBitsToFloat} or {@link Double#longBitsToDouble} for more
         * details).
         * The values {@code -0.0} and {@code +0.0} have different bitwise
         * representations but are considered equal when using the primitive
         * {@code ==} operator.  Operation failure can occur if, for example, a
         * numeric algorithm computes an expected value to be say {@code -0.0}
         * and previously computed the witness value to be say {@code +0.0}.
         * @param recv the receiver class, of type {@code R}, that declares the
         * non-static field
         * @param name the field's name
         * @param type the field's type, of type {@code T}
         * @return a VarHandle giving access to non-static fields.
         * @throws NoSuchFieldException if the field does not exist
         * @throws IllegalAccessException if access checking fails, or if the field is {@code static}
         * @exception SecurityException if a security manager is present and it
         *                              <a href="MethodHandles.Lookup.html#secmgr">refuses access</a>
         * @throws NullPointerException if any argument is null
         * @since 9
         */
        public VarHandle findVarHandle(Class<?> recv, String name, Class<?> type) throws NoSuchFieldException, IllegalAccessException {
            MemberName getField = resolveOrFail(REF_getField, recv, name, type);
            MemberName putField = resolveOrFail(REF_putField, recv, name, type);
            return getFieldVarHandle(REF_getField, REF_putField, recv, getField, putField);
        }

程式碼很簡單,就是根據傳入的引數來生成欄位的訪問物件MemberName,

MemberName是用來描述一個方法或欄位引用的資料結構,再根據MemberName生成控制代碼,熟悉JVM的同學看到REF_getField應該能夠聯想到JVM位元組碼,這個就是對應著訪問欄位的位元組碼。

對於findStaticVarHandle、unreflectVarHandle方法的實現其實也很類似,大致就是將REF_getField改為REF_getStatic的過程。

VarHandle的訪問

一開始我們就講過,VarHandle支援不同訪問模型下對於變數的訪問,

包括簡單的read/write訪問,volatile read/write訪問,以及CAS訪問等,

那麼分別來看一下VarHandle是如何支援這些訪問模型下對於變數的訪問的。

1、簡單的read/write訪問

  public  void plainUse(String[] args) {
        plainVar.set(this, "I am a plain string");    //例項變數的普通write操作
        staticVar.set("I am a static string");    //    靜態變數的普通write操作
        reflectVar.set(this, "I am a string created by reflection");    //反射欄位的普通write操作
        arrayVar.set(arrayStr, 0, "I am a string array element");    //陣列變數的普通write操作

        String plainString = (String) plainVar.get(this);    //例項變數的普通read操作
        String staticString = (String) staticVar.get();    //    靜態變數的普通read操作
        String reflectString = (String) staticVar.get(this);    //反射欄位的普通read操作
        String arrayStrElem = (String) arrayVar.get(arrayStr, 0);    //陣列變數的普通read操作, 第二個引數是指陣列下標,即第0個元素
    }

2、volatile read/write訪問

對於不同型別的變數的訪問方法跟上面其實大同小異,下面就以普通變數來舉例:

 
    public    void volatileUse(String[] args) {
        volatileVar.setVolatile(this, "I am volatile string");    //volatile write
        String volatileString = (String) volatileVar.getVolatile(this);    //volatile read
    }

3、CAS訪問

      public void casUse(String[] args) {
        String casString = (String) plainVar.get(this);    //先讀取當前值作為cas的預期值
        plainVar.compareAndSet(this, casString, "I am a new cas string");    //第二個引數為預期值,第三個引數為修改值
    }

VarHandle中的指令重排序影響

VarHandle中定義了多種不同的訪問模式,定義在VarHandle內部的列舉類裡面:

    enum AccessType {
        GET(Object.class),
        SET(void.class),
        COMPARE_AND_SET(boolean.class),
        COMPARE_AND_EXCHANGE(Object.class),
        GET_AND_UPDATE(Object.class);

        final Class<?> returnType;
        final boolean isMonomorphicInReturnType;

上面只是VarHandle中定義的訪問模式中的一小部分,實際上還有很多,


    /**
     * The set of access modes that specify how a variable, referenced by a
     * VarHandle, is accessed.
     */
    public enum AccessMode {
        /**
         * The access mode whose access is specified by the corresponding
         * method
         * {@link VarHandle#get VarHandle.get}
         */
        GET("get", AccessType.GET),
        /**
         * The access mode whose access is specified by the corresponding
         * method
         * {@link VarHandle#set VarHandle.set}
         */
        SET("set", AccessType.SET),
        /**
         * The access mode whose access is specified by the corresponding
         * method
         * {@link VarHandle#getVolatile VarHandle.getVolatile}
         */
        GET_VOLATILE("getVolatile", AccessType.GET),
        /**
         * The access mode whose access is specified by the corresponding
         * method
         * {@link VarHandle#setVolatile VarHandle.setVolatile}
         */
        SET_VOLATILE("setVolatile", AccessType.SET),
        /**
         * The access mode whose access is specified by the corresponding
         * method
         * {@link VarHandle#getAcquire VarHandle.getAcquire}
         */
      

不同的訪問模式對於指令重排的效果都可能不一樣,因此需要慎重選擇訪問模式。

其次,在建立VarHandle的過程中,VarHandle內部的訪問模式會覆蓋變數宣告時的任何指令排序效果。

比如說一個變數被宣告為volatile型別,但是呼叫varHandle.get()方法時,其訪問模式就是get模式,即簡單讀取方法,因此需要多加註意。

參考文獻

  1. 瘋狂創客圈 JAVA 高併發 總目錄

    ThreadLocal(史上最全)
    https://www.cnblogs.com/crazymakercircle/p/14491965.html

  2. 3000頁《尼恩 Java 面試寶典 》的 35個面試專題 :
    https://www.cnblogs.com/crazymakercircle/p/13917138.html

  3. 價值10W的架構師知識圖譜
    https://www.processon.com/view/link/60fb9421637689719d246739

4、尼恩 架構師哲學
https://www.processon.com/view/link/616f801963768961e9d9aec8

5、尼恩 3高架構知識宇宙
https://www.processon.com/view/link/635097d2e0b34d40be778ab4

Guava Cache主頁:https://github.com/google/guava/wiki/CachesExplained

Caffeine的官網:https://github.com/ben-manes/caffeine/wiki/Benchmarks

https://gitee.com/jd-platform-opensource/hotkey

https://developer.aliyun.com/article/788271?utm_content=m_1000291945

https://b.alipay.com/page/account-manage-oc/approval/setList

Caffeine: https://github.com/ben-manes/caffeine

這裡: https://albenw.github.io/posts/df42dc84/

Benchmarks: https://github.com/ben-manes/caffeine/wiki/Benchmarks

官方API說明文件: https://github.com/ben-manes/caffeine/wiki

這裡: https://github.com/ben-manes/caffeine/wiki/Guava

HashedWheelTimer時間輪原理分析: https://albenw.github.io/posts/ec8df8c/

TinyLFU論文: https://arxiv.org/abs/1512.00727

Design Of A Modern Cache: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html

Design Of A Modern Cache—Part Deux: http://highscalability.com/blog/2019/2/25/design-of-a-modern-cachepart-deux.html

Caffeine的github: https://github.com/ben-manes/caffeine

https://github.com/axinSoochow/redis-caffeine-cache-starter

https://www.cnblogs.com/liang24/p/14210542.html

https://www.jianshu.com/p/62757d2a592c

https://blog.csdn.net/tongkongyu/article/details/124842847

https://www.163.com/dy/article/FSC51E7G0511FQO9.html

https://blog.csdn.net/Hellowenpan/article/details/121264731

https://www.jianshu.com/p/3c6161e5337b

https://blog.csdn.net/darin1997/article/details/89397862

https://blog.csdn.net/weixin_42297591/article/details/112658262)

https://www.cnblogs.com/liujinhua306/p/9808500.html

https://blog.csdn.net/varyall/article/details/81172725

http://www.cnblogs.com/zhaoxinshanwei/p/8519717.html

https://segmentfault.com/a/1190000016091569?utm_source=tag-newest

https://www.javadevjournal.com/spring-boot/spring-boot-with-caffeine-cache/

https://sunitc.dev/2020/08/27/springboot-implement-caffeine-cache/

https://github.com/ben-manes/caffeine/wiki/Population-zh-CN

https://www.cnblogs.com/Mufasa/p/15994714.html

https://www.cnblogs.com/liujinhua306/p/9808500.html