1. 程式人生 > >從TcpSocket上讀取資料的三種方式

從TcpSocket上讀取資料的三種方式

我在一個專案中碰到了一個TcpSocket的應用。在java程式中使用TcpSocket同本機的一個服務進行程序間的通訊。

由於通訊路徑只是單機並沒有經過網路,因此兩個程序之間的互通相對與網路傳輸是比較快速的。因此,程序間的互動使用瞭如下方式:

(見上傳圖片)

讓我們看一下程式碼實現:


public synchronized void send(byte[] bytes) throws IOException
{
if (bytes != null && bytes.length > 0)
{
this.bos.write(bytes);
this.bos.flush();
}
}

/**
* 嘗試讀取一次,速度最快,但返回的資訊可能不全
*/
public synchronized byte[] receiveOnce() throws IOException
{
byte[] reciveBytes = new byte[0];
int len = this.bis.read(this.b_buf, 0, this.bufferSize);

return ArrayUtils.addAll(reciveBytes, ArrayUtils.subarray(this.b_buf, 0, len));
}


/**
* 用於傳送並接收資料,在返回資料比較少的情況下使用
*/
public byte[] sendAndReceiveOnce(byte[] bytes) throws IOException
{
this.send(bytes);
return this.receiveOnce();
}


我們通過呼叫sendAndReceiveOnce()來接收並返回資料。

這樣實現會導致什麼問題呢?

1. 最明顯的就是傳送資料,和接收資料在同一個執行緒裡邊,如果執行在主執行緒,那麼主執行緒將會在執行receiveOnce()的時候停滯,除非接收到資料或者觸發超時異常。但之前已經說明了,應用本函式的環境是單機的程序間的通訊,因此接收到資料的時間實際上就取決於Server處理並給予響應時間的長短,不存在網路傳輸的滯留時間。所以,如果在Server響應快速的情況下,客戶端Socket幾乎感覺不到延遲太多。

2. 再一個明顯問題就是,receiveOnce()根據其函式名稱就可以得知,它只能讀取一次,如果返回資訊的長度,大於其緩衝區的長度,那它只能得到部分資料了。

綜合分析以上兩種情況,我們先解決問題2吧,看看如何讀取完整的響應資訊?

下面是我的實現方法:

   
/**
* 從流的開頭開始讀取,讀取全部的輸入資料並返回 如果執行read的時候流中沒有資料則阻塞讀取直到超時
* 最少經過一次超時,因此速度比較慢,但讀的時候流中沒有資料可以等到超時 ,因此獲取資料比較準確
*
* @return
* @throws 除了SocketTimeoutException的一切IOException
*/
public synchronized byte[] blockReceive() throws IOException
{
byte[] reciveBytes = new byte[0];
// 偏移量
int offset = 0;
// 每次讀取的位元組數
int len = 0;
while(len != -1)
{
try
{
len = this.in.read(this.buffer, 0, this.bufferSize);
}
catch(SocketTimeoutException e)
{
break;
}

if(len != -1)
{
reciveBytes = ArrayUtils.addAll(reciveBytes, ArrayUtils
.subarray(this.buffer, 0, len));
offset = offset + len;
}
}

return reciveBytes;
}


這個方法就如同它的註釋所說的那樣,它總是嘗試去僅可能多的讀取資訊,但是在觸發了一次超時之後將會返回讀取到的位元組。

有人提問:那判斷流是否已經讀完不是看讀到最後會返回-1麼?
我的回答是:返回-1取決於處理的流的源頭是什麼,如果是檔案流,這種想法或許是對的,因為檔案流的大小是固定的,持續的讀,總會讀到檔案的末尾(EOF),它總會返回-1的。

就像下面的檔案流在讀取檔案的實現一樣,我們這樣讀取檔案流:
    
protected byte[] receiveBytes() throws IOException
{
byte[] reciveBytes = new byte[0];
// 偏移量
int offset = 0;
// 每次讀取的位元組數
int len = 0;
while(len != -1)
{
this.buffer = new byte[bufferSize];
len = this.in.read(this.buffer, 0, this.bufferSize);

if(len != -1)
{
reciveBytes = ArrayUtils.addAll(reciveBytes, this.buffer);
offset = offset + len;
}
}
return ArrayUtils.subarray(reciveBytes, 0, offset);
}


但是,如果是網路流,例如TcpSocket,這樣的流是沒有末尾(EOF)的,如果你想讀到-1,或許在遠端被關閉了,而你還在讀取,還是有可能讀到-1的。實際情況是:網路連線狀況很複雜,很有可能遠端沒有正常關閉而是程序死掉了,而是連線的線路斷掉了,或者任何一個原因導致連線的通路無法正常傳輸資料。[b]由於在這種情況下,java中BufferedInputStream的read(byte[] b, int off, int len)函式(其它流物件也一樣)總是嘗試讀取更多的資料,如果沒有設定超時,就會一直堵塞在那裡,像死掉了一樣。而不是像你所期待的那樣,返回-1。[/b]因此,我們才才用讓它最少經過一次超時,來嘗試讀取更多的資料。當然,這也是僅僅在網路狀況足夠好的情況下,或者超時對於響應結果不會影響太多的情況下的解決方法。

[i](加一個小插曲:前段時間本人曾經在電話裡面被一個面試我的作開發的兄弟考到“怎麼獲取tcpScoket遠端關閉”,我就是闡述了因為以上觀點“檢測遠端是否關閉的最好方法就是向遠端傳送資料,看是否發生IO異常”,而那位仁兄一直堅持遠端關閉得到-1是對的,我說不對就反問:如果不是關閉,而是把網線切斷或者網路不通也能得到-1麼?仁兄語塞,面試後來以尷尬結束。後來反思自己當時實在太輕狂了,沒給仁兄面子。去不去應聘倒無所謂,但是態度還是不是面試時應該有的態度啊。現在想向那位仁兄道歉,也沒機會了)[/i]

那現在又有一個問題了,雖然與遠端的互動出現無法讀取到資料的時候不會一直堵塞在那裡,像死掉了一樣。但是我在使用blockReceive()的時候[b]總是需要等一個超時的時間[/b]才能返回!

那我如果設超時為5秒,操作完了之後,也至少等到5秒才能達到訊息的反饋。哦,天哪,慢到要死了!!!!
這個問題必須解決,那麼:
讓我們用更聰明的實現方法吧,我們不要阻塞了!!!


/**
* 如果流中有資料,則從流的開頭開始讀取,讀取全部的輸入資料並返回,否則馬上返回空 嘗試獲取當前流中的最大資料量,
* 由於使用了非阻塞接收,需要保證在執行本函式的時候流中恰好有資料, 在執行此函式之前必須給後臺足夠的響應時間
*
* @return
* @throws 除了SocketTimeoutException的一切IOException
*/
public synchronized byte[] unblocReceive() throws IOException
{
byte[] reciveBytes = new byte[0];
// 當前流中的最大可讀數
int contentLength = this.in.available();
// 偏移量
int offset = 0;
// 每次讀取的位元組數
int len = 0;

while(contentLength > 0 && offset < contentLength && len != -1)
{
try
{
len = this.in.read(this.buffer, 0, this.bufferSize);
}
catch(SocketTimeoutException e)
{
break;
}

if(len != -1)
{
reciveBytes = ArrayUtils.addAll(reciveBytes, ArrayUtils
.subarray(this.buffer, 0, len));
offset = offset + len;
}
}

return reciveBytes;
}


我們發現:這個方法真是不錯!我們每次在讀取資料之前,總是先用available()方法獲取流中當前的最大可讀位元組數,然後再讀。否則,我直接返回。但是為了以防我在讀取資料的時候也出現超時問題導致堵塞,我還是小心的加入了超時的處理,雖然它在絕大部分情況下並不會發生。

好了!現在我們滿懷希望的來呼叫所謂的“完美”解決方案:

public byte[] sendAndUnblockReceive(byte[] bytes) throws IOException
{
this.send(bytes);
return this.unblocReceive();
}


然後,測試,卻發現了一個奇怪的現象:

我們在程式裡面[b]連續呼叫了兩次sendAndUnblockReceive(),並期待每次傳送都會迅速並完整準確的接收它們每次的響應。[/b]但是,沒有效果。事實是:[b]我第一次傳送的請求,並沒有接收到它需要的正確響應。而我們第二次傳送的請求,卻接收到了第一次的響應,還要第二次的響應,這樣兩條資料!!![/b]

這是為什麼呢?因為:

[b]
我們第一次在傳送完資料之後,馬上就呼叫了unblocReceive()。但是由於這次我們呼叫的實在太快了,Server那一端沒來的及處理,甚至沒來的及接收,更不用說響應了。因此unblocReceive()裡面我們用available()方法獲取流中當前的最大可讀位元組數為0!!!因此,當然就不會讀取了!!!而第二次再發送時,第一次的響應剛剛到達,因此,unblocReceive()再被第二次呼叫的時候“盡最大可能”的讀取到了這兩次的響應資訊。
[/b]

唉,看來就沒有更好的方法了麼?或許,還是有的吧!!!

我們先這樣改一下:


public byte[] sendAndUnblockReceive(byte[] bytes) throws IOException
{
this.send(bytes);
// 由於使用了非阻塞接收,為保證在執行read的時候流中恰好有資料,
// 必須給後臺足夠的響應時間
try
{
Thread.sleep(500);
}
catch (InterruptedException e)
{
logger.error("InterruptedException error.", e);
}
return this.unblocReceive();
}


強制的讓執行緒在傳送完後sleep一端時間,半秒鐘,給Server足夠的響應時間,然後再去讀取,或許,這樣比那個blockReceive()的實現要好一點吧。



最後來一下總結:

我們在這個TcpScoket中,在傳送和讀取使用同一執行緒的情況下,使用了三種讀取方式:

[b]一次讀取,阻塞式完整讀取,非阻塞式完整讀取[/b]。

這三種讀取方式的優缺點分析如下:

[b]一次讀取receiveOnce()[/b]: 是最快速,並且在緩衝區足夠大的情況下能夠完整讀取的方法,當然如果沒有設定超時,它仍然用可能存在阻塞。

[b]阻塞式完整讀取blockReceive()[/b]:在返回資料之前總是至少經過一次超時以讀取更多資料,因此在網路狀況足夠好的情況下,速度仍然比較慢。

[b]非阻塞式完整讀取unblocReceive()[/b]: 在嘗試讀取資料之前,首先判斷可以讀取的最大位元組數,如果有資料則嘗試去讀,否則直接返回。所有是這一種不用考慮緩衝區大小,還能兼顧速度的方法。但是如果遠端響應慢的情況下,依然會錯過讀取資料。

綜合上述三中讀取方式:我們可以在確定返回資料量較少,而又要求速度快而準確的情況下,使用receiveOnce()。在返回資料量較多,而又要求速度快而準確的情況下,使用unblocReceive(),不過需要留給遠端足夠的響應時間。在不需要響應速度很快,而需要返回大量資料,而且準確的情況下使用blockReceive()。

現在我們拋開這些,想一想:

我們真的需要這樣三種讀取方式嗎?需要嗎?

我們為什麼這麼羅裡羅嗦使用這三種方式?

因為,[size=x-large][b]我們把傳送資料,和接收資料這兩個功能放在一個執行緒裡執行了!!![/b][/size]

這才是最主要的問題!

因此,:

[size=x-large][b]儘量不要在使用Socket的流的時候,把傳送資料和接收資料的呼叫放在一個執行緒裡。[/b][/size]

因為,網路上的流是不穩定的,因此java在設計流的時候也是儘量去讀取儘可能多的資料,很可能發生堵塞。如果放在一個執行緒裡面,試圖我傳送了就會想當然的又快又準確的接收到,就會像上面的解決方案一樣,用盡招數,仍然束手無策。