1. 程式人生 > >NIO-Channel介面分析

NIO-Channel介面分析

目錄

  • NIO-Channel原始碼分析
    • 目錄
    • 前言
    • 介面
      • SCTP協議
      • UDP協議
      • TCP協議
      • 檔案
    • 總結
    • 相關文獻


NIO-Channel原始碼分析

目錄

NIO-概覽
NIO-Buffer
NIO-Channel
NIO-Channel原始碼分析

前言

本來是想學習Netty的,但是Netty是一個NIO框架,因此在學習netty之前,還是先梳理一下NIO的知識。通過剖析原始碼理解NIO的設計原理。

本系列文章針對的是JDK1.8.0.161的原始碼。

上一篇介紹了Channel的基本使用,下面對Channel的介面進行分析。

介面

SCTP協議

SCTP(Stream Control Transmission Protocol)是一種傳輸協議,在TCP/IP協議棧中所處的位置和TCP、UDP類似,兼有TCP/UDP兩者特徵。

對於SCTP協議這裡不詳細描述,想了解的同學可以看下這篇文章
SCTP協議平時用的不多,這裡不做具體討論。

UDP協議

NIO使用DatagrmChannel實現了UDP協議的網路通訊。

下面我們對各個介面進行分析。

AutoCloseableCloseable

分別是自動關閉和主動關閉介面。當資源(如控制代碼或檔案等)需要釋放時,則需要呼叫close方法釋放資源。

public interface AutoCloseable {
    void close() throws Exception;
}

public interface Closeable extends AutoCloseable {
    void close() throws IOException;
}

Channel是通道介面,針對於I/O相關的操作,需要開啟和關閉操作。

public interface Channel extends Closeable {
    boolean isOpen();

    void close() throws IOException;
}

InterruptibleChannel是支援非同步關閉和中斷的通道介面。為了支援Thead的interrupt模型,當執行緒中斷時,可以執行中斷處理物件的回撥,從而關閉釋放Channel。

public interface InterruptibleChannel extends Channel {
    void close() throws IOException;
}

關於InterruptibleChannel可中斷I/O詳細解析可以看一下《JDK原始碼閱讀-InterruptibleChannel與可中斷IO》

Interruptible是執行緒中斷介面,即上面提的Thead的interrupt模型。當執行緒中斷時,則會呼叫中斷操作。

public abstract interface Interruptible {
    public abstract void interrupt(java.lang.Thread t);
}
public class Thread implements Runnable {
    ...
    public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();

        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }
    ...
}

AbstractInterruptibleChannel實現了ChannelInterruptibleChannel介面。

  • closeLock是關閉時的鎖
  • open表示channle是否開啟
  • interuptorInterruptible中斷回撥
  • interrupted為I/O執行時的執行緒
public abstract class AbstractInterruptibleChannel implements Channel, InterruptibleChannel {
    ...
    public final void close() throws IOException {
        synchronized(this.closeLock) {
            if (this.open) {
                this.open = false;
                this.implCloseChannel();
            }
        }
    }
    //具體的Channel實現關閉
    protected abstract void implCloseChannel() throws IOException;

    protected final void begin() {
        if (this.interruptor == null) {
            this.interruptor = new Interruptible() {
                //執行緒中斷時,則會呼叫該介面關閉Channel
                public void interrupt(Thread target) {
                    synchronized(AbstractInterruptibleChannel.this.closeLock) {
                        if (AbstractInterruptibleChannel.this.open) {
                            AbstractInterruptibleChannel.this.open = false;
                            AbstractInterruptibleChannel.this.interrupted = target;

                            try {
                                AbstractInterruptibleChannel.this.implCloseChannel();
                            } catch (IOException x) {
                            }

                        }
                    }
                }
            };
        }
        //將執行緒的blockOn設定為當前interruptor,從而使得執行緒關閉時能關閉channel
        blockedOn(this.interruptor);
        Thread me = Thread.currentThread();
        if (me.isInterrupted()) {
            this.interruptor.interrupt(me);
        }

    }

    protected final void end(boolean completed)
        throws AsynchronousCloseException
    {
        //I/O結束,清除執行緒blocker
        blockedOn(null);
        Thread interrupted = this.interrupted;
        if (interrupted != null && interrupted == Thread.currentThread()) {
            interrupted = null;
            throw new ClosedByInterruptException();
        }
        if (!completed && !open)
            throw new AsynchronousCloseException();
    }

    static void blockedOn(Interruptible intr) {
        SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
    }
}

AbstractInterruptibleChannel添加了beginend方法。 在I/O操作開始時會呼叫begin,在I/O操作結束時會呼叫end。在begin方法內將中斷操作加入到當前執行緒中。最終會呼叫到執行緒的blockOn方法,它會將該中斷介面注入到執行緒中,使得執行緒中斷時可以呼叫到Channel並釋放相關資源。

public void blockedOn(Thread t, Interruptible b) {
    t.blockedOn(b);
}

SelectableChannel介面聲明瞭Channel是可以被選擇的,在Windows平臺通過WindowsSelectorImpl實現,Linux通過EPollSelectorImpl實現。此外還有KQueue等實現,關於Selector具體細節在《NIO-Selector》一文中會介紹。

AbstractSelectableChannel實現了SelectableChannel介面。

NetworkChannel適用於網路傳輸的介面。

public interface NetworkChannel extends Channel {
    //繫結地址
    NetworkChannel bind(SocketAddress var1) throws IOException;

    //獲取本地地址
    SocketAddress getLocalAddress() throws IOException;

    //設定socket選項
    <T> NetworkChannel setOption(SocketOption<T> var1, T var2) throws IOException;
    //獲取socket選項
    <T> T getOption(SocketOption<T> var1) throws IOException;
    //當前通道支援的socket選項
    Set<SocketOption<?>> supportedOptions();
}

MulticastChannel是支援組播介面。

public interface MulticastChannel extends NetworkChannel {
    void close() throws IOException;

    MembershipKey join(InetAddress group, NetworkInterface interf) throws IOException;

    MembershipKey join(InetAddress group, NetworkInterface interf, InetAddress source) throws IOException;
}

SelChImpl介面用於將底層的I/O就緒狀態更新為就緒事件。

public interface SelChImpl extends Channel {

    FileDescriptor getFD();
    int getFDVal();
    //更新就緒事件
    public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk);
    //設定就緒事件
    public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk);
    //將底層的輪詢操作轉換為事件
    void translateAndSetInterestOps(int ops, SelectionKeyImpl sk);
    //返回channle支援的操作,比如讀操作、寫操作等
    int validOps();
    void kill() throws IOException;
}

由於UDP支援讀寫資料,因此還實現了ReadableByteChannelWritableByteChannel介面

public interface ReadableByteChannel extends Channel {
    int read(ByteBuffer dst) throws IOException;
}   

public interface WritableByteChannel extends Channel {
    int write(ByteBuffer src) throws IOException;
}

ByteChannel是支援讀寫的通道。

public interface ByteChannel extends ReadableByteChannel, WritableByteChannel {
}

ScatteringByteChannel則支援根據傳入偏移量讀,支援根據傳入偏移量寫GatheringByteChannel

public interface ScatteringByteChannel extends ReadableByteChannel {
    long read(ByteBuffer[] dsts, int offset, int length) throws IOException;
    long read(ByteBuffer[] dsts) throws IOException;}

public interface GatheringByteChannel extends WritableByteChannel {
    long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
    long write(ByteBuffer[] srcs) throws IOException;
}

TCP協議

客戶端

TCP協議除了不支援組播,其他和UDP是一樣的,不再重複介紹。

服務端

服務端無需資料讀寫,僅需要接收連線,資料讀寫是SocketChannel乾的事。因此沒有ReadableByteChannelWriteableByteChannel等讀寫介面

檔案

檔案比網路協議少了NetworkChannelSelChImplSelectableChannelSelChImplSelectableChannel主要是用於支援選擇器的,由於網路傳輸大多數連線時空閒的,而且資料何時會到來並不知曉,同時需要支援高併發來連線,因此支援多路複用技術可以顯著的提高效能,而磁碟讀寫則沒有該需求,因此無需選擇器。

SeekableByteChannel可以通過修改position支援從指定位置讀寫資料。

public interface SeekableByteChannel extends ByteChannel {
    int read(ByteBuffer dst) throws IOException;

    int write(ByteBuffer src) throws IOException;
    
    long position() throws IOException;
    //設定偏移量
    SeekableByteChannel position(long newPosition) throws IOException;

    long size() throws IOException;
    //擷取指定大小
    SeekableByteChannel truncate(long size) throws IOException;
}

總結

由於文章篇幅比較長,因此還是將介面分析和實現分析分開。本篇文章對Channel的介面進行說明,下一篇將對具體的實現進行分析。

相關文獻

  1. SCTP協議詳解
  2. 史上最強Java NIO入門:擔心從入門到放棄的,請讀這篇!
  3. Java NIO系列教程
  4. 為什麼SCTP沒有被大量使用/知道
  5. JDK原始碼閱讀-InterruptibleChannel與可中斷IO
  6. 廣播和組播
  7. 關於AccessController.doPrivileged
  8. ServiceLoader原始碼分析
  9. 基於Java的RDMA高效能通訊庫(六):SDP - Java Socket Direct Protocol


微信掃一掃二維碼關注訂閱號傑哥技術分享
出處:https://www.cnblogs.com/Jack-Blog/p/12040082.html
作者:傑哥很忙
本文使用「CC BY 4.0」創作共享協議。歡迎轉載,請在明顯位置給出出處及連結。