Okio精簡高效的IO庫
本節主要講講Okhttp底層使用的IO庫–Okio,Okio同樣是Square公司推出的增強型IO處理庫,旨在增強原生Java IO流的處理,以更加簡便,高效的方式處理IO流操作。接下來我會從以下方面來分析它。
- Okio的特點和優勢
- Okio結構分析
- Okio的流程分析,讀與寫的實現
- Buffer寫資料的精華操作
- Buffer快取的總結
- TimeOut超時機制
- Gzip壓縮簡要分析
- 總結
1. Okio的特點和優勢
我們知道Java原生的IO處理已經很強大了,有針對位元組和字元的輸入輸出介面,實現有快取的處理,以及各種子類實現比如檔案的(FileInputStream和FileOutputStream),資料的(DataInputStream和DataOutputStream),物件的(ObjectInputStream和ObjectOutputStream)等等。為什麼Square還要搞出個Okio來呢?其實吧,我們要明白,Okio不是用來完全取代原生IO的,事實上它本身也是基於原生IO之上的,比如要從檔案得到一個檔案輸入流,還是得通過FileIntputStream來得到,所以Okio的用意不是取代,而是在某些場合作更加優化的使用,意思就是你原生IO有些地方沒有做好,我要用我自己的方式得到更高效簡便的IO處理。那麼Okio具體有哪些優勢呢?主要有以下:
1.精簡的api介面。
我們知道原生的Java IO流使用是比較複雜的,基礎的位元組流介面有InpuStream和OutputStream,字元流介面有Reader和Writer,每個介面都有很多實現的子類,裡面大量使用了裝飾著模式。假如我要建立一個DataOutputStream用於將一些資料型別資料輸出到檔案中,我可能需要經歷FileOutputStream->BufferedOutputStream->DataOutputStream的建立過程。而如果使用Okio來操作的話可以很簡單。
File file = new File(Environment.getExternalStorageDirectory() + "/" + "output.txt");
String name = "xiaoming";
int age = 11;
//使用原生IO
DataOutputStream dos = null;
try {
dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
dos.writeChars(name);
dos.writeInt(age);
}catch (IOException ex){
ex.printStackTrace();
}finally {
try {
if(dos != null){
dos.close();
}
}catch (Exception ex){
ex.printStackTrace();
}
}
//使用Okio
try {
Okio.buffer(Okio.sink(file)).writeUtf8(name).writeInt(age).close();
} catch (FileNotFoundException e) {
e.printStackTrace();
}catch (IOException ex){
ex.printStackTrace();
}
從上面可以看出Okio可以很方便的使用鏈式呼叫,一句程式碼就可以完成流的建立->操作->關閉,是不是有一氣呵成的感覺。
Okio為了精簡我們常用的位元組字元流操作,抽象出了BufferedSource快取流輸入和BufferedSink快取流輸出介面,它們具有處理位元組和字元,和資料快取的功能。這麼看,是不是Okio一個介面實現了原生IO中三個介面的功能呢。
2.效能高效的快取處理功能。
這應該是推出Okio的關鍵原因吧,Okio不滿足於原生IO中BufferedOutputStream等快取流的簡單粗暴的快取處理,轉而自己使用更高效的方式處理處理流中資料快取的操作。
我們看看原生中快取的處理。
原生中快取的處理是這樣的,每個buffer流中維護一個預設大小8192的buf陣列,自己負責建立和回收,當快取的資料快要滿時(buf剩餘空間不足以儲存這一次的資料時),就會將buf中的快取資料全部輸出,然後重新快取。如果一次操作的資料大小大於快取的大小(預設8192),那麼快取就沒法使用了,因為快取一次性存不了這麼多資料。
然後原生中的快取與快取之間沒有直接的交流,這樣造成的影響是,輸入流中的資料轉移到輸出流中是:輸入buf -> 臨時byte陣列 -> 輸出buf,經歷兩次拷貝。
原生中的快取功能看起來並不高效,存在不少問題,Okio要改變這種窘境。
- Buffer互動增強。首先是快取互動的設計,我們看Source的read和Sink的write方法都是直接將對方的Buffer傳入進去。這樣的用意就是,增加輸入和輸出之間快取的互動,減少額外的臨時空間用於資料拷貝。例如從輸入流中讀取資料寫入到輸出流的這個過程,是先從Source流讀取資料到Source快取Buffer,然後從Source快取Buffer將資料轉移到Sink快取BUffer(注意,這裡是直接從Source的Buffer快取轉移資料到Sink的Buffer,相比原生的臨時byte陣列是不是少了一次資料拷貝過程?),最後到Sink快取BUffer將資料flush到輸出流中。
- Buffer結構優化,資料轉移優化。不像原生中使用一個預設8192的大陣列存放快取資料,Okio轉而使用一個個大小2048的小陣列(這段陣列資料封裝為Segment),採用雙鏈表的方式組織在一起,形成一條前後迴圈的以小段陣列為單位的連結串列。那麼這樣的結構有一個潛在的好處就是,操作的資料是以一小段陣列(segment)為單位,從source流的快取資料中轉移到sink流緩資料中的過程,大部分請求下就不必像原生中對位元組資料一個個拷貝過去,而是可以直接指向過去,注意,是直接指向,將source中的該小段陣列(Segment)從source的快取連結串列中移除,新增到sink中快取連結串列中,不需要任何的資料拷貝工作,這顯然可以節省很多cpu時間。
- 靈活智慧的Segment資料控制。同時這個segment又是一個很靈活智慧的小段陣列,它會考慮和它的前一個segment進行資料的合併(compact),節省出一個segment空間。同時它也支援將一個segment分割(split)為兩個相連的segment,這樣的話可以將原來[offset, limit]的資料,分割為[offset, offset+byteCount]和[offset+byteCount, limit]兩段segment資料,這樣的好處在於,分離出來後,操作更靈活,比如可以將[offset, offset+byteCount]這段segment直接從source快取指向sink快取,免去了資料拷貝工作,而其實這種分離操作,只是邏輯上的分離,這兩個分離的segment其實還是用的通過一個數組資料,也就是說它們共享同一段資料,只是它們標記資料範圍不一樣而已。
- Segment資料池的引入。你以為就這樣了嗎,Okio為了防止一個個的segment資料被頻繁的建立和銷燬,使用了一個SegmentPool用於維護使用完成的segment資料,由它來管理segment的銷燬或迴圈使用。SegmentPool提供了64*1024的最大大小,也就是說它可以容納最多32個segment資料。它提供了take用於獲取可使用的segment和recycle用來判定是否回收。這樣的好處在於,segment資料和Buffer是獨立分開的,Buffer只負責segment的使用,而不負責對它的建立和銷燬,轉而由SegmentPool來進行管理,這樣的話,segment就像是公有資源一樣,Buffer使用完了之後交還給SegmentPool,下一個Buffer需要了再從SegmentPool中獲取就可以了,這樣大大的減少了記憶體的分配和回收的開銷。
可見Okio為了提高IO快取的高效處理效能可謂是煞費苦心,從輸入輸出快取的直接資料對接,到內部Segment結構的引入,以Segment為單位進行資料操作的高效,以及Segment池的引入等等不一而足,為的就是儘可能快的完成IO操作。
3.TimeOut超時的引入
我們知道原生IO中是沒有超時這個機制的,如果在輸入或輸出過程中發生阻塞,那麼在這個過程中就沒有好的方式對它進行中斷操作,在丟擲IO異常前可能會一直阻塞下去,這顯然不時我們想要的結果,我們希望如果如果它在阻塞到一定時間後能夠丟擲異常告訴我們發生TimeOut超時了。因此Okio推出了TimeOut機制,實現有TimeOut(同步計時的超時處理)和AsyncTimeOut(非同步計時的超時處理)。
- 同步超時TimeOut。TimeOut會在每次的讀寫操作中判斷是否到達了超時時間,進而做超時處理,因而它有個缺點,就是如果在讀寫方法中,它一直阻塞,那麼TimeOut的計時方法也將被阻塞,這樣的超時情況,它也無能為力去判斷了。
- 非同步超時AsyncTimeOut。而AsyncTimeOut則不同,有一個單獨的執行緒Watchdog(姑且稱它為看門狗吧)用於監控這些AsyncTimeOut是否超時,如果某個AsyncTimeOut超時了,它就汪汪兩聲呼叫AsyncTimeOut的timedOut方法,而你需要做的是實現這個timedOut方法,比如在Okio實現Socket的超時中,它實現的timedOut是關閉Socket,這樣如果一直阻塞,看門狗發現超時了,會呼叫AsyncTimeOut的timedOut,然後就會關閉關閉Socket,這樣系統就會丟擲IO異常,阻塞也就中止了。
我想在描述了以上優點和它大致的實現原理之後,你已經對Okio已經有初步的瞭解了。大概知道了它有什麼功能,是怎麼樣的設計理念。接下來對Okio中重要的類作一個簡單的介紹,能讓你快速熟悉類的結構。
2. Okio結構分析
Okio有一些重要類:
Source,Sink
Okio中封裝的輸入流介面和輸出流介面,對應原生IO的InputStream和OutputStream。分別引入了read方法用於直接將資料讀取到傳入的Sink的Buffer快取中,和引入了write方法用於直接從傳入的Source快取中讀取資料並寫入到自己的Buffer快取中。然後還有timeout提供超時介面。
BufferedSource,BufferedSink
帶有快取功能的Source介面,Sink介面,分別繼承自Source和Sink。同時提供一系列讀寫位元組,字元資料的介面。
Okio
Okio是Okio庫的入口類,也是工廠類,它提供source方法得到一個Source輸入流,提供sink方法得到一個Sink輸出流,提供buffer方法得到具有快取功能的Source或者Sink物件。它提供對File,Socket,和(InputStream,OutputStream)三種類型的源進行操作,可見,Okio其實是構建在(InputStream,OutputStream)之上的,得到封裝之後的(Source,Sink)。
Segment
一小段陣列資料的鏈式節點封裝結構,由它們可以組成一個迴圈連線的雙向連結串列佇列。
- 該Segment結構包含pre和next可以找到前一個和後一個的Segment節點。
- 內部維護著一段固定大小2048的陣列資料,pos記錄下一個可讀的位置,limit記錄下一個可寫的位置,因此,0到pos的資料部分是已經標記讀取過了的無效資料區域,pos到limit之間的就是該Segment的有效資料區域,limit到Segment大小部分是還可以再寫入資料的區域。
- pop用於彈出當前Segment並返回下一個Segment。
- push用於壓入一個Segment到當前Segment的後面。
- split用於一個Segment分割成兩個相連的Segment,之前Segment的有效資料區域[offset,limit]被分割成前一個Segment的有效區域[offset,offset+byteCount]和後一個Segment的有效區域[offset+byteCount,limit]部分。
- compact用於考慮將當前Segment和前一個Segment的進行合併。如果前一個Segment的可寫區域[limt,Segment.SIZE]大於當前Segment的有效資料區域[pos, limit],則可以將當前Segment的有效資料寫到前一個Segment中去,然後刪除當前Segment,節省一個Segment空間。
SegmentPool
管理Segment的池,使用單鏈表記錄無用的Segment,提供了take獲取一個可用的Segment,提供recycle將無用的Segment進行回收或維護。如果SegmentPool中的Segment的數量小於32個,recycle時會將它加入到單鏈表中記錄起來,同時重置pos和limit以方便後期的Segment重用。如果超過了32個了,則recycle不進行任何操作,這將導致該Segment沒有任何引用了,也就將會被回收了。
Buffer
Okio的核心類,用於資料快取的管理,它實現了BufferedSource和BufferedSink介面,它還支援兩個Buffer之間資料的轉移(copyTo,注意是轉移,不是拷貝,轉移的話就是資料指向傳送改變了,速度不是拷貝能比的),這就是為啥Buffer這麼牛逼的原因了,因為它是唯一一個既能進行讀取資料管理,又能進行寫入資料管理,而且相互之間還能直接資料轉移操作,真是神一樣的存在。
RealBufferedSource,RealBufferedSink
RealBufferedSource是快取Source介面的具體實現,繼承自BufferedSource。同時提供一系列讀取位元組,字元資料的介面。內部的操作基本都是有Buffer來參與處理的,首先會呼叫request來讀取source裡的一段資料到Buffer中,然後後續的讀取資料都是從Buffer中讀的。
RealBufferedSink快取Sink介面的具體實現,繼承自BufferedSink。同時提供一系列寫入位元組,字元資料的介面。內部的操作基本都是有Buffer來參與處理的,首先會將資料寫到Buffer中,然後呼叫emitCompleteSegments,如果Buffer儲存快取資料的size小於Segment大小的一半,即1024的話,不會可以繼續快取,否則會將快取的內容全部寫到輸出中。
3. Okio的流程
上面已經說明了大部分Okio相關類的資訊和作用了,那麼Okio是怎麼樣的一個執行流程呢?我想從一個簡單的入口,來逐步分析Okio在此期間經歷了些什麼。下面是將srcFile檔案的資料全部複製到檔案destFile中
try {
Okio.buffer(Okio.sink(destFile)).writeAll(Okio.buffer(Okio.source(srcFile)));
} catch (FileNotFoundException e) {
e.printStackTrace();
}catch (IOException ex){
ex.printStackTrace();
}
看看,一句話就搞定了兩個檔案之間的資料複製。它其實等價於
try {
RealBufferedSource source = Okio.buffer(Okio.sink(srcFile));
RealBufferedSink sink = Okio.buffer(Okio.sink(destFile));
sink.writeAll(source);
} catch (FileNotFoundException e) {
e.printStackTrace();
}catch (IOException ex){
ex.printStackTrace();
}
也就是進入到RealBufferedSink.writeAll方法了,我們看裡面做了些什麼
final class RealBufferedSink implements BufferedSink {
@Override public long writeAll(Source source) throws IOException {
if (source == null) throw new IllegalArgumentException("source == null");
long totalBytesRead = 0;
//迴圈中,不斷從source讀取資訊到Buffer中
for (long readCount; (readCount = source.read(buffer, Segment.SIZE)) != -1; ) {
totalBytesRead += readCount;
//考慮將Buffer的資料輸出到sink
emitCompleteSegments();
}
return totalBytesRead;
}
}
讀取資料的處理
我們先看看source.read(buffer, Segment.SIZE)是怎麼完成讀取工作的
final class RealBufferedSource implements BufferedSource {
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (sink == null) throw new IllegalArgumentException("sink == null");
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");
if (buffer.size == 0) {
//如果當前source中的buffer快取沒有資料,則先讀取一部分資料到快取中,省的每次都從source讀取
long read = source.read(buffer, Segment.SIZE);
if (read == -1) return -1;
}
//接著就是從buffer中讀取byteCount個數據到sink中,但是byteCount大小不能超過buffer快取的大小,因為當前只有這麼多快取資料
long toRead = Math.min(byteCount, buffer.size);
return buffer.read(sink, toRead);
}
}
注意不要被上面的sink和buffer給迷惑了,sink表示要寫入到Sink的快取Buffer,而buffer是當前source的快取Buffer。上面做的就是首先從source讀取一段資料到自己的buffer中,然後再從buffer讀取資料到對方的sink快取中,雖然指定了要讀取byteCount個數據,但是實際能讀的大小要看buffer的size,最大不會超過Segment.SIZE。
這裡你可能會疑惑 source.read(buffer, Segment.SIZE) 和 buffer.read(sink, toRead) 是怎麼實現的。
我們先看看source.read(buffer, Segment.SIZE)
public final class Okio {
private static Source source(final InputStream in, final Timeout timeout) {
if (in == null) throw new IllegalArgumentException("in == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Source() {
@Override public long read(Buffer sink, long byteCount) throws IOException {
//這裡就是不帶快取的Source的讀取方式
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (byteCount == 0) return 0;
//判斷是否超時(同步方式)
timeout.throwIfReached();
//這裡獲取sink快取中最後一個Segment,準備將資料讀取到這個Segment中
Segment tail = sink.writableSegment(1);
//判斷這個Segment還能容納多少資料
int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
//這裡是從原生的InputStream輸入流中讀取資料到Segment的陣列中
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
if (bytesRead == -1) return -1;
tail.limit += bytesRead;
sink.size += bytesRead;
return bytesRead;
}
@Override public void close() throws IOException {
in.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "source(" + in + ")";
}
};
}
}
它的真實實現是在這裡的,就是先找到sink快取的最後一個Segment,然後將從InputStream輸入流中讀取資料到該Segment的陣列中。所以它完成的操作就是從輸入流讀取一部分資料到buffer快取中。 接著再看 buffer.read(sink, toRead) 是怎麼實現的。
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
@Override public long read(Buffer sink, long byteCount) {
if (sink == null) throw new IllegalArgumentException("sink == null");
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (size == 0) return -1L;
if (byteCount > size) byteCount = size;
//呼叫write方法,將自身Buffer的部分資料寫到sink的Buffer中
sink.write(this, byteCount);
return byteCount;
}
}
很簡單就是,Buffer的read操作就相當於呼叫對方Buffer的write寫入資料,其實就是兩個Buffer之間資料的傳遞過程,這是重點部分,我們在後面重點講解。接下來我們看Buffer有了資料之後是怎麼輸出到輸出流的。
寫入資料的處理
資料讀取到Buffer快取之後,我們再看emitCompleteSegments是幹嘛的
final class RealBufferedSink implements BufferedSink {
@Override public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
//這裡是計算buffer是否有Segment的資料已經滿了,如果有的話,就會將滿了的Segment資料寫入到sink中
long byteCount = buffer.completeSegmentByteCount();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
}
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
/**
* Returns the number of bytes in segments that are not writable. This is the
* number of bytes that can be flushed immediately to an underlying sink
* without harming throughput.
*/
public long completeSegmentByteCount() {
// result是Buffer中所有的Segment的有效資料大小
long result = size;
if (result == 0) return 0;
// Omit the tail if it's still writable.
Segment tail = head.prev;
if (tail.limit < Segment.SIZE && tail.owner) {
result -= tail.limit - tail.pos;
}
//tail.limit - tail.pos; 是最後一個Segment的有效資料大小
//所以result如果大於0,說明Buffer中至少有兩個Segment了,也就是有Segment滿了。
return result;
}
}
在emitCompleteSegments中判斷Buffer中的是否有兩個以上的Segment了(也就是說有Segment滿了),如果有的話,會將之前滿了的Segment資料全部輸出到sink中(留下最後一段未滿的Segment資料繼續作為快取用),也就是將滿的那部分快取資料flush到輸出流中。
4. Buffer寫資料的精華操作
Buffer的write(Buffer source, long byteCount)方法是Buffer快取處理中的精華操作,它描述的是將一個Buffer的資料轉移到另一個Buffer中時,是怎麼樣的一個處理過程。這裡保留了英文註釋,讓你能更原汁原味的瞭解其中的含義。
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
@Override public void write(Buffer source, long byteCount) {
// Move bytes from the head of the source buffer to the tail of this buffer
// while balancing two conflicting goals: don't waste CPU and don't waste
// memory.
//
//
// Don't waste CPU (ie. don't copy data around).
//
// Copying large amounts of data is expensive. Instead, we prefer to
// reassign entire segments from one buffer to the other.
//
//
// Don't waste memory.
//
// As an invariant, adjacent pairs of segments in a buffer should be at
// least 50% full, except for the head segment and the tail segment.
//
// The head segment cannot maintain the invariant because the application is
// consuming bytes from this segment, decreasing its level.
//
// The tail segment cannot maintain the invariant because the application is
// producing bytes, which may require new nearly-empty tail segments to be
// appended.
//
//
// Moving segments between buffers
//
// When writing one buffer to another, we prefer to reassign entire segments
// over copying bytes into their most compact form. Suppose we have a buffer
// with these segment levels [91%, 61%]. If we append a buffer with a
// single [72%] segment, that yields [91%, 61%, 72%]. No bytes are copied.
//
// Or suppose we have a buffer with these segment levels: [100%, 2%], and we
// want to append it to a buffer with these segment levels [99%, 3%]. This
// operation will yield the following segments: [100%, 2%, 99%, 3%]. That
// is, we do not spend time copying bytes around to achieve more efficient
// memory use like [100%, 100%, 4%].
//
// When combining buffers, we will compact adjacent buffers when their
// combined level doesn't exceed 100%. For example, when we start with
// [100%, 40%] and append [30%, 80%], the result is [100%, 70%, 80%].
//
//
// Splitting segments
//
// Occasionally we write only part of a source buffer to a sink buffer. For
// example, given a sink [51%, 91%], we may want to write the first 30% of
// a source [92%, 82%] to it. To simplify, we first transform the source to
// an equivalent buffer [30%, 62%, 82%] and then move the head segment,
// yielding sink [51%, 91%, 30%] and source [62%, 82%].
if (source == null) throw new IllegalArgumentException("source == null");
if (source == this) throw new IllegalArgumentException("source == this");
//檢查操作,讀取的byteCount大小不能超過source快取Buffer的大小。
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
// Is a prefix of the source's head segment all that we need to move?
//要複製的資料size小於source中Segment有效資料的size
if (byteCount < (source.head.limit - source.head.pos)) {
//獲取寫入Buffer中最後一個可寫的Segment
Segment tail = head != null ? head.prev : null;
if (tail != null && tail.owner
&& (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
// Our existing segments are sufficient. Move bytes from source's head to our tail.
//如果最後一個Segment存在,並且它是獨立的(不是共享其他Segment的),要寫的byteCount+有效資料大小<Segment的大小
//也就是說整個Segment的可以容易現存有效資料和要寫入的byteCount個數據。
//則將資料byteCount個數據複製過去。
//這裡首先會將sink的陣列資料整體前移offset,然後在複製byteCount個數據到sink中,意思就是丟棄sink前面offset的資料,騰出空間來放更多的資料,所以是驗證byteCount+有效資料大小<Segment的大小。
source.head.writeTo(tail, (int) byteCount);
source.size -= byteCount;
size += byteCount;
return;
} else {
// We're going to need another segment. Split the source's head
// segment in two, then move the first of those two to this buffer.
//如果要寫的byteCount資料不能全部寫到最後一個Segment中,那就要考慮將source中的Segment進行拆分了
//拆分之後,就可以將這個byteCount個數組組成的Segment移動到新的Buffer中,不用複製資料
source.head = source.head.split((int) byteCount);
}
}
// Remove the source's head segment and append it to our tail.
//將這個source的Segment從頭部移除,新增到自己Buffer的尾部
Segment segmentToMove = source.head;
long movedByteCount = segmentToMove.limit - segmentToMove.pos;
//將這個source的Segment從頭部移除
source.head = segmentToMove.pop();
if (head == null) {
head = segmentToMove;
head.next = head.prev = head;
} else {
Segment tail = head.prev;
tail = tail.push(segmentToMove);
//添加了新的Segment之後,看看這個Segment能不能和前一個Segment進行資料合併,節省出一個Segment空間
tail.compact();
}
source.size -= movedByteCount;
size += movedByteCount;
byteCount -= movedByteCount;
}
}
}
這裡根據不同情況進行處理。
1. 首先判斷操作的byteCount是否是在source的當前Segment範圍內。如果是在範圍內,判斷操作的byteCount資料在要寫入的Segment中是否容納的下,如果容納的下,則將資料複製過去(copy),返回完成操作。如果容納不下,就考慮進行split拆分,拆分之後,就不考慮複製資料了,而是後面的直接將整個Segment移動到目標Buffer中(move)。
2. 接下來就是移動Segment了,直接從soucrce中移除頭部的Segment,然後新增到目標Buffer的Segment尾部,新增之後,判斷是否需要和前一個Segment進行資料合併,以騰出一個Segment空間。
所以我們可以發現,這個操作非常精彩,對於目標Segment容納的下的小段資料,採用直接複製的方法,而大段的Segment資料,則是直接移動,而不是複製,只是一個引用指向的變化,那效率超級高啊,這個設計很絕妙,所以說Okio為什麼要設計成一小段的Segment,因為段小好操作啊,你要複製多少個數據,我可以根據情況來考慮大段的整個移動,小段的採用複製,而如果像原生IO那一大段的陣列,就只能乖乖的採用複製的方法了。
接下來我們分析小段資料的複製,slit分割,compact合併的實現。
先看小段資料的複製
final class Segment {
/** Moves {@code byteCount} bytes from this segment to {@code sink}. */
public void writeTo(Segment sink, int byteCount) {
if (!sink.owner) throw new IllegalArgumentException();
if (sink.limit + byteCount > SIZE) {
// We can't fit byteCount bytes at the sink's current position. Shift sink first.
if (sink.shared) throw new IllegalArgumentException();
if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
//其實就是將sink陣列的資料整體前移pos個位置,丟棄pos之前的資料
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
sink.limit -= sink.pos;
sink.pos = 0;
}
//將資料複製到sink陣列中
System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
sink.limit += byteCount;
pos += byteCount;
}
}
很簡單,就是處理了sink陣列的整體前移,然後將資料複製到sink中。接下來看split分割的實現
final class Segment {
/**
* Splits this head of a circularly-linked list into two segments. The first
* segment contains the data in {@code [pos..pos+byteCount)}. The second
* segment contains the data in {@code [pos+byteCount..limit)}. This can be
* useful when moving partial segments from one buffer to another.
*
* <p>Returns the new head of the circularly-linked list.
*/
public Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
//建立一個新的Segment,並且宣告它是共享的,即共享另外一個Segment的資料
Segment prefix = new Segment(this);
//這裡聲明瞭新Segment的有效資料範圍[pos,pos+byteCount],它作為前置的Segment
prefix.limit = prefix.pos + byteCount;
//這裡聲明瞭原來Segment的有效資料範圍[pos+byteCount,limit],它作為後置的Segment
pos += byteCount;
//將新Segment新增到原來Segment的前面,作為前置Segment
prev.push(prefix);
return prefix;
}
Segment(Segment shareFrom) {
this(shareFrom.data, shareFrom.pos, shareFrom.limit);
shareFrom.shared = true;
}
}
以上將一個Segment拆分為相連的兩個Segment,新Segment共享原來Segment的資料,新Segment作為前置Segment,有效範圍[pos,pos+byteCount],原來Segment作為後置,有效範圍[pos+byteCount,limit]。為什麼要split分割?就是為了能將單個Segment邏輯分為兩個Segment,以便完成byteCount的獨立操作,比如整體移動,而不用進行耗時的複製操作。因為是共享一個數組資料的,所以沒有多佔用什麼記憶體空間,只是邏輯上分離,有了各自獨立的有效區域標識而已,資料還是公用的,這個想法很秒。接下來再看compact合併的實現
final class Segment {
/**
* Call this when the tail and its predecessor may both be less than half
* full. This will copy data so that segments can be recycled.
*/
public void compact() {
if (prev == this) throw new IllegalStateException();
if (!prev.owner) return; // Cannot compact: prev isn't writable.
//當前Segment的有效資料size
int byteCount = limit - pos;
//前一個Segment的可用空間,包括pos之前部分和limit之後的部分
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
//若果前一個Segment的可用空間能容納當前Segment的資料,則複製資料過去,然後移除當前Segment,交給SegmentPool回收
if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
writeTo(prev, byteCount);
pop();
SegmentPool.recycle(this);
}
}
以上判斷前一個Segment的可用空間是否能容納當前Segment的資料,如果能容納,則複製資料到前一箇中,移除當前的Segment,交給SegmentPool回收,這樣可用騰出一個Segment空間,這個合併資料操作同樣也是個優化操作。
5. Buffer快取的總結
講到這裡,如果你都明白了的話,我想你已經能體會到Okio優化的精妙之處,構建在原生輸入輸出流的基礎上,捨棄原生IO的快取功能,自己實現一套流讀取和寫入的快取機制,大大提高了快取使用的效率。我們對比原生IO和Okio快取處理時,資料從輸入流到輸出流的工程。
- 原生IO快取處理過程
InputStream -> inBuffer -> 臨時byte[] -> outBuffer -> OutpuStream- Okio快取處理過程
InputStream -> inBuffer -> outBuffer -> OutpuStream,可以發現少了中轉臨時byte[]的過程,inBuffer -> outBuffer之間直接互動資料。同時inBuffer和outBuffer很多時候是可以共享Segment資料,這意味這個inBuffer -> outBuffer不是單純的複製資料,而是可以以Segment為單位,直接從inBuffer去pop移除,然後push新增到outBuffer中,這意味著inBuffer和outBuffer之間的資料很多情況下就是共享的,也就是說會更加趨近與InputStream -> Buffer -> OutpuStream,那麼現在是不是更能體會Okio的優化策略,它將之前的inBuffer -> 臨時byte[] -> outBuffer優化成一體的了。
6. TimeOut超時機制
TimeOut超時機制有同步實現的TimeOut和非同步實現的AsynTimeOut。我們來分析這兩個
TimeOut同步超時
同步超時可以作用在sourc和sink中,這裡我們以souce為例,sink原理相同
public final class Okio {
/** Returns a source that reads from {@code in}. */
public static Source source(final InputStream in) {
//傳入一個預設的Timeout,預設是沒有超時效果的
return source(in, new Timeout());
}
private static Source source(final InputStream in, final Timeout timeout) {
if (in == null) throw new IllegalArgumentException("in == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Source() {
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (byteCount == 0) return 0;
//這裡就是同步判斷是否會超時的,它是阻塞的,如果這個read方法阻塞了,那麼它無法進行正常判斷了。
timeout.throwIfReached();
Segment tail = sink.writableSegment(1);
int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
if (bytesRead == -1) return -1;
tail.limit += bytesRead;
sink.size += bytesRead;
return bytesRead;
}
@Override public void close() throws IOException {
in.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "source(" + in + ")";
}
};
}
}
我們進去看timeout.throwIfReached()的實現
public class Timeout {
/**
* Throws an {@link InterruptedIOException} if the deadline has been reached or if the current
* thread has been interrupted. This method doesn't detect timeouts; that should be implemented to
* asynchronously abort an in-progress operation.
*/
public void throwIfReached() throws IOException {
//如果執行緒標記為中斷了,丟擲執行緒中斷異常
if (Thread.interrupted()) {
throw new InterruptedIOException("thread interrupted");
}
//如果設定了有超時限制,並且當前時間超過了超時時間,則丟擲超時異常
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
throw new InterruptedIOException("deadline reached");
}
}
}
同步超時異常還是很簡單的,就是在read或者write方法中進行時間判斷是否超時。它有缺陷就是如果read或者write傳送阻塞了,就不能及時判斷是否超時了。所以有了AsynTimeOut非同步超時的機制。
AsyncTimeOut非同步超時
AsyncTimeOut在Okio中只用作了處理Socket中,當然它也可以用到其他地方,由你實現。我們從這裡分析
public final class Okio {
/**
* Returns a source that reads from {@code socket}. Prefer this over {@link
* #source(InputStream)} because this method honors timeouts. When the socket
* read times out, the socket is asynchronously closed by a watchdog thread.
*/
public static Source source(final Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
//獲取一個針對Socket的AsyncTimeout
AsyncTimeout timeout = timeout(socket);
//根據socket獲取source
Source source = source(socket.getInputStream(), timeout);
//這裡是重點,包裝一個新的Source來處理TimeOut
return timeout.source(source);
}
private static AsyncTimeout timeout(final Socket socket) {
return new AsyncTimeout() {
@Override protected IOException newTimeoutException(IOException cause) {
InterruptedIOException ioe = new SocketTimeoutException("timeout");
if (cause != null) {
ioe.initCause(cause);
}
return ioe;
}
@Override protected void timedOut() {
//發生超時,關閉Socket
try {
socket.close();
} catch (Exception e) {
logger.log(Level.WARNING, "Failed to close timed out socket " + socket, e);
} catch (AssertionError e) {
if (e.getCause() != null && e.getMessage() != null
&& e.getMessage().contains("getsockname failed")) {
// Catch this exception due to a Firmware issue up to android 4.2.2
// https://code.google.com/p/android/issues/detail?id=54072
logger.log(Level.WARNING, "Failed to close timed out socket " + socket, e);
} else {
throw e;
}
}
}
};
}
}
上面建立了一個AsycTimeOut物件,用於處理髮生超時時關閉Socket。根據Socket得到一個輸入流的Source,然後交給AsycTimeOut.source處理得到一個新的Source物件,那麼新的Source有什麼特別的呢,我們往下看
public class AsyncTimeout extends Timeout {
/**
* Returns a new source that delegates to {@code source}, using this to
* implement timeouts. This works best if {@link #timedOut} is overridden to
* interrupt {@code sink}'s current operation.
*/
public final Source source(final Source source) {
//新建了一個source物件
return new Source() {
@Override public long read(Buffer sink, long byteCount) throws IOException {
boolean throwOnTimeout = false;
//這裡enter中呼叫了scheduleTimeout,進行TimeOut的排程
enter();
//以下是資料讀取,沒什麼特別的
try {
long result = source.read(sink, byteCount);
throwOnTimeout = true;
return result;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override public void close() throws IOException {
boolean throwOnTimeout = false;
try {
source.close();
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
//這裡會從超時佇列中移除超時
exit(throwOnTimeout);
}
}
@Override public Timeout timeout() {
return AsyncTimeout.this;
}
@Override public String toString() {
return "AsyncTimeout.source(" + source + ")";
}
};
}
public final void enter() {
if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
//超時時間
long timeoutNanos = timeoutNanos();
//是否有設定超時
boolean hasDeadline = hasDeadline();
if (timeoutNanos == 0 && !hasDeadline) {
return; // No timeout and no deadline? Don't bother with the queue.
}
inQueue = true;
//排程TimeOut
scheduleTimeout(this, timeoutNanos, hasDeadline);
}
/**
* Returns either {@code cause} or an IOException that's caused by
* {@code cause} if a timeout occurred. See
* {@link #newTimeoutException(java.io.IOException)} for the type of
* exception returned.
*/
final IOException exit(IOException cause) throws IOException {
if (!exit()) return cause;
return newTimeoutException(cause);
}
/** Returns true if the timeout occurred. */
public final boolean exit() {
if (!inQueue) return false;
inQueue = false;
//從超時佇列移除超時
return cancelScheduledTimeout(this);
}
}
以上就是建立了一個新的Source,在Source每次在read的之前,呼叫enter,其實就是scheduleTimeout來排程AsyncTimeout,而完成read時,呼叫exit,其實就是cancelScheduledTimeout來