1. 程式人生 > >【原創】從原始碼剖析IO流(四)管道流--轉載請註明出處

【原創】從原始碼剖析IO流(四)管道流--轉載請註明出處

一、管道流的特點與作用:

PipedInputStream與PipedOutputStream分別為管道輸入流和管道輸出流。管道輸入流通過連線到管道輸出流實現了類似管道的功能,用於執行緒之間的通訊。在使用時,通常由某個執行緒向管道輸出流中寫入資料。根據管道的特性,這些資料會自動傳送到與管道輸出流對應的管道輸入流中。這時其他執行緒就可以從管道輸入流中讀取資料,這樣就實現了執行緒之間的通訊。

管道流與其他流一樣,分為讀入和寫出流,但是管道流是用於從管道內讀取內容和將內容寫入到管道中。並且,在進行管道流物件的建立時,如果構建輸入流物件則需要在輸入流中傳入輸出流物件,構建輸出流物件則需要在構造器中傳入輸入流物件。每次寫入管道時寫入的資料,都將會被儲存在作為快取區進行使用的位元組陣列中。由輸出流寫入管道,再由輸入流從管道中進行讀取。

二、輸出流PipOutPutStream:

首先來看一下輸出流的構造器:

​
    public PipedOutputStream(PipedInputStream snk)  throws IOException {
        connect(snk);
    }

    public synchronized void connect(PipedInputStream snk) throws IOException {
        if (snk == null) {
            throw new NullPointerException();
        } else if (sink != null || snk.connected) {
            throw new IOException("Already connected");
        }
        sink = snk;
        snk.in = -1;
        snk.out = 0;
        snk.connected = true;
    }

​

在進行輸出流的構造時,需要需要傳入一個輸入流的物件,然後將輸入流物件的連線狀態設定為連線,並對其中的引數進行初始化。

之後,再來看write方法:

    public void write(byte b[], int off, int len) throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        } else if (b == null) {
            throw new NullPointerException();
        } else if ((off < 0) || (off > b.length) || (len < 0) ||
                   ((off + len) > b.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return;
        }
        sink.receive(b, off, len);
    }

write方法中,在經過了一系列的引數校驗以後,呼叫了自身持有的輸入流成員變數 sink 的receive方法。這個方法即是管道流的主要核心方法。

三、輸入流 PipInPutStream:

剛才說到的是receive方法,這個方法中的邏輯看起來比較複雜,在看這段程式碼之前,需要先看一下這個類的成員變數有哪些:

欄位 說明
boolean closedByWriter = false; 管道輸出流是否關閉
volatile boolean closedByReader = false; 管道輸入流是否關閉
boolean connected = false; 管道輸入流是否被連線
Thread readSide; 從管道中讀取資料的執行緒
Thread writeSide; 向管道中寫入資料的執行緒
private static final int DEFAULT_PIPE_SIZE = 1024; 管道迴圈輸入緩衝區的預設大小。
protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE; 管道迴圈輸入緩衝區的預設大小。
protected byte buffer[]; 放置資料的迴圈緩衝區。
protected int in = -1; 緩衝區的位置,當從連線的管道輸出流中接收到下一個資料位元組時,會將其儲存到該位置。
protected int out = 0; 緩衝區的位置,此管道輸入流將從該位置讀取下一個資料位元組。

就可以瞭解到這段程式碼的功能。這段程式碼主要用於將一個位元組陣列 b 儲存到快取區 buffer 陣列中。如果buffer快取區已經滿了,但是沒有被讀取,則會等待讀取完畢後再進行繼續儲存。此處唯一要注意的點,為快取區陣列中的內容是不會進行清空的,只會進行迴圈使用。

synchronized void receive(byte b[], int off, int len) throws IOException {
    //檢查PipedInputStream狀態,如果不正常,則丟擲異常。
    checkStateForReceive();
    //獲取將資料寫入管道的執行緒
    writeSide = Thread.currentThread();
    //需要接收的資料量,初始值為len
    int bytesToTransfer = len;
    //如果需要接收的資料>0,即還有需要接收的資料
    while (bytesToTransfer > 0) {
        //如果寫入管道的資料已經被讀完,則等待
        if (in == out)
            awaitSpace();
        //下次要傳輸的位元組數,初始值為0
        int nextTransferAmount = 0;
        //如果緩衝區未滿(滿後,in會重置為0)
        if (out < in) {
            //計算下次要傳輸的位元組數。值為緩衝區還剩餘的空間(即in之前的位置都滿了,只有in到末尾的位置可用)
            nextTransferAmount = buffer.length - in;
        } else if (in < out) {//?
            if (in == -1) {//?
                in = out = 0;
                nextTransferAmount = buffer.length - in;
            } else {//如果緩衝區已滿(此時out之後和in之前的位置已滿,只有in到out之間的位置可用)
                nextTransferAmount = out - in;
            }
        }
        //如果下次傳輸的位元組數大於需要傳輸的位元組數,這時只需要bytesToTransfer大小的空間就可以了
        if (nextTransferAmount > bytesToTransfer)
            nextTransferAmount = bytesToTransfer;
        assert(nextTransferAmount > 0);
        //將nextTransferAmount個位元組從b中複製到緩衝區陣列中
        System.arraycopy(b, off, buffer, in, nextTransferAmount);
        //重新計算需要傳輸的位元組數,已經傳輸了nextTransferAmount位元組,所以減去nextTransferAmount
        bytesToTransfer -= nextTransferAmount;
        //b中已經複製的部分以後不能再複製了,所以偏移量off後移nextTransferAmount個位置,防止重複複製
        off += nextTransferAmount;
        //將in後移nextTransferAmount,防止之前複製的資料被覆蓋
        in += nextTransferAmount;
        //如果in已經超出緩衝區大小,將in置0,從頭開始寫
        if (in >= buffer.length) {
            in = 0;
        }
    }
}

之後是read(byte[] b, int off, int len)方法,該方法中,會從快取區陣列中讀取內容,如果已經讀取出了所有的內容,則會將in標記重新重置為-1。

    public synchronized int read(byte b[], int off, int len)  throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        /* possibly wait on the first character */
        int c = read();
        if (c < 0) {
            return -1;
        }
        b[off] = (byte) c;
        int rlen = 1;
        while ((in >= 0) && (len > 1)) {

            int available;

            if (in > out) {
                available = Math.min((buffer.length - out), (in - out));
            } else {
                available = buffer.length - out;
            }

            // A byte is read beforehand outside the loop
            if (available > (len - 1)) {
                available = len - 1;
            }
            System.arraycopy(buffer, out, b, off + rlen, available);
            out += available;
            rlen += available;
            len -= available;

            if (out >= buffer.length) {
                out = 0;
            }
            if (in == out) {
                /* now empty */
                in = -1;
            }
        }
        return rlen;
    }
    public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
            throw new IOException("Pipe closed");
        } else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();
        int trials = 2;
        while (in < 0) {
            if (closedByWriter) {
                /* closed by writer, return EOF */
                return -1;
            }
            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
                throw new IOException("Pipe broken");
            }
            /* might be a writer waiting */
            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
        int ret = buffer[out++] & 0xFF;
        if (out >= buffer.length) {
            out = 0;
        }
        if (in == out) {
            /* now empty */
            in = -1;
        }

        return ret;
    }

四、總結:

管道流的輸入與輸出必須存在於兩個不同的執行緒中進行使用,在進行管道的輸入流輸入時,如果快取區已經寫滿,卻一直沒有被讀取,則會造成當前執行緒的死鎖。如果在進行從快取區讀取的時候,如果先執行了讀取操作,進行讀取操作的執行緒會認為目前還沒有進行寫入,會在當前執行緒進行等待寫入管道流,同樣會造成死鎖的情況發生。