1. 程式人生 > >JDK原始碼閱讀:InterruptibleChannel與可中斷IO,ig牛逼

JDK原始碼閱讀:InterruptibleChannel與可中斷IO,ig牛逼

Java傳統IO是不支援中斷的,所以如果程式碼在read/write等操作阻塞的話,是無法被中斷的。這就無法和Thead的interrupt模型配合使用了。JavaNIO眾多的升級點中就包含了IO操作對中斷的支援。InterruptiableChannel表示支援中斷的Channel。我們常用的FileChannel,SocketChannel,DatagramChannel都實現了這個介面。
InterruptibleChannel介面

public interface InterruptibleChannel extends Channel
{

/**

  • 關閉當前Channel
  • 任何當前阻塞在當前channel執行的IO操作上的執行緒,都會收到一個AsynchronousCloseException異常
    */
    public void close() throws IOException;
    }

InterruptibleChannel介面沒有定義任何方法,其中的close方法是父介面就有的,這裡只是添加了額外的註釋。
AbstractInterruptibleChannel實現了InterruptibleChannel介面,並提供了實現可中斷IO機制的重要的方法,比如begin(),end()。
在解讀這些方法的程式碼前,先了解一下NIO中,支援中斷的Channel程式碼是如何編寫的。
第一個要求是要正確使用begin()和end()方法:

boolean completed = false;
try {
begin();
completed = ...; // 執行阻塞IO操作
return ...; // 返回結果
} finally {
end(completed);
}

NIO規定了,在阻塞IO的語句前後,需要呼叫begin()和end()方法,為了保證end()方法一定被呼叫,要求放在finally語句塊中。
第二個要求是Channel需要實現java.nio.channels.spi.AbstractInterruptibleChannel#implCloseChannel這個方法。AbstractInterruptibleChannel在處理中斷時,會呼叫這個方法,使用Channel的具體實現來關閉Channel。
接下來我們具體看一下begin()和end()方法是如何實現的。
begin方法

// 儲存中斷處理物件例項
private Interruptible interruptor;
// 儲存被中斷執行緒例項
private volatile Thread interrupted;

protected final void begin() {
// 初始化中斷處理物件,中斷處理物件提供了中斷處理回撥
// 中斷處理回撥登記被中斷的執行緒,然後呼叫implCloseChannel方法,關閉Channel
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread target) {
synchronized (closeLock) {
// 如果當前Channel已經關閉,則直接返回
if (!open)
return;

// 設定標誌位,同時登記被中斷的執行緒
open = false;
interrupted = target;
try {
// 呼叫具體的Channel實現關閉Channel
AbstractInterruptibleChannel.this.implCloseChannel();
} catch (IOException x) { }
}
}};
}

// 登記中斷處理物件到當前執行緒
blockedOn(interruptor);
// 判斷當前執行緒是否已經被中斷,如果已經被中斷,可能登記的中斷處理物件沒有被執行,這裡手動執行一下
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
從begin()方法中,我們可以看出NIO實現可中斷IO操作的思路,是在Thread的中斷邏輯中,掛載自定義的中斷處理物件,這樣Thread物件在被中斷時,會執行中斷處理物件中的回撥,這個回撥中,執行關閉Channel的操作。這樣就實現了Channel對執行緒中斷的響應了。
接下來重點就是研究“Thread新增中斷處理邏輯”這個機制是如何實現的了,是通過blockedOn方法實現的:

static void blockedOn(Interruptible intr) { // package-private
sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(),intr);
}

blockedOn方法使用的是JavaLangAccess的blockedOn方法。
SharedSecrets是一個神奇而糟糕的類,為啥說是糟糕呢,因為這個方法的存在,就是為了訪問JDK類庫中一些因為類作用域限制而外部無法訪問的類或者方法。JDK很多類與方法是私有或者包級別私有的,外部是無法訪問的,但是JDK在本身實現的時候又存在互相依賴的情況,所以為了外部可以不依賴反射訪問這些類或者方法,在sun包下,存在這麼一個類,提供了各種超越限制的方法。
SharedSecrets.getJavaLangAccess()方法返回JavaLangAccess物件。JavaLangAccess物件就和名稱所說的一樣,提供了java.lang包下一些非公開的方法的訪問。這個類在System初始化時被構造:

// java.lang.System#setJavaLangAccess
private static void setJavaLangAccess() {
sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
public void blockedOn(Thread t, Interruptible b) {
t.blockedOn(b);
}
//...
});
}

可以看出,sun.misc.JavaLangAccess#blockedOn保證的就是java.lang.Thread#blockedOn這個包級別私有的方法:

/* The object in which this thread is blocked in an interruptible I/O

  • operation, if any. The blocker's interrupt method should be invoked
  • after setting this thread's interrupt status.
    */
    private volatile Interruptible blocker;
    private final Object blockerLock = new Object();

/ Set the blocker field; invoked via sun.misc.SharedSecrets from java.nio code
/
void blockedOn(Interruptible b) {
// 序列化blocker相關操作
synchronized (blockerLock) {
blocker = b;
}
}

而這個方法也非常簡單,就是設定java.lang.Thread#blocker變數為之前提到的中斷處理物件。而且從註釋中可以看出,這個方法就是專門為NIO設計的,註釋都非常直白的提到了,NIO的程式碼會通過sun.misc.SharedSecrets呼叫到這個方法。。
接下來就是重頭戲了,看一下Thread在中斷時,如何呼叫NIO註冊的中斷處理器:

public void interrupt() {
if (this != Thread.currentThread())
checkAccess();

synchronized (blockerLock) {
Interruptible b = blocker;

// 如果NIO設定了中斷處理器,則只需Thread本身的中斷邏輯後,呼叫中斷處理器的回撥函式
if (b != null) {
interrupt0(); // 這一步會設定interrupt標誌位
b.interrupt(this);
return;
}
}

// 如果沒有的話,就走普通流程
interrupt0();
}

end方法
begin()方法負責新增Channel的中斷處理器到當前執行緒。end()是在IO操作執行完/中斷完後的操作,負責判斷中斷是否發生,如果發生判斷是當前執行緒發生還是別的執行緒中斷把當前操作的Channel給關閉了,對於不同的情況,丟擲不同的異常。

protected final void end(boolean completed) throws AsynchronousCloseException
{
// 清空執行緒的中斷處理器引用,避免執行緒一直存活導致中斷處理器無法被回收
blockedOn(null);
Thread interrupted = this.interrupted;

if (interrupted != null && interrupted == Thread.currentThread()) {
interrupted = null;
throw new ClosedByInterruptException();
}

// 如果這次沒有讀取到資料,並且Channel被另外一個執行緒關閉了,則排除Channel被非同步關閉的異常
// 但是如果這次讀取到了資料,就不能丟擲異常,因為這次讀取的資料是有效的,需要返回給使用者的(重要邏輯)
if (!completed && !open)
throw new AsynchronousCloseException();
}
通過程式碼可以看出,如果是當前執行緒被中斷,則丟擲ClosedByInterruptException異常,表示Channel因為執行緒中斷而被關閉了,IO操作也隨之中斷了。
如果是當前執行緒發現Channel被關閉了,並且是讀取還未執行完畢的情況,則丟擲AsynchronousCloseException異常,表示Channel被非同步關閉了。
end()邏輯的活動圖如下:

場景分析
併發的場景分析起來就是複雜,上面的程式碼不多,但是場景很多,我們以sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer)為例分析一下可能的場景:

A執行緒read,B執行緒中斷A執行緒:A執行緒丟擲ClosedByInterruptException異常
A,B執行緒read,C執行緒中斷A執行緒
A被中斷時,B剛剛進入read方法:A執行緒丟擲ClosedByInterruptException異常,B執行緒ensureOpen方法丟擲ClosedChannelException異常
A被中斷時,B阻塞在底層read方法中:A執行緒丟擲ClosedByInterruptException異常,B執行緒底層方法丟擲異常返回,end方法中丟擲AsynchronousCloseException異常
A被中斷時,B已經讀取到資料:A執行緒丟擲ClosedByInterruptException異常,B執行緒正常返回
sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer)程式碼如下:

public int read(ByteBuffer dst) throws IOException {
ensureOpen(); // 1
if (!readable) // 2
throw new NonReadableChannelException();
synchronized (positionLock) {
int n = 0;
int ti = -1;
try {
begin();
ti = threads.add();
if (!isOpen())
return 0; // 3
do {
n = IOUtil.read(fd, dst, -1, nd); // 4
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
threads.remove(ti);
end(n > 0);
assert IOStatus.check(n);
}
}
}

總結
在JavaIO時期,人們為了中斷IO操作想了不少方法,核心操作就是關閉流,促使IO操作丟擲異常,達到中斷IO的效果。NIO中,將這個操作植入了java.lang.Thread#interrupt方法,免去使用者自己編碼特定程式碼的麻煩。使IO操作可以像其他可中斷方法一樣,在中斷時丟擲ClosedByInterruptException異常,業務程式捕獲該異常即可對IO中斷做出響應。