1. 程式人生 > >PipedOutStream和PipedInputStream管道流




管道流分為輸出管道流(PipedOutStream)和輸入管道流(PipedInputStream)。如果要進行管道輸出,必須把輸出管道流連線到輸入管道流上。輸出流管道PipedOutStream通過public synchronized void connect(PipedInputStream snk) throws IOException方法與輸入管道流管道建立連線,當然也可以反過來,通過輸入管道流的public void connect(PipedOutputStream src) throws IOException方法與輸出管道流建立連線,其本質也是呼叫了輸出流管道的connect。執行緒1通過PipedOutStream類呼叫PipedInputStream的recieve方法將位元組流資料寫入PipedInputStream的迴圈緩衝區(或者環形緩衝區)buffer陣列中,執行緒2通過PipedInputStream的read方法從緩衝區的讀取資料。PipedInputStream類中定義了兩個私有變數in和out用於表示緩衝區儲存位元組的索引位置和讀取位元組的索引位置,如果in<-1,表明緩衝區為空,如果in等於out表明緩衝區已經存滿,那麼執行緒1會阻塞,等待執行緒2從緩衝區中讀取資料。



package java.io;

import java.io.*;

class PipedOutputStream extends OutputStream {
    private PipedInputStream sink;

    public PipedOutputStream(PipedInputStream snk)  throws IOException {

    public PipedOutputStream() {

    public synchronized void connect(PipedInputStream snk) throws IOException {
        if (snk == null) {//輸入流管道未例項化
            throw new NullPointerException();
        } else if (sink != null || snk.connected) {//已經與該輸入流管道建立連線
            throw new IOException("Already connected");
        sink = snk;
        snk.in = -1;
        snk.out = 0;
        snk.connected = true;

    public void write(int b)  throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");

    public void write(byte b[], int off, int len) throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        } else if (b == null) {
            throw new NullPointerException();
        } else if ((off < 0) || (off > b.length) || (len < 0) ||
                   ((off + len) > b.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
        sink.receive(b, off, len);

     * Flushes this output stream and forces any buffered output bytes
     * to be written out.
     * This will notify any readers that bytes are waiting in the pipe.
     * @exception IOException if an I/O error occurs.
    public synchronized void flush() throws IOException {
        if (sink != null) {
            synchronized (sink) {

     * Closes this piped output stream and releases any system resources
     * associated with this stream. This stream may no longer be used for
     * writing bytes.
     * @exception  IOException  if an I/O error occurs.
    public void close()  throws IOException {
        if (sink != null) {

PipedInputStream :

package java.io;

public class PipedInputStream extends InputStream {
    boolean closedByWriter = false;
    volatile boolean closedByReader = false;
    boolean connected = false;

    Thread readSide;
    Thread writeSide;

    private static final int DEFAULT_PIPE_SIZE = 1024;

    protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;

    protected byte buffer[];

    protected int in = -1;

    protected int out = 0;

    public PipedInputStream(PipedOutputStream src) throws IOException {
        this(src, DEFAULT_PIPE_SIZE);

    public PipedInputStream(PipedOutputStream src, int pipeSize)
            throws IOException {

    public PipedInputStream() {

    public PipedInputStream(int pipeSize) {

    private void initPipe(int pipeSize) {
         if (pipeSize <= 0) {
            throw new IllegalArgumentException("Pipe Size <= 0");
         buffer = new byte[pipeSize];

    public void connect(PipedOutputStream src) throws IOException {

    protected synchronized void receive(int b) throws IOException {
        writeSide = Thread.currentThread();
        if (in == out)
        if (in < 0) {
            in = 0;
            out = 0;
        buffer[in++] = (byte)(b & 0xFF);
        if (in >= buffer.length) {
            in = 0;

    synchronized void receive(byte b[], int off, int len)  throws IOException {
        writeSide = Thread.currentThread();
        int bytesToTransfer = len;
        while (bytesToTransfer > 0) {
            if (in == out)
            int nextTransferAmount = 0;
            if (out < in) {
                nextTransferAmount = buffer.length - in;
            } else if (in < out) {
                if (in == -1) {
                    in = out = 0;
                    nextTransferAmount = buffer.length - in;
                } else {
                    nextTransferAmount = out - in;
            if (nextTransferAmount > bytesToTransfer)
                nextTransferAmount = bytesToTransfer;
            assert(nextTransferAmount > 0);
            System.arraycopy(b, off, buffer, in, nextTransferAmount);
            bytesToTransfer -= nextTransferAmount;
            off += nextTransferAmount;
            in += nextTransferAmount;
            if (in >= buffer.length) {
                in = 0;

    private void checkStateForReceive() throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
            throw new IOException("Pipe closed");
        } else if (readSide != null && !readSide.isAlive()) {
            throw new IOException("Read end dead");

    private void awaitSpace() throws IOException {
        while (in == out) {

            /* full: kick any waiting readers */
            try {
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();

    synchronized void receivedLast() {
        closedByWriter = true;

    public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
            throw new IOException("Pipe closed");
        } else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");

        readSide = Thread.currentThread();
        int trials = 2;
        while (in < 0) {
            if (closedByWriter) {
                /* closed by writer, return EOF */
                return -1;
            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
                throw new IOException("Pipe broken");
            /* might be a writer waiting */
            try {
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
        int ret = buffer[out++] & 0xFF;
        if (out >= buffer.length) {
            out = 0;
        if (in == out) {
            /* now empty */
            in = -1;

        return ret;

    public synchronized int read(byte b[], int off, int len)  throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;

        /* possibly wait on the first character */
        int c = read();
        if (c < 0) {
            return -1;
        b[off] = (byte) c;
        int rlen = 1;
        while ((in >= 0) && (len > 1)) {

            int available;

            if (in > out) {
                available = Math.min((buffer.length - out), (in - out));
            } else {
                available = buffer.length - out;

            // A byte is read beforehand outside the loop
            if (available > (len - 1)) {
                available = len - 1;
            System.arraycopy(buffer, out, b, off + rlen, available);
            out += available;
            rlen += available;
            len -= available;

            if (out >= buffer.length) {
                out = 0;
            if (in == out) {
                /* now empty */
                in = -1;
        return rlen;

    public synchronized int available() throws IOException {
        if(in < 0)
            return 0;
        else if(in == out)
            return buffer.length;
        else if (in > out)
            return in - out;
            return in + buffer.length - out;

    public void close()  throws IOException {
        closedByReader = true;
        synchronized (this) {
            in = -1;



package com.leboop;

import java.io.IOException;
import java.io.PipedOutputStream;

public class Sender implements Runnable {
	private PipedOutputStream pos = null;
	private byte[] bytes = null;
	public Sender(String data) {
		this.pos = new PipedOutputStream();
		this.bytes = data.getBytes();

	public void connect(Reciever reciever){
		try {
		} catch (IOException e) {
			// TODO Auto-generated catch block
	public void run() {
		try {
		} catch (IOException e) {
			// TODO Auto-generated catch block
		try {
		} catch (IOException e) {
			// TODO Auto-generated catch block



package com.leboop;

import java.io.IOException;
import java.io.PipedInputStream;

public class Reciever implements Runnable {
	private PipedInputStream pis = new PipedInputStream();
	private byte[] b = new byte[1024];
	public Reciever() {

	public PipedInputStream getPis() {
		return pis;

	public void setPis(PipedInputStream pis) {
		this.pis = pis;

	public void run() {
		try {
			int len = this.pis.read(b);
			System.out.println(new String(b,0,len));
		} catch (IOException e) {
			// TODO Auto-generated catch block



package com.leboop;

public class PipeTest {

	public static void main(String[] args) {
		Sender sender = new Sender("Hello World!");
		Reciever reciever = new Reciever();
		new Thread(sender).start();
		new Thread(reciever).start();

結果輸出Hello World!。