1. 程式人生 > >Android okhttp3 進行socket connect&poll的底層實現跟蹤

Android okhttp3 進行socket connect&poll的底層實現跟蹤

由《Android okhttp3 建立Socket的底層實現追蹤》可知,okhttp3在okhttp3.internal.io.RealConnection#connectSocket中通過rawSocket.setSoTimeout(readTimeout)呼叫java.net.Socket#checkOpenAndCreate建立了socket,底層是在/bionic/libc/arch-arm/syscalls/socket.S彙編程式碼中呼叫系統呼叫實現的,其中rawSocket是java.net.Socket的例項,其內部的impl是PlainSocketImpl.java的例項。

下面跟蹤socket connect的底層實現
1.okhttp3.internal.io.RealConnection#connectSocket

try {
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      // Re-throw with the target address in the message, but chain the original
      // exception as the cause so the underlying failure detail is not lost.
      // ConnectException has no (String, Throwable) constructor, hence initCause().
      ConnectException withAddress =
          new ConnectException("Failed to connect to " + route.socketAddress());
      withAddress.initCause(e);
      throw withAddress;
    }

2.
okhttp3.internal.Platform#connectSocket

/**
 * Connects {@code socket} to {@code address} by delegating directly to
 * {@link Socket#connect(java.net.SocketAddress, int)}.
 *
 * @param socket the socket to connect
 * @param address the remote endpoint
 * @param connectTimeout the timeout value handed through unchanged to {@code Socket.connect}
 * @throws IOException if {@code Socket.connect} fails
 */
public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout)
    throws IOException {
  socket.connect(address, connectTimeout);
}

/libcore/luni/src/main/java/java/net/Socket.java#connect

    public void connect(SocketAddress remoteAddr, int timeout) throws IOException {
        。。。。
        synchronized (connectLock) {
            try {
                。。。。
                impl.connect(remoteAddr, timeout);
                isConnected = true
; cacheLocalAddress(); } catch (IOException e) { impl.close(); throw e; } } }

3./libcore/luni/src/main/java/java/net/PlainSocketImpl.java

/**
 * Unpacks {@code remoteAddr} into address and port and forwards to the
 * {@code connect(InetAddress, int, int)} overload.
 *
 * @param remoteAddr the remote endpoint; cast to {@link InetSocketAddress}
 * @param timeout the timeout value, forwarded unchanged
 * @throws IOException if the delegated connect fails
 */
protected void connect(SocketAddress remoteAddr, int timeout) throws IOException {
        final InetSocketAddress endpoint = (InetSocketAddress) remoteAddr;
        final InetAddress host = endpoint.getAddress();
        final int hostPort = endpoint.getPort();
        connect(host, hostPort, timeout);
}

/**
 * Connects to {@code anAddr}:{@code aPort}, either through SOCKS or directly via
 * {@code IoBridge.connect}, then records the resulting address and port on the superclass.
 *
 * @param anAddr the requested remote address; the "any local" address is replaced by
 *        {@link InetAddress#getLocalHost()}
 * @param aPort the remote port
 * @param timeout the timeout value, used only on the direct (non-SOCKS) path
 * @throws IOException if resolving the local host or connecting fails
 */
private void connect(InetAddress anAddr, int aPort, int timeout) throws IOException {
        // Resolve the effective target first, before any connect attempt is made.
        final InetAddress effectiveAddr;
        if (anAddr.isAnyLocalAddress()) {
            effectiveAddr = InetAddress.getLocalHost();
        } else {
            effectiveAddr = anAddr;
        }

        final boolean viaSocks = streaming && usingSocks();
        if (!viaSocks) {
            IoBridge.connect(fd, effectiveAddr, aPort, timeout);
        } else {
            // The SOCKS path receives the original address and a literal 0 as its last argument.
            socksConnect(anAddr, aPort, 0);
        }

        super.address = effectiveAddr;
        super.port = aPort;
}

4./libcore/luni/src/main/java/libcore/io/IoBridge.java

    /**
     * Connects {@code fd} to {@code inetAddress}:{@code port} via {@code connectErrno} and
     * translates its failures into socket-level exceptions.
     *
     * @param timeoutMs the timeout value handed to {@code connectErrno}
     * @return the result of {@code connectErrno}
     * @throws SocketException a {@link ConnectException} wrapping an {@code ErrnoException},
     *         a {@code SocketException} rethrown as-is, or one wrapping any other
     *         {@link IOException}
     * @throws SocketTimeoutException rethrown as-is
     */
    public static boolean connect(FileDescriptor fd, InetAddress inetAddress, int port, int timeoutMs) throws SocketException, SocketTimeoutException {
        try {
            final boolean connected = connectErrno(fd, inetAddress, port, timeoutMs);
            return connected;
        } catch (ErrnoException e) {
            final String detail = connectDetail(inetAddress, port, timeoutMs, e);
            throw new ConnectException(detail, e);
        } catch (SocketException alreadySpecific) {
            // Already the right kind of exception; wrapping it again would only add noise.
            throw alreadySpecific;
        } catch (SocketTimeoutException alreadySpecific) {
            // Same as above: pass through untouched.
            throw alreadySpecific;
        } catch (IOException other) {
            throw new SocketException(other);
        }
    }

5./libcore/luni/src/main/java/libcore/io/IoBridge.java#connectErrno

    /**
     * Connects {@code fd} to {@code inetAddress}:{@code port}, optionally bounded by a timeout.
     *
     * <p>With {@code timeoutMs == 0} this is a single direct connect(2) call. Otherwise the
     * socket is switched to non-blocking, connect(2) is issued, and {@code isConnected} is
     * polled until it reports success or the deadline passes.
     *
     * @param timeoutMs timeout in milliseconds; 0 means "no timeout"
     * @return {@code true} once connected (every failure is reported by exception)
     * @throws ErrnoException if connect(2) fails with anything other than EINPROGRESS
     * @throws SocketTimeoutException if the deadline passes before the connection completes
     * @throws IOException if {@code isConnected} or {@code setBlocking} fails
     */
    private static boolean connectErrno(FileDescriptor fd, InetAddress inetAddress, int port, int timeoutMs) throws ErrnoException, IOException {
        // With no timeout, just call connect(2) directly.
        if (timeoutMs == 0) {
            Libcore.os.connect(fd, inetAddress, port);
            return true;
        }

        // For connect with a timeout, we:
        //   1. set the socket to non-blocking,
        //   2. connect(2),
        //   3. loop using poll(2) to decide whether we're connected, whether we should keep
        //      waiting, or whether we've seen a permanent failure and should give up,
        //   4. set the socket back to blocking.

        // 1. set the socket to non-blocking.
        IoUtils.setBlocking(fd, false);

        // 2. call connect(2) non-blocking.
        // The deadline is tracked with System.nanoTime() rather than System.currentTimeMillis():
        // nanoTime is intended for measuring elapsed time, so a wall-clock adjustment while we
        // wait can neither shorten nor stretch the timeout.
        final long nanosPerMs = 1000000L;
        final long finishTimeNanos = System.nanoTime() + timeoutMs * nanosPerMs;
        try {
            Libcore.os.connect(fd, inetAddress, port);
            IoUtils.setBlocking(fd, true); // 4. set the socket back to blocking.
            return true; // We connected immediately.
        } catch (ErrnoException errnoException) {
            if (errnoException.errno != EINPROGRESS) {
                throw errnoException;
            }
            // EINPROGRESS means we should keep trying...
        }

        // 3. loop using poll(2).
        int remainingTimeoutMs;
        do {
            // Compare nanoTime values by subtraction only, so numeric wrap-around is harmless.
            remainingTimeoutMs = (int) ((finishTimeNanos - System.nanoTime()) / nanosPerMs);
            if (remainingTimeoutMs <= 0) {
                throw new SocketTimeoutException(connectDetail(inetAddress, port, timeoutMs, null));
            }
        } while (!IoBridge.isConnected(fd, inetAddress, port, timeoutMs, remainingTimeoutMs));
        IoUtils.setBlocking(fd, true); // 4. set the socket back to blocking.
        return true; // Or we'd have thrown.
    }

connectErrno實現了connect+poll的邏輯,註釋已經說明的很清楚了。
有timeout的時候,先進行非阻塞connect,然後用poll進行事件輪詢,直到連線成功、連線失敗或超時為止。

6.
/libcore/luni/src/main/java/libcore/io/BlockGuardOs.java

/**
 * Invokes the current thread policy's {@code onNetwork()} hook, then forwards the
 * connect to the wrapped {@code os}.
 *
 * @throws ErrnoException propagated from the wrapped {@code os.connect}
 * @throws SocketException propagated from the wrapped {@code os.connect}
 */
public void connect(FileDescriptor fd, InetAddress address, int port)
        throws ErrnoException, SocketException {
    // The policy hook must run before the potentially blocking call.
    BlockGuard.getThreadPolicy().onNetwork();
    os.connect(fd, address, port);
}

/libcore/luni/src/main/java/libcore/io/ForwardingOs.java

/**
 * Pure pass-through: forwards the connect call, arguments unchanged, to the wrapped {@code os}.
 */
public void connect(FileDescriptor fd, InetAddress address, int port)
        throws ErrnoException, SocketException {
    os.connect(fd, address, port);
}

7./libcore/luni/src/main/java/libcore/io/Posix.java

/**
 * Native connect on {@code fd} to {@code address}:{@code port}. The method has no Java body;
 * the article pairs it with {@code Posix_connect} in libcore_io_Posix.cpp.
 *
 * @throws ErrnoException declared failure mode of the native call
 * @throws SocketException declared failure mode of the native call
 */
public native void connect(FileDescriptor fd, InetAddress address, int port) throws ErrnoException, SocketException;

8./libcore/luni/src/main/native/libcore_io_Posix.cpp

// JNI side of Posix.connect: converts the Java address/port into a sockaddr and issues
// connect(2) through NET_FAILURE_RETRY. Returns without calling connect(2) when the
// address conversion fails.
static void Posix_connect(JNIEnv* env, jobject, jobject javaFd, jobject javaAddress, jint port) {
    sockaddr_storage storage;
    socklen_t length;
    if (inetAddressToSockaddr(env, javaAddress, port, storage, length)) {
        const sockaddr* address = reinterpret_cast<const sockaddr*>(&storage);
        // The macro's result is deliberately discarded: on failure it has already thrown.
        (void) NET_FAILURE_RETRY(env, int, connect, javaFd, address, length);
    }
}

/*
 * Runs `syscall_name(fd, ...)` on the fd read from `java_fd`, repeating the call for as
 * long as it returns -1 with errno == EINTR. The whole macro evaluates to the last
 * return value (`_rc`).
 *
 * On every attempt the fd is re-read from the Java FileDescriptor, and an
 * AsynchronousSocketCloseMonitor exists only inside the inner scope around the call.
 *
 * When the call returns -1:
 *   - if the Java FileDescriptor now yields -1, java.net.SocketException("Socket closed")
 *     is raised via jniThrowException and the loop ends;
 *   - otherwise, if errno is not EINTR, throwErrnoException is called with the
 *     stringified syscall name and the loop ends;
 *   - otherwise (EINTR) the call is retried.
 *
 * Written as a `({ ... })` statement expression so that it can be used as a value.
 */
#define NET_FAILURE_RETRY(jni_env, return_type, syscall_name, java_fd, ...) ({ \
    return_type _rc = -1; \
    do { \
        { \
            int _fd = jniGetFDFromFileDescriptor(jni_env, java_fd); \
            AsynchronousSocketCloseMonitor _monitor(_fd); \
            _rc = syscall_name(_fd, __VA_ARGS__); \
        } \
        if (_rc == -1) { \
            if (jniGetFDFromFileDescriptor(jni_env, java_fd) == -1) { \
                jniThrowException(jni_env, "java/net/SocketException", "Socket closed"); \
                break; \
            } else if (errno != EINTR) { \
                /* TODO: with a format string we could show the arguments too, like strace(1). */ \
                throwErrnoException(jni_env, # syscall_name); \
                break; \
            } \
        } \
    } while (_rc == -1); \
    _rc; })

9./bionic/libc/arch-arm/syscalls/connect.S

/*
 * connect: thin system-call stub.
 * Loads the syscall number __NR_connect into r7, issues `swi #0`, and then either
 * returns r0 unchanged or hands the negated r0 to __set_errno.
 */
ENTRY(connect)
    mov     ip, r7                      /* save the caller's r7 in ip */
    ldr     r7, =__NR_connect           /* r7 = syscall number */
    swi     #0                          /* software interrupt: perform the system call */
    mov     r7, ip                      /* restore the caller's r7 */
    cmn     r0, #(MAX_ERRNO + 1)        /* set flags for r0 + (MAX_ERRNO + 1) */
    bxls    lr                          /* return unless r0 lies in [-MAX_ERRNO, -1] */
    neg     r0, r0                      /* error path: r0 = -r0 (positive error number) */
    b       __set_errno                 /* tail-branch; presumably records errno -- confirm in __set_errno */
END(connect)

可以看出,最終socket的connect也是在彙編程式碼中呼叫系統呼叫實現的。
接著看下poll的實現。

10./libcore/luni/src/main/java/libcore/io/IoBridge.java#isConnected

    /**
     * Performs one poll(2) round on {@code fd}, waiting for it to become writable, and
     * reports whether the pending connect has completed.
     *
     * @param timeoutMs the overall connect timeout; used only for the exception detail text
     * @param remainingTimeoutMs the timeout handed to poll(2) for this round
     * @return {@code true} if SO_ERROR reads 0 after poll; {@code false} if poll returned 0
     *         or the call failed with EINTR, in which case the caller should call again
     * @throws SocketException with message "Socket closed" if {@code fd} is no longer valid
     * @throws SocketTimeoutException if the failure's errno is ETIMEDOUT
     * @throws ConnectException for any other failure
     */
    public static boolean isConnected(FileDescriptor fd, InetAddress inetAddress, int port, int timeoutMs, int remainingTimeoutMs) throws IOException {
        ErrnoException failure;
        try {
            StructPollfd writable = new StructPollfd();
            writable.fd = fd;
            writable.events = (short) POLLOUT;
            StructPollfd[] pollSet = new StructPollfd[] { writable };

            if (Libcore.os.poll(pollSet, remainingTimeoutMs) == 0) {
                return false; // poll(2) timed out for this round.
            }

            int pendingError = Libcore.os.getsockoptInt(fd, SOL_SOCKET, SO_ERROR);
            if (pendingError != 0) {
                // connect(2) failed; route the error through the handler below.
                throw new ErrnoException("isConnected", pendingError);
            }
            return true; // Connected.
        } catch (ErrnoException e) {
            if (!fd.valid()) {
                throw new SocketException("Socket closed");
            }
            if (e.errno == EINTR) {
                return false; // Interrupted: let the caller retry.
            }
            failure = e;
        }

        // Only a genuine connect failure reaches this point.
        String message = connectDetail(inetAddress, port, timeoutMs, failure);
        if (failure.errno != ETIMEDOUT) {
            throw new ConnectException(message, failure);
        }
        throw new SocketTimeoutException(message, failure);
    }

通過poll監控fd可寫(POLLOUT)這個事件

11.
/libcore/luni/src/main/java/libcore/io/BlockGuardOs.java

/**
 * Forwards poll to the wrapped {@code os}, first invoking the thread policy's
 * {@code onNetwork()} hook unless the call is a pure non-blocking check.
 *
 * @param timeoutMs greater than 0 is a timeout in milliseconds, -1 means "block forever",
 *        and 0 means "poll and return immediately"
 * @return the result of the wrapped {@code os.poll}
 * @throws ErrnoException propagated from the wrapped {@code os.poll}
 */
public int poll(StructPollfd[] fds, int timeoutMs) throws ErrnoException {
    // A zero timeout returns immediately, so it is not subject to BlockGuard.
    final boolean mayBlock = (timeoutMs != 0);
    if (mayBlock) {
        BlockGuard.getThreadPolicy().onNetwork();
    }
    return os.poll(fds, timeoutMs);
}

/libcore/luni/src/main/java/libcore/io/ForwardingOs.java

/**
 * Pure pass-through: forwards the poll call, arguments unchanged, to the wrapped {@code os}
 * and returns its result.
 */
public int poll(StructPollfd[] fds, int timeoutMs) throws ErrnoException {
    final int result = os.poll(fds, timeoutMs);
    return result;
}

12./libcore/luni/src/main/java/libcore/io/Posix.java

/**
 * Native poll over {@code fds} with timeout {@code timeoutMs}. The method has no Java body;
 * the article pairs it with {@code Posix_poll} in libcore_io_Posix.cpp.
 *
 * @throws ErrnoException declared failure mode of the native call
 */
public native int poll(StructPollfd[] fds, int timeoutMs) throws ErrnoException;

13./libcore/luni/src/main/native/libcore_io_Posix.cpp

// JNI side of Posix.poll.
// Copies the leading usable entries of the Java StructPollfd[] into a native pollfd array,
// calls poll(2) on them, and writes each entry's revents back into the matching Java object.
// Returns poll(2)'s result, or -1 after throwErrnoException when poll(2) returned -1.
// Also returns -1 if an array element reads back as NULL during the write-back loop.
static jint Posix_poll(JNIEnv* env, jobject, jobjectArray javaStructs, jint timeoutMs) {
    // Field IDs are looked up once and kept in function-local statics.
    static jfieldID fdFid = env->GetFieldID(JniConstants::structPollfdClass, "fd", "Ljava/io/FileDescriptor;");
    static jfieldID eventsFid = env->GetFieldID(JniConstants::structPollfdClass, "events", "S");
    static jfieldID reventsFid = env->GetFieldID(JniConstants::structPollfdClass, "revents", "S");

    // Turn the Java libcore.io.StructPollfd[] into a C++ struct pollfd[].
    size_t arrayLength = env->GetArrayLength(javaStructs);
    UniquePtr<struct pollfd[]> fds(new struct pollfd[arrayLength]);
    memset(fds.get(), 0, sizeof(struct pollfd) * arrayLength);
    size_t count = 0; // Some trailing array elements may be irrelevant. (See below.)
    for (size_t i = 0; i < arrayLength; ++i) {
        ScopedLocalRef<jobject> javaStruct(env, env->GetObjectArrayElement(javaStructs, i));
        if (javaStruct.get() == NULL) {
            break; // We allow trailing nulls in the array for caller convenience.
        }
        ScopedLocalRef<jobject> javaFd(env, env->GetObjectField(javaStruct.get(), fdFid));
        if (javaFd.get() == NULL) {
            break; // We also allow callers to just clear the fd field (this is what Selector does).
        }
        // Copying stops at the first unusable entry, so fds[k] always corresponds to javaStructs[k].
        fds[count].fd = jniGetFDFromFileDescriptor(env, javaFd.get());
        fds[count].events = env->GetShortField(javaStruct.get(), eventsFid);
        ++count;
    }

    // Since we don't know which fds -- if any -- are sockets, be conservative and register
    // all fds for asynchronous socket close monitoring.
    std::vector<AsynchronousSocketCloseMonitor*> monitors;
    for (size_t i = 0; i < count; ++i) {
        monitors.push_back(new AsynchronousSocketCloseMonitor(fds[i].fd));
    }
    int rc = poll(fds.get(), count, timeoutMs);
    // The monitors are only needed while poll(2) is running; release them right after it.
    // NOTE(review): presumably throwErrnoException below reads errno -- confirm that the
    // monitor destructors cannot modify errno before it is read.
    for (size_t i = 0; i < monitors.size(); ++i) {
        delete monitors[i];
    }
    if (rc == -1) {
        throwErrnoException(env, "poll");
        return -1;
    }

    // Update the revents fields in the Java libcore.io.StructPollfd[].
    for (size_t i = 0; i < count; ++i) {
        ScopedLocalRef<jobject> javaStruct(env, env->GetObjectArrayElement(javaStructs, i));
        if (javaStruct.get() == NULL) {
            return -1;
        }
        env->SetShortField(javaStruct.get(), reventsFid, fds[i].revents);
    }
    return rc;
}

14./bionic/libc/arch-arm/syscalls/poll.S

/*
 * poll: thin system-call stub.
 * Loads the syscall number __NR_poll into r7, issues `swi #0`, and then either
 * returns r0 unchanged or hands the negated r0 to __set_errno.
 */
ENTRY(poll)
    mov     ip, r7                      /* save the caller's r7 in ip */
    ldr     r7, =__NR_poll              /* r7 = syscall number */
    swi     #0                          /* software interrupt: perform the system call */
    mov     r7, ip                      /* restore the caller's r7 */
    cmn     r0, #(MAX_ERRNO + 1)        /* set flags for r0 + (MAX_ERRNO + 1) */
    bxls    lr                          /* return unless r0 lies in [-MAX_ERRNO, -1] */
    neg     r0, r0                      /* error path: r0 = -r0 (positive error number) */
    b       __set_errno                 /* tail-branch; presumably records errno -- confirm in __set_errno */
END(poll)

跟connect一樣,最後也是彙編實現系統呼叫