1. 程式人生 > >PipedOutStream和PipedInputStream管道流

PipedOutStream和PipedInputStream管道流

以JDK1.8原始碼為例。 

管道流的主要作用是進行兩個執行緒之間的通訊,如圖:

管道流分為輸出管道流(PipedOutStream)和輸入管道流(PipedInputStream)。如果要進行管道輸出,必須把輸出管道流連線到輸入管道流上。輸出流管道PipedOutStream通過public synchronized void connect(PipedInputStream snk) throws IOException方法與輸入管道流管道建立連線,當然也可以反過來,通過輸入管道流的public void connect(PipedOutputStream src) throws IOException方法與輸出管道流建立連線,其本質也是呼叫了輸出流管道的connect。執行緒1通過PipedOutStream類呼叫PipedInputStream的recieve方法將位元組流資料寫入PipedInputStream的迴圈緩衝區(或者環形緩衝區)buffer陣列中,執行緒2通過PipedInputStream的read方法從緩衝區的讀取資料。PipedInputStream類中定義了兩個私有變數in和out用於表示緩衝區儲存位元組的索引位置和讀取位元組的索引位置,如果in<-1,表明緩衝區為空,如果in等於out表明緩衝區已經存滿,那麼執行緒1會阻塞,等待執行緒2從緩衝區中讀取資料。

原始碼如下:

PipedOutputStream: 

package java.io;

import java.io.*;

//輸出流管道類,它是資料的傳送端,將資料傳送到輸入流管道
public
class PipedOutputStream extends OutputStream {
    //輸入流管道
    private PipedInputStream sink;

    //構造具有輸入流管道的輸出流管道
    public PipedOutputStream(PipedInputStream snk)  throws IOException {
        //連線輸入流管道
        connect(snk);
    }

    public PipedOutputStream() {
    }

    //連線輸入流管道
    public synchronized void connect(PipedInputStream snk) throws IOException {
        if (snk == null) {//輸入流管道未例項化
            throw new NullPointerException();
        } else if (sink != null || snk.connected) {//已經與該輸入流管道建立連線
            throw new IOException("Already connected");
        }
        sink = snk;
        //輸入流管道中環形緩衝的資料索引
        snk.in = -1;
        snk.out = 0;
        snk.connected = true;
    }

    //向輸入流管道中寫入位元組
    public void write(int b)  throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        }
        //輸入流管道接收資料
        sink.receive(b);
    }

    //向輸入流管道中寫入位元組陣列從指定位置開始的len長度的位元組
    public void write(byte b[], int off, int len) throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        } else if (b == null) {
            throw new NullPointerException();
        } else if ((off < 0) || (off > b.length) || (len < 0) ||
                   ((off + len) > b.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return;
        }
        //輸入管道接收資料
        sink.receive(b, off, len);
    }

    /**
     * Flushes this output stream and forces any buffered output bytes
     * to be written out.
     * This will notify any readers that bytes are waiting in the pipe.
     *
     * @exception IOException if an I/O error occurs.
     */
    public synchronized void flush() throws IOException {
        if (sink != null) {
            synchronized (sink) {
                sink.notifyAll();
            }
        }
    }

    /**
     * Closes this piped output stream and releases any system resources
     * associated with this stream. This stream may no longer be used for
     * writing bytes.
     *
     * @exception  IOException  if an I/O error occurs.
     */
    public void close()  throws IOException {
        if (sink != null) {
            sink.receivedLast();
        }
    }
}

PipedInputStream :

package java.io;

public class PipedInputStream extends InputStream {
    //關閉輸入管道向其他地方輸出:false表示未關閉,true表示可以向其他地方輸出
    boolean closedByWriter = false;
    //關閉從輸出管道讀取資料:false表示未關閉
    volatile boolean closedByReader = false;
    //是否有連線的輸出管道
    boolean connected = false;

    //從輸出管道讀取資料到輸入管道的執行緒
    Thread readSide;
    //從輸入管道寫資料到其他地方的寫入執行緒
    Thread writeSide;

    //預設輸入管道大小
    private static final int DEFAULT_PIPE_SIZE = 1024;

    //管道大小
    protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;

    //存放資料的環形緩衝位元組陣列
    protected byte buffer[];

    //從已連線的輸出管道接收到的資料位元組將儲存在迴圈緩衝的位置索引
    //in小於0表示緩衝是空的,in等於out表示緩衝是滿的
    protected int in = -1;

    //輸入管道讀取的下一個位元組在迴圈緩衝中的位置索引
    protected int out = 0;

    //構造指定輸出管道和大小1024的輸入管道
    public PipedInputStream(PipedOutputStream src) throws IOException {
        this(src, DEFAULT_PIPE_SIZE);
    }

    //構造指定輸出管道和大小的輸入管道
    public PipedInputStream(PipedOutputStream src, int pipeSize)
            throws IOException {
         initPipe(pipeSize);
         //連線輸出管道
         connect(src);
    }

    //構造預設大小1024的輸入管道,使用前需要連線輸出管道
    public PipedInputStream() {
        initPipe(DEFAULT_PIPE_SIZE);
    }

    //初始化一個具有一定大小的輸入管道
    public PipedInputStream(int pipeSize) {
        initPipe(pipeSize);
    }

    //初始化輸入管道
    private void initPipe(int pipeSize) {
         if (pipeSize <= 0) {
            throw new IllegalArgumentException("Pipe Size <= 0");
         }
         buffer = new byte[pipeSize];
    }

    //連線輸出管道
    public void connect(PipedOutputStream src) throws IOException {
        src.connect(this);
    }

    //由輸出流管道呼叫此方法,將資料寫入輸入流管道
    protected synchronized void receive(int b) throws IOException {
        //檢查接收端狀態
        checkStateForReceive();
        //寫入端:實際是輸出流管道例項執行緒
        writeSide = Thread.currentThread();
        //如果環形緩衝已經寫滿,等待空白空間
        if (in == out)
            awaitSpace();
        if (in < 0) {
            in = 0;
            out = 0;
        }
        //將輸出流管道位元組資料放入緩衝中
        buffer[in++] = (byte)(b & 0xFF);
        //寫到緩衝最後,從頭開始寫,所謂環形緩衝或迴圈緩衝
        if (in >= buffer.length) {
            in = 0;
        }
    }

    
    synchronized void receive(byte b[], int off, int len)  throws IOException {
        checkStateForReceive();
        writeSide = Thread.currentThread();
        int bytesToTransfer = len;
        while (bytesToTransfer > 0) {
            if (in == out)
                awaitSpace();
            int nextTransferAmount = 0;
            if (out < in) {
                nextTransferAmount = buffer.length - in;
            } else if (in < out) {
                if (in == -1) {
                    in = out = 0;
                    nextTransferAmount = buffer.length - in;
                } else {
                    nextTransferAmount = out - in;
                }
            }
            if (nextTransferAmount > bytesToTransfer)
                nextTransferAmount = bytesToTransfer;
            assert(nextTransferAmount > 0);
            System.arraycopy(b, off, buffer, in, nextTransferAmount);
            bytesToTransfer -= nextTransferAmount;
            off += nextTransferAmount;
            in += nextTransferAmount;
            if (in >= buffer.length) {
                in = 0;
            }
        }
    }

    //檢查輸入管道狀態
    private void checkStateForReceive() throws IOException {
        if (!connected) {
            //未與輸出管道連線,拋異常
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
            //向輸入流管道寫入資料已經關閉,或者從輸出流管道讀取資料已經關閉,丟擲異常
            throw new IOException("Pipe closed");
        } else if (readSide != null && !readSide.isAlive()) {
            //讀取端執行緒已經死亡
            throw new IOException("Read end dead");
        }
    }

    private void awaitSpace() throws IOException {
        while (in == out) {
            checkStateForReceive();

            /* full: kick any waiting readers */
            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
    }

    //最後一個位元組已經接收到後,標記向輸入流管道的寫入已經關閉。喚醒所有等待的執行緒
    synchronized void receivedLast() {
        closedByWriter = true;
        notifyAll();
    }

    
    public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
            throw new IOException("Pipe closed");
        } else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();
        int trials = 2;
        while (in < 0) {
            if (closedByWriter) {
                /* closed by writer, return EOF */
                return -1;
            }
            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
                throw new IOException("Pipe broken");
            }
            /* might be a writer waiting */
            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
        int ret = buffer[out++] & 0xFF;
        if (out >= buffer.length) {
            out = 0;
        }
        if (in == out) {
            /* now empty */
            in = -1;
        }

        return ret;
    }

    
    public synchronized int read(byte b[], int off, int len)  throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        /* possibly wait on the first character */
        int c = read();
        if (c < 0) {
            return -1;
        }
        b[off] = (byte) c;
        int rlen = 1;
        while ((in >= 0) && (len > 1)) {

            int available;

            if (in > out) {
                available = Math.min((buffer.length - out), (in - out));
            } else {
                available = buffer.length - out;
            }

            // A byte is read beforehand outside the loop
            if (available > (len - 1)) {
                available = len - 1;
            }
            System.arraycopy(buffer, out, b, off + rlen, available);
            out += available;
            rlen += available;
            len -= available;

            if (out >= buffer.length) {
                out = 0;
            }
            if (in == out) {
                /* now empty */
                in = -1;
            }
        }
        return rlen;
    }

    //輸入流中可讀取的位元組數
    public synchronized int available() throws IOException {
        if(in < 0)
            return 0;
        else if(in == out)
            return buffer.length;
        else if (in > out)
            return in - out;
        else
            return in + buffer.length - out;
    }

    
    public void close()  throws IOException {
        closedByReader = true;
        synchronized (this) {
            in = -1;
        }
    }
}

下面我們看一個執行緒通訊的例子:

Sender:

package com.leboop;

import java.io.IOException;
import java.io.PipedOutputStream;

public class Sender implements Runnable {
	private PipedOutputStream pos = null;
	private byte[] bytes = null;
	
	public Sender(String data) {
		this.pos = new PipedOutputStream();
		this.bytes = data.getBytes();
	}

	public void connect(Reciever reciever){
		try {
			this.pos.connect(reciever.getPis());
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	@Override
	public void run() {
		try {
			pos.write(bytes);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		try {
			pos.close();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

Reciever:

package com.leboop;

import java.io.IOException;
import java.io.PipedInputStream;

public class Reciever implements Runnable {
	private PipedInputStream pis = new PipedInputStream();
	private byte[] b = new byte[1024];
	public Reciever() {
	}

	public PipedInputStream getPis() {
		return pis;
	}

	public void setPis(PipedInputStream pis) {
		this.pis = pis;
	}

	@Override
	public void run() {
		try {
			int len = this.pis.read(b);
			System.out.println(new String(b,0,len));
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

PipeTest:

package com.leboop;

public class PipeTest {

	public static void main(String[] args) {
		Sender sender = new Sender("Hello World!");
		Reciever reciever = new Reciever();
		sender.connect(reciever);
		new Thread(sender).start();
		new Thread(reciever).start();
	}
}

結果輸出Hello World!。