1. 程式人生 > >DirectByteBuffer和檔案IO詳解

DirectByteBuffer和檔案IO詳解

java.nio 包裡,是java用於處理IO的新的API,它使用channel、select等模型,重新對IO操作進行了新的實現。

DirectByteBuffer就是nio包下面的一個類。這個類用於儲存byte陣列,其特別之處在於:他將資料儲存在堆外記憶體。不想傳統的物件,物件都在堆中。這樣的好處就是對於 IO操作,減少了記憶體copy次數,從而增加效率。這裡以檔案IO進行講解

在這裡我們先把結論說一下:

a. 傳統的IO操作(就是使用java.io包的api)訪問磁碟檔案,資料需要copy的次數:

    1. 磁碟檔案的資料 copy 核心page cache 

    2. 核心的資料 copy  應用程式空間(即:jvm 堆外記憶體)

    3. jvm堆外記憶體  copy  jvm堆內 記憶體

為什麼2、和3 不合並,將核心資料 copy jvm堆內記憶體。 因為jvm進行系統呼叫進行讀檔案時候,此時發生gc,那麼堆記憶體的對應地址就會移動,所以直接copy到堆內是有問題的。

b. 使用DirectByteBuffer訪問磁碟檔案,資料需要copy的次數:

   1. 磁碟檔案的資料 copy 核心page cache 

    2. 核心的資料 copy  應用程式空間(即:DirectByteBuffer)

所以DirectByteBuffer減少了記憶體copy次數。

1.傳統檔案IO解析

檔案讀取示例:

FileInputStream input = new FileInputStream("/data");

byte[] b = new byte[SIZE];

input.read(b);

byte陣列示堆記憶體物件,此處將資料copy 到jvm堆記憶體。我們看一下read函式內部實現

public int read(byte b[]) throws IOException { 

    return readBytes(b, 0, b.length);

}

private native int readBytes(byte b[], int off, int len) throws IOException;

我們看到 read函式最終呼叫 native函式 readBytes。

jintreadBytes(JNIEnv *env, jobject this, jbyteArray bytes,          jint off, jint len, jfieldID fid){

    jint nread;

    char stackBuf[BUF_SIZE];

    char *buf = NULL;

    FD fd;

    if (IS_NULL(bytes)) {

        JNU_ThrowNullPointerException(env, NULL);

        return -1;

    }

    if (outOfBounds(env, off, len, bytes)) {

        JNU_ThrowByName(env, "java/lang/IndexOutOfBoundsException", NULL);

        return -1;

    }

    if (len == 0) {

        return 0;

    } else if (len > BUF_SIZE) {

        buf = malloc(len);

        if (buf == NULL) {

            JNU_ThrowOutOfMemoryError(env, NULL);

            return 0;

        }

    } else {

        buf = stackBuf;

    }

    fd = GET_FD(this, fid);

    if (fd == -1) {

        JNU_ThrowIOException(env, "Stream Closed");

        nread = -1;

    } else {

        nread = IO_Read(fd, buf, len);

        if (nread > 0) {

            (*env)->SetByteArrayRegion(env, bytes, off, nread, (jbyte *)buf);

        } else if (nread == -1) {

            JNU_ThrowIOExceptionWithLastError(env, "Read error");

        } else { /* EOF */

            nread = -1;

        }

    }

    if (buf != stackBuf) {

        free(buf);   

}

    return nread;

}

我們看到最終通過IO_Read將緩衝資料讀到buf中去,這個IO_Read其實是一個巨集定義:

#define IO_Read handleRead

handleRead函式實現如下,這裡你可以看到這裡進行了read系統呼叫:

ssize_t

handleRead(FD fd, void *buf, jint len)

{

    ssize_t result;

    RESTARTABLE(read(fd, buf, len), result);

    return result;

}

buf返回之後,由SetByteArrayRegion這個JNI函式拷貝到了bytes,它的具體實現如下(下面定義了一個通用的巨集函式來表示各種資料型別陣列區域的設定,可以將Result巨集替換成Byte即可理解):

JNI_ENTRY(void, \

jni_Set##Result##ArrayRegion(JNIEnv *env, ElementType##Array array, jsize start, \

             jsize len, const ElementType *buf)) \

  JNIWrapper("Set" XSTR(Result) "ArrayRegion"); \

  DTRACE_PROBE5(hotspot_jni, Set##Result##ArrayRegion__entry, env, array, start, len, buf);\

  DT_VOID_RETURN_MARK(Set##Result##ArrayRegion); \

  typeArrayOop dst = typeArrayOop(JNIHandles::resolve_non_null(array)); \

  if (start < 0 || len < 0 || ((unsigned int)start + (unsigned int)len > (unsigned int)dst->length())) { \

    THROW(vmSymbols::java_lang_ArrayIndexOutOfBoundsException()); \

  } else { \

    if (len > 0) { \

      int sc = TypeArrayKlass::cast(dst->klass())->log2_element_size(); \

      memcpy((u_char*) dst->Tag##_at_addr(start), \

             (u_char*) buf, \

             len << sc);    \

    } \

  } \

JNI_END

(以上內容部門來源:https://www.zhihu.com/question/65415926

由此可見,nativ方法,readBytes而採用了C Heap - JVM Heap進行記憶體拷貝的方式進行資料傳遞。

而readBytes 通過呼叫 handleRead 進行讀寫。handleRead就是讀取核心快取區資料。核心資料來原始檔。

2. DirectByteBuffer

DirectByteBuffer 是構建在堆外的記憶體的物件。

DirectByteBuffer是包級別可訪問的,通過 ByteBuffer.allocateDirect(int capacity) 進行構造。

public static ByteBuffer allocateDirect(int capacity) {

return new DirectByteBuffer(capacity);

}

我們看一下DirectByteBuffer 建構函式實現

DirectByteBuffer(int cap) {// package-private

    super(-1,0, cap, cap);

    boolean pa = VM.isDirectMemoryPageAligned();

    int ps = Bits.pageSize();

    long size = Math.max(1L, (long)cap + (pa ? ps :0));

    Bits.reserveMemory(size, cap);

    long base =0;

    try {

        base =unsafe.allocateMemory(size);

    }catch (OutOfMemoryError x) {

        Bits.unreserveMemory(size, cap);

        throw x;

    }

    unsafe.setMemory(base, size, (byte)0);

    if (pa && (base % ps !=0)) {

        // Round up to page boundary

        address = base + ps - (base & (ps -1));

    }else {

        address = base;

    }

    cleaner = Cleaner.create(this,new Deallocator(base, size, cap));   

    att =null;

}

這裡我們主要關注這幾個地方:

1.unsafe.allocateMemory(size);

利用 unsafe 類在堆外記憶體(C_HEAP)中分配了一塊空間,這是一個 native 函式,轉到進行堆外記憶體分配的 C/C++ 程式碼

inline char* AllocateHeap( size_t size, MEMFLAGS flags, address pc = 0, AllocFailType alloc_failmode = AllocFailStrategy::EXIT_OOM){

// ... 省略

char*p=(char*)os::malloc(size, flags, pc);

// 分配在 C_HEAP 上並返回指向記憶體區域的指標

// ... 省略

return p;

}

2.cleaner = Cleaner.create(this,new Deallocator(base, size, cap)); 

cleaner物件是對DirectByteBuffer佔用對堆外記憶體進行清理。DirectByteBuffer.cleaner().clean() 進行手動清理。我們看一下clean() 函式

public void clean() {

//....省略

this.thunk.run();

//....省略

}

其中 thunk就是我們 Cleaner.create(this,new Deallocator(base, size, cap)); 中的Deallocator。看一下Deallocator。

private static class Deallocator implements Runnable

{

//。。。省略

    public void run() {

        if (address ==0) {

            // Paranoia

            return;

           }

        unsafe.freeMemory(address);

        address =0;

        Bits.unreserveMemory(size,capacity);

    }

}

可以看到其是一個執行緒進行 堆外記憶體的釋放動作。

cleaner是PhantomReference的子類。

PhantomReference它其實主要是用來跟蹤物件何時被回收的,它不能影響gc決策,但是gc過程中如果發現某個物件除了只有PhantomReference引用它之外,並沒有其他的地方引用它了,那將會把這個引用放到java.lang.ref.Reference.pending佇列裡,在gc完畢的時候通知ReferenceHandler這個守護執行緒去執行一些後置處理。這個處理方法中,就會判斷是否是cleaner物件,如果是,就性質clean()函式。

因此DirectByteBuffer並不需要我們手動清理記憶體。當jvm進行gc(oldgc)的時候,就會清理沒有引用的 dirctByteBuffer。

當我們一直申請DirectByteBuffer。其實佔用的是堆外記憶體,堆內記憶體只是佔用一個引用。如果一直觸發不了gc,納悶堆外記憶體就不會回收,導致jvm程序佔用記憶體很大。我們可以通過-XX:MaxDirectMemorySize限制DirecByteBuffer佔用堆外記憶體的大小

3.Bits.reserveMemory(size, cap);

static void reserveMemory(long size,int cap) {

    synchronized (Bits.class) {

        if (!memoryLimitSet && VM.isBooted()) {

            maxMemory = VM.maxDirectMemory();

            memoryLimitSet =true;

        }

        // -XX:MaxDirectMemorySize limits the total capacity rather than the

        // actual memory usage, which will differ when buffers are page

        // aligned.

        if (cap <=maxMemory -totalCapacity) {

            reservedMemory += size;

            totalCapacity += cap;

            count++;

            return;

        }

    }

    System.gc();

    try {

        Thread.sleep(100);

    }catch (InterruptedException x) {

        // Restore interrupt status

        Thread.currentThread().interrupt();

    }

    synchronized (Bits.class) {

        if (totalCapacity + cap >maxMemory)

            throw new OutOfMemoryError("Direct buffer memory");

        reservedMemory += size;

        totalCapacity += cap;

        count++;

        }

}

該函式用於統計DirectByteBuffer佔用的大小。VM.maxDirectMemory()是jvm允許申請的最大DirectBuffer的大小(XX:MaxDirectMemorySize 通過這個引數設定)

如果發現當前申請的空間,大於限制的空間,就會觸發一次gc,上面說過gc會回收哪些之前不使用的directBuffer。然後再次申請。

VM.maxDirectMemory() 大小是如何設定的內,在VM類有這樣一段程式碼

public static void saveAndRemoveProperties(Properties var0) {

    //....

    String var1 = (String)var0.remove("sun.nio.MaxDirectMemorySize");

    if (var1 !=null) {

        if (var1.equals("-1")) {

            directMemory = Runtime.getRuntime().maxMemory();

        }else {

            long var2 = Long.parseLong(var1);

            if (var2 > -1L) {

            directMemory = var2;

            }

    }

    //...

}

"sun.nio.MaxDirectMemorySize"  這個屬性就是通過 -XX:MaxDirectMemorySize 這個引數設定的。如果我們不指定這個jvm引數,筆者在jdk8中測試了一下,預設是-1,這樣就導致directBufffer記憶體限制為程序最大記憶體。當然這也是一個潛在風險。

風險案例:

筆者曾在線上執行一個應用。該應用就是從訊息佇列中消費資料,然後將資料處理後存到Hbase中。但是應用執行每次執行2周左右,機器就會出現swap佔用過大。經過分析,是jvm程序佔用記憶體太大,但是分析jvm相關引數(堆、執行緒大小),並沒有設定的很大。最後發現原來是directBuffer佔用達到了10G。後面通過-XX:MaxDirectMemorySize=2048m 限制directbuffer使用量,解決了問題。每次directBuffer佔用達到2G,就會觸發一次fullgc,將之前的無用directbuffer回收掉。hbase一個坑,有時間筆者會整理這個案例。

3.DirectByteBuffer檔案IO

檔案讀取示例:

FileChannel filechannel=new RandomAccessFile("/data/appdatas/cat/mmm","rw").getChannel();

ByteBuffer byteBuffer = ByteBuffer.allocateDirect(SIZE);

filechannel.read(byteBuffer)

我們看一下read函式

public int read(ByteBuffer var1)throws IOException {

//。。。。

var3 = IOUtil.read(this.fd, var1, -1L,this.nd);

//。。。。

}

主要邏輯呼叫IOUtil.read。我們看一下這個函式

static int read(FileDescriptor var0, ByteBuffer var1,long var2, NativeDispatcher var4)throws IOException {

    if (var1.isReadOnly()) {

        throw new IllegalArgumentException("Read-only buffer");

    }else if (var1instanceof DirectBuffer) {

        return readIntoNativeBuffer(var0, var1, var2, var4);

    }else {

        ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());

        int var7;

    try {

        int var6 = readIntoNativeBuffer(var0, var5, var2, var4);

        var5.flip();

        if (var6 >0) {

            var1.put(var5);

        }

        var7 = var6;

    }finally {

        Util.offerFirstTemporaryDirectBuffer(var5);

    }

    return var7;

    }

}

主要方法就是通過 readIntoNativeBuffer 這個函式將資料讀入 directBuffer中,其中readIntoNativeBuffer也是呼叫一個native方法。

通過上面的程式碼,我們會看到,如果fielchannel.read(ByteBuffer) 也可以傳入一個HeapByteBuffer,這個類是堆中。如果是這個類,那麼內部讀取的時候,會把資料先讀到DirectByteBuffer中,然後在copy到HeapByteBuffer中。Util.getTemporaryDirectBuffer(var1.remaining());就是獲取一個DirectBuffer對像。因為DirectBuffer建立的時候,開銷比較大,所以使用的時候一般會用一個池子來管理。有興趣可以看一下Util這個