【原創】從原始碼剖析IO流(四)管道流--轉載請註明出處
一、管道流的特點與作用:
PipedInputStream與PipedOutputStream分別為管道輸入流和管道輸出流。管道輸入流通過連線到管道輸出流實現了類似管道的功能,用於執行緒之間的通訊。在使用時,通常由某個執行緒向管道輸出流中寫入資料。根據管道的特性,這些資料會自動傳送到與管道輸出流對應的管道輸入流中。這時其他執行緒就可以從管道輸入流中讀取資料,這樣就實現了執行緒之間的通訊。
管道流與其他流一樣,分為讀入和寫出流,但是管道流是用於從管道內讀取內容和將內容寫入到管道中。並且,在進行管道流物件的建立時,如果構建輸入流物件則需要在輸入流中傳入輸出流物件,構建輸出流物件則需要在構造器中傳入輸入流物件。每次寫入管道時寫入的資料,都將會被儲存在作為快取區進行使用的位元組陣列中。由輸出流寫入管道,再由輸入流從管道中進行讀取。
二、輸出流PipOutPutStream:
首先來看一下輸出流的構造器:
public PipedOutputStream(PipedInputStream snk) throws IOException { connect(snk); } public synchronized void connect(PipedInputStream snk) throws IOException { if (snk == null) { throw new NullPointerException(); } else if (sink != null || snk.connected) { throw new IOException("Already connected"); } sink = snk; snk.in = -1; snk.out = 0; snk.connected = true; }
在進行輸出流的構造時,需要需要傳入一個輸入流的物件,然後將輸入流物件的連線狀態設定為連線,並對其中的引數進行初始化。
之後,再來看write方法:
public void write(byte b[], int off, int len) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } else if (b == null) { throw new NullPointerException(); } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return; } sink.receive(b, off, len); }
write方法中,在經過了一系列的引數校驗以後,呼叫了自身持有的輸入流成員變數 sink 的receive方法。這個方法即是管道流的主要核心方法。
三、輸入流 PipInPutStream:
剛才說到的是receive方法,這個方法中的邏輯看起來比較複雜,在看這段程式碼之前,需要先看一下這個類的成員變數有哪些:
欄位 | 說明 |
---|---|
boolean closedByWriter = false; | 管道輸出流是否關閉 |
volatile boolean closedByReader = false; | 管道輸入流是否關閉 |
boolean connected = false; | 管道輸入流是否被連線 |
Thread readSide; | 從管道中讀取資料的執行緒 |
Thread writeSide; | 向管道中寫入資料的執行緒 |
private static final int DEFAULT_PIPE_SIZE = 1024; | 管道迴圈輸入緩衝區的預設大小。 |
protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE; | 管道迴圈輸入緩衝區的預設大小。 |
protected byte buffer[]; | 放置資料的迴圈緩衝區。 |
protected int in = -1; | 緩衝區的位置,當從連線的管道輸出流中接收到下一個資料位元組時,會將其儲存到該位置。 |
protected int out = 0; | 緩衝區的位置,此管道輸入流將從該位置讀取下一個資料位元組。 |
就可以瞭解到這段程式碼的功能。這段程式碼主要用於將一個位元組陣列 b 儲存到快取區 buffer 陣列中。如果buffer快取區已經滿了,但是沒有被讀取,則會等待讀取完畢後再進行繼續儲存。此處唯一要注意的點,為快取區陣列中的內容是不會進行清空的,只會進行迴圈使用。
synchronized void receive(byte b[], int off, int len) throws IOException {
//檢查PipedInputStream狀態,如果不正常,則丟擲異常。
checkStateForReceive();
//獲取將資料寫入管道的執行緒
writeSide = Thread.currentThread();
//需要接收的資料量,初始值為len
int bytesToTransfer = len;
//如果需要接收的資料>0,即還有需要接收的資料
while (bytesToTransfer > 0) {
//如果寫入管道的資料已經被讀完,則等待
if (in == out)
awaitSpace();
//下次要傳輸的位元組數,初始值為0
int nextTransferAmount = 0;
//如果緩衝區未滿(滿後,in會重置為0)
if (out < in) {
//計算下次要傳輸的位元組數。值為緩衝區還剩餘的空間(即in之前的位置都滿了,只有in到末尾的位置可用)
nextTransferAmount = buffer.length - in;
} else if (in < out) {//?
if (in == -1) {//?
in = out = 0;
nextTransferAmount = buffer.length - in;
} else {//如果緩衝區已滿(此時out之後和in之前的位置已滿,只有in到out之間的位置可用)
nextTransferAmount = out - in;
}
}
//如果下次傳輸的位元組數大於需要傳輸的位元組數,這時只需要bytesToTransfer大小的空間就可以了
if (nextTransferAmount > bytesToTransfer)
nextTransferAmount = bytesToTransfer;
assert(nextTransferAmount > 0);
//將nextTransferAmount個位元組從b中複製到緩衝區陣列中
System.arraycopy(b, off, buffer, in, nextTransferAmount);
//重新計算需要傳輸的位元組數,已經傳輸了nextTransferAmount位元組,所以減去nextTransferAmount
bytesToTransfer -= nextTransferAmount;
//b中已經複製的部分以後不能再複製了,所以偏移量off後移nextTransferAmount個位置,防止重複複製
off += nextTransferAmount;
//將in後移nextTransferAmount,防止之前複製的資料被覆蓋
in += nextTransferAmount;
//如果in已經超出緩衝區大小,將in置0,從頭開始寫
if (in >= buffer.length) {
in = 0;
}
}
}
之後是read(byte[] b, int off, int len)方法,該方法中,會從快取區陣列中讀取內容,如果已經讀取出了所有的內容,則會將in標記重新重置為-1。
public synchronized int read(byte b[], int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
/* possibly wait on the first character */
int c = read();
if (c < 0) {
return -1;
}
b[off] = (byte) c;
int rlen = 1;
while ((in >= 0) && (len > 1)) {
int available;
if (in > out) {
available = Math.min((buffer.length - out), (in - out));
} else {
available = buffer.length - out;
}
// A byte is read beforehand outside the loop
if (available > (len - 1)) {
available = len - 1;
}
System.arraycopy(buffer, out, b, off + rlen, available);
out += available;
rlen += available;
len -= available;
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
/* now empty */
in = -1;
}
}
return rlen;
}
public synchronized int read() throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < 0)) {
throw new IOException("Write end dead");
}
readSide = Thread.currentThread();
int trials = 2;
while (in < 0) {
if (closedByWriter) {
/* closed by writer, return EOF */
return -1;
}
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
throw new IOException("Pipe broken");
}
/* might be a writer waiting */
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
int ret = buffer[out++] & 0xFF;
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
/* now empty */
in = -1;
}
return ret;
}
四、總結:
管道流的輸入與輸出必須存在於兩個不同的執行緒中進行使用,在進行管道的輸入流輸入時,如果快取區已經寫滿,卻一直沒有被讀取,則會造成當前執行緒的死鎖。如果在進行從快取區讀取的時候,如果先執行了讀取操作,進行讀取操作的執行緒會認為目前還沒有進行寫入,會在當前執行緒進行等待寫入管道流,同樣會造成死鎖的情況發生。