Guava Cache 實現與源碼分析
阿新 • • 發佈:2018-07-07
def ews obj load existing ray cat 核心 poll()
目錄
- Guava Cache
- 一、概述
- 1、內存緩存
- 2、核心數據結構
- 二、具體實現
- 0、一覽眾山小
- 1、CacheBuilder 構建器
- 2、LocalCache
- 一、概述
Guava Cache
一、概述
1、內存緩存
可看作一個jdk7的concurrentHashMap,核心功能get,put
但是比一般的map多了一些功能,如:
- ??過限失效(根據不同的維度失效,讀後N秒,寫後N秒,最大size,最大weight)
- 自動刷新
- 支持軟引用和弱引用
- 監聽刪除
2、核心數據結構
和jdk7的HashMap相似
有N個Segment,每個Segment下是一個HashTable,每個HashTable裏是一個鏈表
Guava的鎖是一個比較重的操作,鎖住的是整個Segment(Segment繼承的是ReetrentLock,驚)
二、具體實現
0、一覽眾山小
主要的類:
CacheBuilder 設置參數,構建LoadingCache
LocalCache 是核心實現,雖然builder構建的是LocalLoadingCache(帶refresh功能)和LocalManualCache(不帶refresh功能),但其實那兩個只是個殼子
1、CacheBuilder 構建器
提要:
記錄所需參數
public final class CacheBuilder<K, V> { public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build( CacheLoader<? super K1, V1> loader) { // loader是用來自動刷新的 checkWeightWithWeigher(); return new LocalCache.LocalLoadingCache<>(this, loader); } public <K1 extends K, V1 extends V> Cache<K1, V1> build() { // 這個沒有loader,就不會自動刷新 checkWeightWithWeigher(); checkNonLoadingCache(); return new LocalCache.LocalManualCache<>(this); } int initialCapacity = UNSET_INT; // 初始map大小 int concurrencyLevel = UNSET_INT; // 並發度 long maximumSize = UNSET_INT; long maximumWeight = UNSET_INT; Weigher<? super K, ? super V> weigher; Strength keyStrength; // key強、弱、軟引,默認為強 Strength valueStrength; // value強、弱、軟引,默認為強 long expireAfterWriteNanos = UNSET_INT; // 寫過期 long expireAfterAccessNanos = UNSET_INT; // long refreshNanos = UNSET_INT; // Equivalence<Object> keyEquivalence; // 強引時為equals,否則為== Equivalence<Object> valueEquivalence; // 強引時為equals,否則為== RemovalListener<? super K, ? super V> removalListener; // 刪除時的監聽 Ticker ticker; // 時間鐘,用來獲得當前時間的 Supplier<? extends StatsCounter> statsCounterSupplier = NULL_STATS_COUNTER; // 計數器,用來記錄get或者miss之類的數據 }
2、LocalCache
1)初始化
提要:
a)賦值
b)初始化Segment[]數組
LocalCache( CacheBuilder<? super K, ? super V> builder, @Nullable CacheLoader<? super K, V> loader) { // a)把builder的參數賦值過來,略 // b)構建Segment[]數組,原理可參照jdk7點concurrentHashMap int segmentShift = 0; int segmentCount = 1; // 設置為剛剛好比concurrencyLevel大的2的冪次方的值 while (segmentCount < concurrencyLevel && (!evictsBySize() || segmentCount * 20 <= maxWeight)) { ++segmentShift; segmentCount <<= 1; } this.segmentShift = 32 - segmentShift; segmentMask = segmentCount - 1; this.segments = newSegmentArray(segmentCount); int segmentCapacity = initialCapacity / segmentCount; //每個Segment的容量 int segmentSize = 1; // 剛剛好比容量大的2等冪次方的值 while (segmentSize < segmentCapacity) { segmentSize <<= 1; } if (evictsBySize()) { // Ensure sum of segment max weights = overall max weights long maxSegmentWeight = maxWeight / segmentCount + 1; long remainder = maxWeight % segmentCount; for (int i = 0; i < this.segments.length; ++i) { if (i == remainder) { maxSegmentWeight--; } this.segments[i] = createSegment(segmentSize, maxSegmentWeight, builder.getStatsCounterSupplier().get()); } } else { for (int i = 0; i < this.segments.length; ++i) { this.segments[i] = createSegment(segmentSize, UNSET_INT, builder.getStatsCounterSupplier().get()); // 往Segment數組裏塞 } } } Segment( LocalCache<K, V> map, int initialCapacity, long maxSegmentWeight, StatsCounter statsCounter) { this.map = map; this.maxSegmentWeight = maxSegmentWeight; this.statsCounter = checkNotNull(statsCounter); initTable(newEntryArray(initialCapacity)); // 當key是弱、軟引用時,初始化keyReferenceQueue;其父類特性決定其gc時,會將被GC的元素放入該隊列中 keyReferenceQueue = map.usesKeyReferences() ? new ReferenceQueue<K>() : null; valueReferenceQueue = map.usesValueReferences() ? new ReferenceQueue<V>() : null; recencyQueue = map.usesAccessQueue() ? new ConcurrentLinkedQueue<ReferenceEntry<K, V>>() : LocalCache.<ReferenceEntry<K, V>>discardingQueue(); writeQueue = map.usesWriteQueue() ? new WriteQueue<K, V>() : LocalCache.<ReferenceEntry<K, V>>discardingQueue(); accessQueue = map.usesAccessQueue() ? new AccessQueue<K, V>() : LocalCache.<ReferenceEntry<K, V>>discardingQueue(); }
2)put
提要
a)找到key所在的segment,調用segment.put方法
b)鎖住segment,清理
i)如果key存在
ii)如果key不存在
c)清理
class LocalCache {
public V put(K key, V value) {
checkNotNull(key);
checkNotNull(value);
int hash = hash(key); // 計算hash
return segmentFor(hash).put(key, hash, value, false); // 找到hash所分配到的的Segment,put進去
}
}
// 轉而來看Segment的put方法
class Segment<K,V> implements ReentrantLock {
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock(); // 鎖住一個segment
try {
long now = map.ticker.read(); //獲得當前時間
preWriteCleanup(now); //清除軟/弱引用 詳見 2.4
int newCount = this.count + 1;
if (newCount > this.threshold) { // 如有需要則擴容
expand();
newCount = this.count + 1;
}
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
// Look for an existing entry.
// 根據不同情況決定是否要執行操作,1)count++ 更新數量 2)enqueueNotification 入隊通知 3)setValue 更新值 4)evictEntries 淘汰緩存
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
// 如果該key已經存在
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
// We found an existing entry.
ValueReference<K, V> valueReference = e.getValueReference();
V entryValue = valueReference.get();
if (entryValue == null) {
++modCount;
if (valueReference.isActive()) {
enqueueNotification(
key, hash, entryValue, valueReference.getWeight(), RemovalCause.COLLECTED);
setValue(e, key, value, now);
newCount = this.count; // count remains unchanged
} else {
setValue(e, key, value, now);
newCount = this.count + 1;
}
this.count = newCount; // write-volatile
evictEntries(e);
return null;
} else if (onlyIfAbsent) {
recordLockedRead(e, now);
return entryValue;
} else {
// clobber existing entry, count remains unchanged
++modCount;
enqueueNotification(
key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);
setValue(e, key, value, now);
evictEntries(e);
return entryValue;
}
}
}
// 如果該key不存在,則新建一個entry.
++modCount;
ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
setValue(newEntry, key, value, now);
table.set(index, newEntry);
newCount = this.count + 1;
this.count = newCount; // write-volatile
evictEntries(newEntry);
return null;
} finally {
unlock();
postWriteCleanup();
}
}
@GuardedBy("this")
ReferenceEntry<K, V> newEntry(K key, int hash, @Nullable ReferenceEntry<K, V> next) {
return map.entryFactory.newEntry(this, checkNotNull(key), hash, next);
}
}
利用map.entryFactory創建Entry。其中entryFactory的初始化是下述得到的
EntryFactory entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries());
EntryFactory是個枚舉類,枚舉類還可以這麽用,漲知識了!
enum EntryFactory {
STRONG {
@Override
<K, V> ReferenceEntry<K, V> newEntry(
Segment<K, V> segment, K key, int hash, @Nullable ReferenceEntry<K, V> next) {
return new StrongEntry<>(key, hash, next);
}
},...,// 省略部分
WEAK { // 軟/弱引用的精髓!!!
@Override
<K, V> ReferenceEntry<K, V> newEntry(
Segment<K, V> segment, K key, int hash, @Nullable ReferenceEntry<K, V> next) { // 所以!!!就是在這裏!!!把這個queue放進去了,終於找到了
return new WeakEntry<>(segment.keyReferenceQueue, key, hash, next);
}
}};
// Masks used to compute indices in the following table.
static final int ACCESS_MASK = 1;
static final int WRITE_MASK = 2;
static final int WEAK_MASK = 4;
/** Look-up table for factories. */
static final EntryFactory[] factories = {
STRONG,
STRONG_ACCESS,
STRONG_WRITE,
STRONG_ACCESS_WRITE,
WEAK,
WEAK_ACCESS,
WEAK_WRITE,
WEAK_ACCESS_WRITE,
};
static EntryFactory getFactory(
Strength keyStrength, boolean usesAccessQueue, boolean usesWriteQueue) {
int flags =
((keyStrength == Strength.WEAK) ? WEAK_MASK : 0)
| (usesAccessQueue ? ACCESS_MASK : 0)
| (usesWriteQueue ? WRITE_MASK : 0);
return factories[flags];
}
// 抽象方法:創建一個entry
abstract <K, V> ReferenceEntry<K, V> newEntry(
Segment<K, V> segment, K key, int hash, @Nullable ReferenceEntry<K, V> next);
}
static class WeakEntry<K, V> extends WeakReference<K> implements ReferenceEntry<K, V> {
WeakEntry(ReferenceQueue<K> queue, K key, int hash, @Nullable ReferenceEntry<K, V> next) {
super(key, queue); // 抽絲剝繭,這個是Reference的方法,所以放到這個queue裏面去,是Java WeakReference類自帶的功能
this.hash = hash;
this.next = next;
}
}
3)get
提要
a)找到key所在的segment,調用segment.get方法
b)得到ReferenceEntry,若存在,檢查value是否過期,返回結果
c)清理
class LocalCache{
public @Nullable V get(@Nullable Object key) {
if (key == null) {
return null;
}
int hash = hash(key);
return segmentFor(hash).get(key, hash);
}
}
class Segment{
V get(Object key, int hash) {
try {
if (count != 0) { // read-volatile
long now = map.ticker.read();
ReferenceEntry<K, V> e = getLiveEntry(key, hash, now); //如果發現沒有找到或者過期了,則返回為null
if (e == null) {
return null;
}
V value = e.getValueReference().get();
if (value != null) {
recordRead(e, now);
return scheduleRefresh(e, e.getKey(), hash, value, now, map.defaultLoader);// 如果有loader且在刷新時間段中則刷新,否則跳過
}
tryDrainReferenceQueues(); // 這個幽靈一般的操作,難受
}
return null;
} finally {
postReadCleanup();
}
}
}
4)清理軟/弱引用
每次put、get前後都會進行清理檢查
@GuardedBy("this")
void preWriteCleanup(long now) { // 寫前調用,其他方法類似,只是起了個不同的名字
runLockedCleanup(now);
}
void runLockedCleanup(long now) { // 加鎖+執行方法
if (tryLock()) {
try {
drainReferenceQueues();
expireEntries(now); // calls drainRecencyQueue
readCount.set(0);
} finally {
unlock();
}
}
}
@GuardedBy("this")
void drainReferenceQueues() { // 清空軟/弱引用key和value
if (map.usesKeyReferences()) {
drainKeyReferenceQueue();
}
if (map.usesValueReferences()) {
drainValueReferenceQueue();
}
}
@GuardedBy("this")
void drainKeyReferenceQueue() { // 清空軟/弱引用key
Reference<? extends K> ref;
int i = 0;
while ((ref = keyReferenceQueue.poll()) != null) {
@SuppressWarnings("unchecked")
ReferenceEntry<K, V> entry = (ReferenceEntry<K, V>) ref;
map.reclaimKey(entry);
if (++i == DRAIN_MAX) {
break;
}
}
}
}
// 之前一直沒想明白的地方就是,這個keyReferenceQueue到底是什麽時候被塞進去元素的???
// 需要看下創建entry的時候的操作!!!抽絲剝繭就能知道了
public class ReentrantLock implements Lock, java.io.Serializable {
private Sync sync;
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread(); // 獲取當前線程
int c = getState();
if (c == 0) { // 無線程持有,即無鎖狀態
if (compareAndSetState(0, acquires)) { // 設置持有線程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 如果持有者就是當前線程,perfect
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}
Guava Cache 實現與源碼分析