1. 程式人生 > 實用技巧 >大多數人可能都不會使用socketTimeout,看了底層才知道一直都做錯了

大多數人可能都不會使用socketTimeout,看了底層才知道一直都做錯了

  前幾天一個機房網路抖動,引發了很多對外請求的超時問題,在發生問題排查日誌的時候,發現了這麼一個現象,httpclient我們的請求超時時間並沒有按照我們的設定報超時異常

我們的大概配置如下:

RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(1000)
.setSocketTimeout(2000)
.setConnectionRequestTimeout(1000)
.build();

但實際卻發現很多請求超時時間都達到了10幾秒甚至有的二十幾秒,大大超過了我們的預期時間,決定通過跟蹤原始碼一探究竟:

原來http讀取網路資料的時候是其實是使用的BufferedReader類,而我們知道java的io類其實都是對基本輸入流的裝飾,其底層其實是利用的SocketInputStream來讀取資料,一路程式碼跟蹤,我們跟蹤到了這個方法

int read(byte b[], int off, int length, int timeout) throws IOException {
int n = 0; // EOF already encountered
if (eof) {
return -1;
} // connection reset
if (impl.isConnectionReset()) {
throw new SocketException("Connection reset");
} // bounds check
if (length <= 0 || off < 0 || off + length > b.length) {
if (length == 0) {
return 0;
}
throw new ArrayIndexOutOfBoundsException();
} boolean gotReset = false; Object traceContext = IoTrace.socketReadBegin();
// acquire file descriptor and do the read
FileDescriptor fd = impl.acquireFD();
try {
n = socketRead0(fd, b, off, length, timeout);
if (n > 0) {
return n;
}
} catch (ConnectionResetException rstExc) {
gotReset = true;
} finally {
impl.releaseFD();
IoTrace.socketReadEnd(traceContext, impl.address, impl.port,
timeout, n > 0 ? n : 0);
} /*
* We receive a "connection reset" but there may be bytes still
* buffered on the socket
*/
if (gotReset) {
traceContext = IoTrace.socketReadBegin();
impl.setConnectionResetPending();
impl.acquireFD();
try {
n = socketRead0(fd, b, off, length, timeout);
if (n > 0) {
return n;
}
} catch (ConnectionResetException rstExc) {
} finally {
impl.releaseFD();
IoTrace.socketReadEnd(traceContext, impl.address, impl.port,
timeout, n > 0 ? n : 0);
}
} /*
* If we get here we are at EOF, the socket has been closed,
* or the connection has been reset.
*/
if (impl.isClosedOrPending()) {
throw new SocketException("Socket closed");
}
if (impl.isConnectionResetPending()) {
impl.setConnectionReset();
}
if (impl.isConnectionReset()) {
throw new SocketException("Connection reset");
}
eof = true;
return -1;
}

這個方法的核心其實就是socketRead0(fd, b, off, length, timeout)這個方法的呼叫,而這個方法是這樣的:

private native int socketRead0(FileDescriptor fd,
byte b[], int off, int len,
int timeout)
throws IOException;

這個是native方法,通過下載openjdk1.8原始碼,我們在openjdk\jdk\src\solaris\native\java\net的目錄下找到了相關實現,在SocketInputStream.c檔案裡,程式碼如下:

Java_java_net_SocketInputStream_socketRead0(JNIEnv *env, jobject this,
jobject fdObj, jbyteArray data,
jint off, jint len, jint timeout)
{
char BUF[MAX_BUFFER_LEN];
char *bufP;
jint fd, nread; if (IS_NULL(fdObj)) {
/* shouldn't this be a NullPointerException? -br */
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
"Socket closed");
return -;
} else {
fd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
/* Bug 4086704 - If the Socket associated with this file descriptor
* was closed (sysCloseFD), then the file descriptor is set to -1.
*/
if (fd == -) {
JNU_ThrowByName(env, "java/net/SocketException", "Socket closed");
return -;
}
} /*
* If the read is greater than our stack allocated buffer then
* we allocate from the heap (up to a limit)
*/
if (len > MAX_BUFFER_LEN) {
if (len > MAX_HEAP_BUFFER_LEN) {
len = MAX_HEAP_BUFFER_LEN;
}
bufP = (char *)malloc((size_t)len);
if (bufP == NULL) {
bufP = BUF;
len = MAX_BUFFER_LEN;
}
} else {
bufP = BUF;
} if (timeout) {
nread = NET_Timeout(fd, timeout);
if (nread <= ) {
if (nread == ) {
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
"Read timed out");
} else if (nread == JVM_IO_ERR) {
if (errno == EBADF) {
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "Socket closed");
} else if (errno == ENOMEM) {
JNU_ThrowOutOfMemoryError(env, "NET_Timeout native heap allocation failed");
} else {
NET_ThrowByNameWithLastError(env, JNU_JAVANETPKG "SocketException",
"select/poll failed");
}
} else if (nread == JVM_IO_INTR) {
JNU_ThrowByName(env, JNU_JAVAIOPKG "InterruptedIOException",
"Operation interrupted");
}
if (bufP != BUF) {
free(bufP);
}
return -;
}
} nread = NET_Read(fd, bufP, len); if (nread <= ) {
if (nread < ) { switch (errno) {
case ECONNRESET:
case EPIPE:
JNU_ThrowByName(env, "sun/net/ConnectionResetException",
"Connection reset");
break; case EBADF:
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
"Socket closed");
break; case EINTR:
JNU_ThrowByName(env, JNU_JAVAIOPKG "InterruptedIOException",
"Operation interrupted");
break; default:
NET_ThrowByNameWithLastError(env,
JNU_JAVANETPKG "SocketException", "Read failed");
}
}
} else {
(*env)->SetByteArrayRegion(env, data, off, nread, (jbyte *)bufP);
} if (bufP != BUF) {
free(bufP);
}
return nread;
}

通過程式碼我們可以知道,資料的讀取是通過NET_Timeout (fd, timeout)來實現的,我們繼續跟蹤程式碼,在linux_close.c檔案中,發現了NET_Timeout的實現:

int NET_Timeout(int s, long timeout) {
long prevtime = , newtime;
struct timeval t;
fdEntry_t *fdEntry = getFdEntry(s); /*
* Check that fd hasn't been closed.
*/
if (fdEntry == NULL) {
errno = EBADF;
return -;
} /*
* Pick up current time as may need to adjust timeout
*/
if (timeout > ) {
gettimeofday(&t, NULL);
prevtime = t.tv_sec * + t.tv_usec / ;
} for(;;) {
struct pollfd pfd;
int rv;
threadEntry_t self; /*
* Poll the fd. If interrupted by our wakeup signal
* errno will be set to EBADF.
*/
pfd.fd = s;
pfd.events = POLLIN | POLLERR; startOp(fdEntry, &self);
rv = poll(&pfd, , timeout);
endOp(fdEntry, &self); /*
* If interrupted then adjust timeout. If timeout
* has expired return 0 (indicating timeout expired).
*/
if (rv < && errno == EINTR) {
if (timeout > ) {
gettimeofday(&t, NULL);
newtime = t.tv_sec * + t.tv_usec / ;
timeout -= newtime - prevtime;
if (timeout <= ) {
return ;
}
prevtime = newtime;
}
} else {
return rv;
} }
}

程式碼中的關鍵點在 poll(&pfd, 1, timeout);poll是linux中的字元裝置驅動中的一個函式,作用是把當前的檔案指標掛到裝置內部定義的等待

這樣就很好理解了,其實這個時間是我兩次讀取資料之間的最長阻塞時間,如果我在網路抖動的情況下,我每次2秒之內返回一部分資料,這樣我就一直不會超時了,為了驗證我們的理解寫了test,程式碼如下,一個controller,用來接受http請求:

@org.springframework.stereotype.Controller
@RequestMapping("/hello")
public class Controller {
@RequestMapping("/test")
public void tets(HttpServletRequest request ,HttpServletResponse response) throws IOException, InterruptedException {
System.out.println("I'm coming");
PrintWriter writer = response.getWriter();
while (true){
writer.print("ha ha ha");
writer.flush();
Thread.sleep(2000);
System.out.println("I'm ha ha ha");
}
}
}

這個程式碼就是每隔2s傳送一條資料,迴圈傳送,模擬網路不好的時候,收到的資料斷斷續續,再來一個test用來傳送請求:

 @Test
public void tetsHttpClientHttp() throws IOException {
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(1000)
.setSocketTimeout(3000)
.setConnectionRequestTimeout(1000)
.build(); CloseableHttpClient httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();
// 建立Get請求
HttpGet httpGet = new HttpGet("http://127.0.0.1:8080/hello/test");
CloseableHttpResponse response =httpClient.execute(httpGet);
HttpEntity responseEntity = response.getEntity();
if (responseEntity != null) {
System.out.println("響應內容為:" + EntityUtils.toString(responseEntity));
}
}

服務端結果如下:

客戶端結果如下:

程式並沒有如期丟擲異常,和我們預想的一樣,而當我們修改socketTimeout為1000時,經驗證可以丟擲java.net.SocketTimeoutException: Read timed out 異常

為此,為了更準確控制時間,我們需要自己實現超時機制:

ExecutorService executor = Executors.newFixedThreadPool(1);
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(1000)
.setSocketTimeout(3000)
.setConnectionRequestTimeout(1000)
.build(); CloseableHttpClient httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();
// 建立Get請求
HttpGet httpGet = new HttpGet("http://127.0.0.1:8080/hello/test");
CloseableHttpResponse response =httpClient.execute(httpGet);
HttpEntity responseEntity = response.getEntity();
return EntityUtils.toString(responseEntity);
}
};
Future<String> future = executor.submit(callable);
System.out.print(future.get(5,TimeUnit.SECONDS));

這樣就可以避免這種情況,在請求執行緒超時時丟擲java.util.concurrent.TimeoutException避免長時間佔住業務執行緒影響我們的服務,當然這只是個例子,現實我們可能還要考慮執行緒數,拒絕策略等情況。