Netty學習之旅----ByteBuf原始碼解讀之初探UnpooledHeapByteBuf、UnpooledDirectByteBuf
阿新 • • 發佈:2019-01-06
前沿:
在讀原始碼的過程中我發現有如下幾個點,感覺有問題:
1)UnpooledDirectByteBuf的 capacity(int newCapacity)方法,在readerIndex大於或等於newCapacity時,此時不需要將原ByteBuffer中的資料寫入到新的ByteBuffer中,這可以理解,但為什麼要呼叫setIndex(newCapacity,newCapacity)將readerIndex,writerIndex設定為capacity呢?設定之後,該ByteBuf不能讀也不能寫。
2)setByteBuffer 方法,在第一次oldBuffer不為空的時候,此時doNotFree為true(在構造方法中設定),為什麼第一次oldBuffer不為空的時候,不釋放掉該部分記憶體,不明白。
2.1、AbstractByteBuf原始碼分析
AbstractByteBuf定義ByteBuf的基本屬性,諸如 readerIndex,writerIndex,markedReaderIndex,markedWriterIndex,maxCapacity,我們知道ByteBuf的容量是可以自動擴容的。
AbstractByteBuf的這兩個屬性,應該引起我們的注意:
一個是SwappedByteBuf swappedBuf; 這個是大端序列與小端序列的轉換。
二個是ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class);
//這個又是一個很關鍵點,下篇文章將重點分析,Netty用來解決記憶體洩漏檢測機制。非常重要。
這裡擷取一下SwappedByteBuf的原始碼,採用了典型的裝飾模式來設計。SwappedByteBuf採用
public class SwappedByteBuf extends ByteBuf {
private final ByteBuf buf;
private final ByteOrder order;
public SwappedByteBuf(ByteBuf buf) {
if (buf == null) {
throw new NullPointerException("buf");
}
this.buf = buf;
if (buf.order() == ByteOrder.BIG_ENDIAN) {
order = ByteOrder.LITTLE_ENDIAN;
} else {
order = ByteOrder.BIG_ENDIAN;
}
}
@Override
public ByteOrder order() {
return order;
}
@Override
public ByteBuf order(ByteOrder endianness) {
if (endianness == null) {
throw new NullPointerException("endianness");
}
if (endianness == order) {
return this;
}
return buf;
}
}
關於其他AbstractByteBuf,該類設計使用了典型的模板模式,對ByteBuf提供的類,實現時,提供一種模板,然後再提供一個鉤子方法,供子類實現,比如_getLong方法,_setLong等方法,由於該類的實現原理不復雜,就不做進一步的原始碼解讀。
2.2 AbstractReferenceCountedByteBuf原始碼實現,該類主要是實現引用計算的常規方法,充分利用voliate記憶體可見性與CAS操作完成refCnt變數的維護。
其原始碼實現如下:
目前,先關注UnpooledByteBufAllocator,物件池的ByteBuf在後續章節中重點關注。 結合原始碼,有如下兩個方法引起了我的注意: 1、容量擴容規則(容量增長規則)calculateNewCapacity方法 2、直接記憶體的分配。newDirectBuffer方法 2.3.2.1 calculateNewCapacity
1、首先,我們先看一下ByteBuf的類設計圖,從中更進一步瞭解ByteBuf。
package io.netty.buffer;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.internal.PlatformDependent;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
* Abstract base class for {@link ByteBuf} implementations that count references.
*/
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;
static {
AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater =
PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
if (updater == null) {
updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
}
refCntUpdater = updater;
}
private volatile int refCnt = 1;
protected AbstractReferenceCountedByteBuf(int maxCapacity) {
super(maxCapacity);
}
@Override
public final int refCnt() {
return refCnt;
}
/**
* An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly
*/
protected final void setRefCnt(int refCnt) {
this.refCnt = refCnt;
}
@Override
public ByteBuf retain() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, 1);
}
if (refCnt == Integer.MAX_VALUE) {
throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
break;
}
}
return this;
}
@Override
public ByteBuf retain(int increment) {
if (increment <= 0) {
throw new IllegalArgumentException("increment: " + increment + " (expected: > 0)");
}
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, increment);
}
if (refCnt > Integer.MAX_VALUE - increment) {
throw new IllegalReferenceCountException(refCnt, increment);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt + increment)) {
break;
}
}
return this;
}
@Override
public ByteBuf touch() {
return this;
}
@Override
public ByteBuf touch(Object hint) {
return this;
}
@Override
public final boolean release() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, -1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
if (refCnt == 1) {
deallocate();
return true;
}
return false;
}
}
}
@Override
public final boolean release(int decrement) {
if (decrement <= 0) {
throw new IllegalArgumentException("decrement: " + decrement + " (expected: > 0)");
}
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
if (refCnt == decrement) {
deallocate();
return true;
}
return false;
}
}
}
/**
* Called once {@link #refCnt()} is equals 0.
*/
protected abstract void deallocate();
}
該類,我們只需要瞭解,當一個ByteBuf被引用的次數為0時,dealocate()方法將被呼叫,該方法就是具體回收ByteBuf的操作,由具體的子類去實現。
2.3 UnpooledHeapByteBuf與UnpooledDirectByteBuf原始碼分析:
首先該類的內部結構如下:
public ByteBuf capacity(int newCapacity) {
ensureAccessible(); // @1
if (newCapacity < 0 || newCapacity > maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
int readerIndex = readerIndex();
int writerIndex = writerIndex();
int oldCapacity = capacity;
if (newCapacity > oldCapacity) { // @2
ByteBuffer oldBuffer = buffer;
ByteBuffer newBuffer = allocateDirect(newCapacity); //@21
oldBuffer.position(0).limit(oldBuffer.capacity()); //@22
newBuffer.position(0).limit(oldBuffer.capacity()); //@23
newBuffer.put(oldBuffer); //@24
newBuffer.clear(); //@25
setByteBuffer(newBuffer); //@26
} else if (newCapacity < oldCapacity) { //@3
ByteBuffer oldBuffer = buffer;
ByteBuffer newBuffer = allocateDirect(newCapacity);
if (readerIndex < newCapacity) {
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
oldBuffer.position(readerIndex).limit(writerIndex);
newBuffer.position(readerIndex).limit(writerIndex);
newBuffer.put(oldBuffer);
newBuffer.clear();
} else {
setIndex(newCapacity, newCapacity);
}
setByteBuffer(newBuffer);
}
return this;
}
程式碼@1,檢測一下訪問性,可達性,就是引用數必須大於0,否則該ByteBuf的內部空間已經被回收了(堆外記憶體)
程式碼@2,擴容操作,思路新建一個快取區,然後將原先快取區的資料全部寫入到新的快取區,然後釋放舊的快取區。
程式碼@21、22,申請一個直接快取區,然後將原緩衝區的postion設定為0,將limit設定為capacity,處於釋放狀態(從快取區讀)。
程式碼@23,將新快取區的postion,limit屬性設定為0,老快取區limit。
程式碼@24,將原緩衝區寫入到新的快取區,然後將快取區置的position設定為0,limt設定為capacity,其實這裡設定position,capacity的意義不大,因為ByteBuf並不會利用內部的ByteBuffer的limit,postion屬性,而是使用readerIndex,wriateIndex。
程式碼@26,關聯新的ByteBuffer,並釋放原快取區的空間。
程式碼@3,壓縮快取區。實現思路是新建一個快取區,如果readerIndex大於新建的ByteBuffer的capacity,則無需將舊的快取區內容寫入到新的快取區中。如果readerIndex小於新capacity,那需要將readerIndex
至( Math.min(writerIndex, newCapacity) )直接的內容寫入到新的快取,然後釋放舊的快取區。值得注意一點是,如果readerIndex > newCapcity,該ByteBuf的 readerIndex,writerIndex將會被設定為容量值,意味著如果不對readerIndex設定為0,或呼叫discardReadBytes,該快取區是不可以使用的,所以,我不明白這裡為什麼要這樣做,是為了安全?。
我們在重點關注一下setByteBuffer(newBuffer)方法,該方法還負責銷燬原先的ByteBuffer。
private void setByteBuffer(ByteBuffer buffer) {
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
if (doNotFree) {
doNotFree = false;
} else {
freeDirect(oldBuffer);
}
}
this.buffer = buffer;
tmpNioBuf = null;
capacity = buffer.remaining();
}
釋放原先的記憶體。
2.3.2 記憶體的分配。
Netty在為記憶體的分配,單獨封裝,相關類圖:
目前,先關注UnpooledByteBufAllocator,物件池的ByteBuf在後續章節中重點關注。 結合原始碼,有如下兩個方法引起了我的注意: 1、容量擴容規則(容量增長規則)calculateNewCapacity方法 2、直接記憶體的分配。newDirectBuffer方法 2.3.2.1 calculateNewCapacity
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
if (minNewCapacity < 0) {
throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)");
}
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
final int threshold = 1048576 * 4; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) { //@1
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
int newCapacity = 64;c // @2
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
引數:minNewCapacity,本次需要申請的最小記憶體
引數:macCapacity,最大總記憶體申請值。
程式碼@1,如果最小需要的記憶體超過設定的 threshold(闊值的話),則迴圈,每次增加threshold,然後看是否達到本次申請目標。
程式碼@2,如果需要申請的記憶體小於闊值,則以64個位元組以2的冪增長。
這裡提現了記憶體擴容時的一個優化點。
2.3.2.2 newDirectBuffer方法
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
該方法中,除了見到申請一個直接記憶體外,還將該buf 變成一個可感知的物件 toLeakAwareBuffer方法,用於該物件被引用的情況,因為UnpooledDirectByteBuf是一個聚合物件,內部維護了一個java.nio.ByteBuffer的直接對外記憶體空間,在什麼是釋放UnpooledDirectByteBuf中的堆外記憶體呢?在UnpooledDirectByteBuf被java垃圾回收的時候,應該於此同時需要釋放指向的堆外記憶體,但堆外記憶體不受JVM
GC的管理,所以我們只有感知到UnpooledDirectByteBuf被JVM虛擬機器回收後,手動去釋放堆外記憶體,大家想想都知道,我們可以通過JAVA提供的引用機制,來實現跟蹤垃圾回收器的收集工作,虛引用的作用來了,下一篇,我將會以這個為入口點,重點分析Netty堆外記憶體如何管理,也就是記憶體洩露檢測等方面的課題。