Android okhttp3 進行socket connect&poll的底層實現跟蹤
由《Android okhttp3 建立Socket的底層實現追蹤》可知,okhttp3在okhttp3.internal.io.RealConnection#connectSocket中通過rawSocket.setSoTimeout(readTimeout)呼叫java.net.Socket#checkOpenAndCreate建立了socket,底層是在/bionic/libc/arch-arm/syscalls/socket.S彙編程式碼中呼叫系統呼叫實現的。其中rawSocket是java.net.Socket的例項,其內部的impl是PlainSocketImpl的例項。
下面跟蹤socket connect的底層實現
1.okhttp3.internal.io.RealConnection#connectSocket
// Ask the current Platform to connect rawSocket to the route's socket address,
// using connectTimeout as the timeout.
try {
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
// Rethrow as a fresh ConnectException whose message names the target address.
// NOTE(review): the caught exception `e` is not attached as the cause here.
throw new ConnectException("Failed to connect to " + route.socketAddress());
}
2.
okhttp3.internal.Platform#connectSocket
/**
 * Connects {@code socket} to {@code address} by calling
 * {@code socket.connect(address, connectTimeout)}.
 *
 * @param socket the socket to connect
 * @param address the remote address to connect to
 * @param connectTimeout timeout value passed through unchanged to {@code Socket.connect}
 * @throws IOException propagated from {@code socket.connect}
 */
public void connectSocket(Socket socket, InetSocketAddress address,
int connectTimeout) throws IOException {
socket.connect(address, connectTimeout);
}
/libcore/luni/src/main/java/java/net/Socket.java#connect
/**
 * Connects this socket to {@code remoteAddr} by delegating to the socket implementation
 * ({@code impl}). This listing is an excerpt: the lines marked with 。。。。 are elided.
 *
 * @param remoteAddr the remote address to connect to
 * @param timeout timeout value forwarded to {@code impl.connect}
 * @throws IOException if {@code impl.connect} fails; the impl is closed before rethrowing
 */
public void connect(SocketAddress remoteAddr, int timeout) throws IOException {
。。。。
// The connect itself runs while holding connectLock.
synchronized (connectLock) {
try {
。。。。
impl.connect(remoteAddr, timeout);
// Only reached when impl.connect returned normally: mark connected and cache the local address.
isConnected = true ;
cacheLocalAddress();
} catch (IOException e) {
// On failure, close the implementation and propagate the same exception.
impl.close();
throw e;
}
}
}
3./libcore/luni/src/main/java/java/net/PlainSocketImpl.java
/**
 * Connects to {@code remoteAddr} by unpacking it into an address and port and forwarding
 * to the private {@code connect(InetAddress, int, int)} overload.
 *
 * @param remoteAddr the remote address; cast to {@code InetSocketAddress} without a type check
 * @param timeout timeout value forwarded unchanged
 * @throws IOException propagated from the private overload
 */
protected void connect(SocketAddress remoteAddr, int timeout) throws IOException {
InetSocketAddress inetAddr = (InetSocketAddress) remoteAddr;
connect(inetAddr.getAddress(), inetAddr.getPort(), timeout);
}
/**
 * Connects to the given address and port, either through SOCKS or directly via
 * {@code IoBridge.connect}, then records the address and port in the superclass fields.
 *
 * @param anAddr the requested remote address
 * @param aPort the remote port
 * @param timeout timeout forwarded to {@code IoBridge.connect} on the non-SOCKS path
 * @throws IOException propagated from the underlying connect call
 */
private void connect(InetAddress anAddr, int aPort, int timeout) throws IOException {
// An "any local" address is replaced by InetAddress.getLocalHost(); otherwise use anAddr as given.
InetAddress normalAddr = anAddr.isAnyLocalAddress() ? InetAddress.getLocalHost() : anAddr;
if (streaming && usingSocks()) {
// SOCKS path: note it is passed the original anAddr and a literal 0, not `timeout`.
socksConnect(anAddr, aPort, 0);
} else {
// Direct path: hand the file descriptor to IoBridge with the normalized address.
IoBridge.connect(fd, normalAddr, aPort, timeout);
}
// Remember the (normalized) peer address and port on the superclass.
super.address = normalAddr;
super.port = aPort;
}
4./libcore/luni/src/main/java/libcore/io/IoBridge.java
/**
 * Connects {@code fd} to {@code inetAddress}:{@code port} by calling {@code connectErrno}
 * and translating the exceptions it throws.
 *
 * @param fd the socket file descriptor
 * @param inetAddress the remote address
 * @param port the remote port
 * @param timeoutMs timeout in milliseconds, forwarded to {@code connectErrno}
 * @return the value returned by {@code connectErrno}
 * @throws SocketException for connect failures ({@code ErrnoException} becomes a
 *         {@code ConnectException}; any other {@code IOException} is wrapped)
 * @throws SocketTimeoutException rethrown unchanged from {@code connectErrno}
 */
public static boolean connect(FileDescriptor fd, InetAddress inetAddress, int port, int timeoutMs) throws SocketException, SocketTimeoutException {
try {
return connectErrno(fd, inetAddress, port, timeoutMs);
} catch (ErrnoException errnoException) {
// Convert the errno failure into a ConnectException, keeping the original as the cause.
throw new ConnectException(connectDetail(inetAddress, port, timeoutMs, errnoException), errnoException);
} catch (SocketException ex) {
throw ex; // We don't want to doubly wrap these.
} catch (SocketTimeoutException ex) {
throw ex; // We don't want to doubly wrap these.
} catch (IOException ex) {
// Any remaining IOException is wrapped in a SocketException.
throw new SocketException(ex);
}
}
5./libcore/luni/src/main/java/libcore/io/IoBridge.java#connectErrno
/**
 * Performs the actual connect. With {@code timeoutMs == 0} it calls connect once directly;
 * otherwise it connects in non-blocking mode and polls until the connection completes or
 * the deadline passes.
 *
 * @param fd the socket file descriptor
 * @param inetAddress the remote address
 * @param port the remote port
 * @param timeoutMs timeout in milliseconds; 0 selects the direct (no-timeout) path
 * @return always {@code true}; every failure is reported by throwing
 * @throws ErrnoException if connect fails with an errno other than {@code EINPROGRESS}
 * @throws IOException including {@code SocketTimeoutException} when the deadline passes,
 *         and whatever {@code IoBridge.isConnected} throws
 */
private static boolean connectErrno(FileDescriptor fd, InetAddress inetAddress, int port, int timeoutMs) throws ErrnoException, IOException {
// With no timeout, just call connect(2) directly.
if (timeoutMs == 0) {
Libcore.os.connect(fd, inetAddress, port);
return true;
}
// For connect with a timeout, we:
// 1. set the socket to non-blocking,
// 2. connect(2),
// 3. loop using poll(2) to decide whether we're connected, whether we should keep
// waiting, or whether we've seen a permanent failure and should give up,
// 4. set the socket back to blocking.
// NOTE(review): step 4 only runs on the success paths below; the throwing paths
// leave this method without calling setBlocking(fd, true).
// 1. set the socket to non-blocking.
IoUtils.setBlocking(fd, false);
// 2. call connect(2) non-blocking.
// The absolute deadline is fixed once, before the connect attempt.
long finishTimeMs = System.currentTimeMillis() + timeoutMs;
try {
Libcore.os.connect(fd, inetAddress, port);
IoUtils.setBlocking(fd, true); // 4. set the socket back to blocking.
return true; // We connected immediately.
} catch (ErrnoException errnoException) {
if (errnoException.errno != EINPROGRESS) {
throw errnoException;
}
// EINPROGRESS means we should keep trying...
}
// 3. loop using poll(2).
int remainingTimeoutMs;
do {
// Recompute the time left on every iteration so repeated polls share one deadline.
remainingTimeoutMs = (int) (finishTimeMs - System.currentTimeMillis());
if (remainingTimeoutMs <= 0) {
throw new SocketTimeoutException(connectDetail(inetAddress, port, timeoutMs, null));
}
// isConnected returns false to request another iteration, true once connected.
} while (!IoBridge.isConnected(fd, inetAddress, port, timeoutMs, remainingTimeoutMs));
IoUtils.setBlocking(fd, true); // 4. set the socket back to blocking.
return true; // Or we'd have thrown.
}
connectErrno實現了connect+poll的邏輯,註釋已經說明得很清楚了。
timeout為0時直接進行阻塞connect;有timeout的時候,先把socket設為非阻塞再connect,若返回EINPROGRESS,則用poll反覆輪詢,直到連線成功、出錯或超時,連線成功後再把socket設回阻塞模式。
6.
/libcore/luni/src/main/java/libcore/io/BlockGuardOs.java
/**
 * Notifies the current thread's BlockGuard policy of a network operation, then delegates
 * the connect to the wrapped {@code os}.
 *
 * @param fd the socket file descriptor
 * @param address the remote address
 * @param port the remote port
 * @throws ErrnoException propagated from the wrapped {@code os.connect}
 * @throws SocketException propagated from the wrapped {@code os.connect}
 */
public void connect(FileDescriptor fd, InetAddress address, int port) throws ErrnoException, SocketException {
BlockGuard.getThreadPolicy().onNetwork();
os.connect(fd, address, port);
}
/libcore/luni/src/main/java/libcore/io/ForwardingOs.java
/** Pure pass-through: forwards connect to the wrapped {@code os} with the same arguments. */
public void connect(FileDescriptor fd, InetAddress address, int port) throws ErrnoException, SocketException { os.connect(fd, address, port); }
7./libcore/luni/src/main/java/libcore/io/Posix.java
/**
 * Native connect; there is no Java body. Presumably bound to {@code Posix_connect} in
 * libcore_io_Posix.cpp (the JNI registration is not shown in this listing).
 */
public native void connect(FileDescriptor fd, InetAddress address, int port) throws ErrnoException, SocketException;
8./libcore/luni/src/main/native/libcore_io_Posix.cpp
// JNI entry for connect: converts the Java address and port into a sockaddr and calls
// connect() through NET_FAILURE_RETRY.
static void Posix_connect(JNIEnv* env, jobject, jobject javaFd, jobject javaAddress, jint port) {
sockaddr_storage ss;
socklen_t sa_len;
// Fill ss/sa_len from the Java InetAddress and port; bail out if the conversion fails
// (presumably with a Java exception already pending -- the helper is not shown here).
if (!inetAddressToSockaddr(env, javaAddress, port, ss, sa_len)) {
return;
}
const sockaddr* sa = reinterpret_cast<const sockaddr*>(&ss);
// We don't need the return value because we'll already have thrown.
(void) NET_FAILURE_RETRY(env, int, connect, javaFd, sa, sa_len);
}
// NET_FAILURE_RETRY: calls syscall_name(fd, args...) in a loop and evaluates to its result.
// On each iteration the native fd is re-read from java_fd and an
// AsynchronousSocketCloseMonitor is held only for the duration of the call.
// When the call returns -1:
//   - if the Java FileDescriptor now reports -1, a java/net/SocketException
//     ("Socket closed") is thrown and the loop exits;
//   - else if errno is not EINTR, an ErrnoException named after the syscall is thrown
//     and the loop exits;
//   - otherwise (EINTR) the loop runs again and the call is retried.
// The whole macro is a statement expression whose value is _rc.
#define NET_FAILURE_RETRY(jni_env, return_type, syscall_name, java_fd, ...) ({ \
return_type _rc = -1; \
do { \
{ \
int _fd = jniGetFDFromFileDescriptor(jni_env, java_fd); \
AsynchronousSocketCloseMonitor _monitor(_fd); \
_rc = syscall_name(_fd, __VA_ARGS__); \
} \
if (_rc == -1) { \
if (jniGetFDFromFileDescriptor(jni_env, java_fd) == -1) { \
jniThrowException(jni_env, "java/net/SocketException", "Socket closed"); \
break; \
} else if (errno != EINTR) { \
/* TODO: with a format string we could show the arguments too, like strace(1). */ \
throwErrnoException(jni_env, # syscall_name); \
break; \
} \
} \
} while (_rc == -1); \
_rc; })
9./bionic/libc/arch-arm/syscalls/connect.S
/* connect: thin stub that issues the __NR_connect system call. */
ENTRY(connect)
/* Save r7 in ip, because r7 is overwritten with the syscall number below. */
mov ip, r7
ldr r7, =__NR_connect
/* Software interrupt: enter the kernel to perform the call. */
swi #0
/* Restore the caller's r7. */
mov r7, ip
/* Compare r0 with -(MAX_ERRNO + 1); return straight to the caller when r0 is */
/* "lower or same" (unsigned), presumably the non-error case. */
cmn r0, #(MAX_ERRNO + 1)
bxls lr
/* Otherwise negate r0 and branch to __set_errno (not shown here; presumably it */
/* records the value in errno). */
neg r0, r0
b __set_errno
END(connect)
可以看出,最終socket的connect也是在彙編程式碼中呼叫系統呼叫實現的。
接著看下poll的實現。
10./libcore/luni/src/main/java/libcore/io/IoBridge.java#isConnected
/**
 * Polls {@code fd} once for writability and reports whether a pending connect has completed.
 *
 * @param fd the socket file descriptor being connected
 * @param inetAddress the remote address (used only for the error detail message)
 * @param port the remote port (used only for the error detail message)
 * @param timeoutMs the overall connect timeout (used only for the error detail message)
 * @param remainingTimeoutMs how long this single poll may wait, in milliseconds
 * @return {@code true} if the connect succeeded; {@code false} if the poll timed out or
 *         was interrupted ({@code EINTR}) and the caller should try again
 * @throws IOException {@code SocketException} if {@code fd} is no longer valid,
 *         {@code SocketTimeoutException} for {@code ETIMEDOUT}, {@code ConnectException}
 *         for any other errno
 */
public static boolean isConnected(FileDescriptor fd, InetAddress inetAddress, int port, int timeoutMs, int remainingTimeoutMs) throws IOException {
ErrnoException cause;
try {
// Poll a single descriptor, waiting for it to become writable (POLLOUT).
StructPollfd[] pollFds = new StructPollfd[] { new StructPollfd() };
pollFds[0].fd = fd;
pollFds[0].events = (short) POLLOUT;
int rc = Libcore.os.poll(pollFds, remainingTimeoutMs);
if (rc == 0) {
return false; // Timeout.
}
// poll reported something: read SO_ERROR to learn the outcome of the connect.
int connectError = Libcore.os.getsockoptInt(fd, SOL_SOCKET, SO_ERROR);
if (connectError == 0) {
return true; // Success!
}
// Thrown here and caught just below, so it goes through the same errno handling.
throw new ErrnoException("isConnected", connectError); // The connect(2) failed.
} catch (ErrnoException errnoException) {
// An invalid fd is reported as a closed socket, whatever the errno was.
if (!fd.valid()) {
throw new SocketException("Socket closed");
}
if (errnoException.errno == EINTR) {
return false; // Punt and ask the caller to try again.
} else {
cause = errnoException;
}
}
// Only reached with `cause` set: map the errno to the matching Java exception.
String detail = connectDetail(inetAddress, port, timeoutMs, cause);
if (cause.errno == ETIMEDOUT) {
throw new SocketTimeoutException(detail, cause);
}
throw new ConnectException(detail, cause);
}
通過poll監控fd可寫(POLLOUT)這個事件;poll返回非0後,再用getsockopt讀取SO_ERROR,為0表示連線成功,否則表示connect失敗。
11.
/libcore/luni/src/main/java/libcore/io/BlockGuardOs.java
/**
 * Delegates poll to the wrapped {@code os}, first notifying the thread's BlockGuard
 * policy of a network operation unless {@code timeoutMs} is 0.
 *
 * @param fds the descriptors to poll
 * @param timeoutMs poll timeout in milliseconds
 * @return the value returned by the wrapped {@code os.poll}
 * @throws ErrnoException propagated from the wrapped {@code os.poll}
 */
public int poll(StructPollfd[] fds, int timeoutMs) throws ErrnoException {
// Greater than 0 is a timeout in milliseconds and -1 means "block forever",
// but 0 means "poll and return immediately", which shouldn't be subject to BlockGuard.
if (timeoutMs != 0) {
BlockGuard.getThreadPolicy().onNetwork();
}
return os.poll(fds, timeoutMs);
}
/libcore/luni/src/main/java/libcore/io/ForwardingOs.java
/** Pure pass-through: forwards poll to the wrapped {@code os} and returns its result. */
public int poll(StructPollfd[] fds, int timeoutMs) throws ErrnoException { return os.poll(fds, timeoutMs); }
12./libcore/luni/src/main/java/libcore/io/Posix.java
/**
 * Native poll; there is no Java body. Presumably bound to {@code Posix_poll} in
 * libcore_io_Posix.cpp (the JNI registration is not shown in this listing).
 */
public native int poll(StructPollfd[] fds, int timeoutMs) throws ErrnoException;
13./libcore/luni/src/main/native/libcore_io_Posix.cpp
// JNI entry for poll: copies the Java StructPollfd[] into a native pollfd array, calls
// poll(), and writes the resulting revents back into the Java objects.
// Returns poll's result, or -1 after throwing ErrnoException when poll fails.
static jint Posix_poll(JNIEnv* env, jobject, jobjectArray javaStructs, jint timeoutMs) {
// Field IDs are looked up once and kept in function-local statics.
static jfieldID fdFid = env->GetFieldID(JniConstants::structPollfdClass, "fd", "Ljava/io/FileDescriptor;");
static jfieldID eventsFid = env->GetFieldID(JniConstants::structPollfdClass, "events", "S");
static jfieldID reventsFid = env->GetFieldID(JniConstants::structPollfdClass, "revents", "S");
// Turn the Java libcore.io.StructPollfd[] into a C++ struct pollfd[].
size_t arrayLength = env->GetArrayLength(javaStructs);
UniquePtr<struct pollfd[]> fds(new struct pollfd[arrayLength]);
memset(fds.get(), 0, sizeof(struct pollfd) * arrayLength);
size_t count = 0; // Some trailing array elements may be irrelevant. (See below.)
for (size_t i = 0; i < arrayLength; ++i) {
ScopedLocalRef<jobject> javaStruct(env, env->GetObjectArrayElement(javaStructs, i));
if (javaStruct.get() == NULL) {
break; // We allow trailing nulls in the array for caller convenience.
}
ScopedLocalRef<jobject> javaFd(env, env->GetObjectField(javaStruct.get(), fdFid));
if (javaFd.get() == NULL) {
break; // We also allow callers to just clear the fd field (this is what Selector does).
}
fds[count].fd = jniGetFDFromFileDescriptor(env, javaFd.get());
fds[count].events = env->GetShortField(javaStruct.get(), eventsFid);
++count;
}
// Since we don't know which fds -- if any -- are sockets, be conservative and register
// all fds for asynchronous socket close monitoring.
std::vector<AsynchronousSocketCloseMonitor*> monitors;
for (size_t i = 0; i < count; ++i) {
monitors.push_back(new AsynchronousSocketCloseMonitor(fds[i].fd));
}
// Only the first `count` entries (those collected above) are passed to poll.
int rc = poll(fds.get(), count, timeoutMs);
// The monitors are only needed around the poll call itself; free them right after it.
for (size_t i = 0; i < monitors.size(); ++i) {
delete monitors[i];
}
if (rc == -1) {
throwErrnoException(env, "poll");
return -1;
}
// Update the revents fields in the Java libcore.io.StructPollfd[].
for (size_t i = 0; i < count; ++i) {
ScopedLocalRef<jobject> javaStruct(env, env->GetObjectArrayElement(javaStructs, i));
// An element that was non-null in the first pass is now null: give up with -1.
if (javaStruct.get() == NULL) {
return -1;
}
env->SetShortField(javaStruct.get(), reventsFid, fds[i].revents);
}
return rc;
}
14./bionic/libc/arch-arm/syscalls/poll.S
/* poll: thin stub that issues the __NR_poll system call. */
ENTRY(poll)
/* Save r7 in ip, because r7 is overwritten with the syscall number below. */
mov ip, r7
ldr r7, =__NR_poll
/* Software interrupt: enter the kernel to perform the call. */
swi #0
/* Restore the caller's r7. */
mov r7, ip
/* Compare r0 with -(MAX_ERRNO + 1); return straight to the caller when r0 is */
/* "lower or same" (unsigned), presumably the non-error case. */
cmn r0, #(MAX_ERRNO + 1)
bxls lr
/* Otherwise negate r0 and branch to __set_errno (not shown here; presumably it */
/* records the value in errno). */
neg r0, r0
b __set_errno
END(poll)
跟connect一樣,最後也是彙編實現系統呼叫