
Network I/O: Network Models

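My understanding

A network model is an application-level coding pattern, a paradigm. Its foundation is always the OS kernel's support for the TCP/IP protocol stack.

Demand from the upper layer drives how much the lower layer supports, and the way the lower layer offers that support shapes how the upper layer uses it.

Synchronous I/O: the application carries out the data read itself, so it cares about both the process and the result.

Asynchronous I/O: the application sends the kernel a request to read data, the OS carries out the process, and the application cares only about the result.

Whether I/O is synchronous or asynchronous is therefore determined by the APIs the kernel exposes; the application merely picks one of them through a system call. If the kernel does not support asynchronous I/O, no amount of clever application code can make it asynchronous.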
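To make the "only cares about the result" side concrete, here is a minimal sketch using the JDK's `AsynchronousSocketChannel` with a `CompletionHandler`. The class name `AsyncReadSketch` and the helper `readOnceAsync` are made up for illustration, and the sketch only shows the programming model seen from Java; how the JDK maps it onto a particular kernel is not covered here.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article
class AsyncReadSketch {

    /** Sends payload over loopback and returns what the asynchronous read delivered. */
    public static String readOnceAsync(String payload) throws Exception {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()) {
            // Port 0 lets the OS pick a free port
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            Future<AsynchronousSocketChannel> pending = server.accept();

            try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
                client.connect(server.getLocalAddress()).get(5, TimeUnit.SECONDS);
                client.write(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)))
                        .get(5, TimeUnit.SECONDS);

                try (AsynchronousSocketChannel accepted = pending.get(5, TimeUnit.SECONDS)) {
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    CompletableFuture<String> result = new CompletableFuture<>();
                    // read() returns immediately; the outcome arrives through the handler
                    accepted.read(buffer, null, new CompletionHandler<Integer, Void>() {
                        @Override
                        public void completed(Integer bytesRead, Void attachment) {
                            buffer.flip();
                            result.complete(StandardCharsets.UTF_8.decode(buffer).toString());
                        }

                        @Override
                        public void failed(Throwable cause, Void attachment) {
                            result.completeExceptionally(cause);
                        }
                    });
                    // The caller is free to do other work here; this demo just waits
                    return result.get(5, TimeUnit.SECONDS);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readOnceAsync("hello"));
    }
}
```

The application code never loops on the socket: it states what it wants read and supplies a callback for the result.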

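Blocking I/O: the application's socket operations such as accept, read and write contain blocking points.

Non-blocking I/O: the application does not have to block on those steps.

Clearly, whether a call blocks must also be supported by the kernel. For example, if the kernel implementation behind socket.accept() is blocking and offers no non-blocking mode, the application can only block.

In Java

BIO is synchronous blocking I/O

NIO is synchronous non-blocking I/O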
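The difference between the two can be observed directly on accept. The sketch below is illustrative only: the class and method names are invented, and a socket timeout is used so that the blocking case terminates.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not part of the original article
class BlockingVsNonBlockingAccept {

    /** BIO: with no client connecting, accept() waits until the timeout fires. */
    public static boolean blockingAcceptTimesOut(int timeoutMillis) throws IOException {
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            server.setSoTimeout(timeoutMillis); // only so the demo does not hang forever
            try {
                server.accept().close();
                return false;
            } catch (SocketTimeoutException e) {
                return true; // the call was blocked for the whole timeout
            }
        }
    }

    /** NIO: in non-blocking mode accept() returns null at once when nothing is pending. */
    public static boolean nonBlockingAcceptReturnsNull() throws IOException {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.configureBlocking(false);
            channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            SocketChannel client = channel.accept();
            if (client != null) {
                client.close();
                return false;
            }
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("blocking accept timed out: " + blockingAcceptTimesOut(200));
        System.out.println("non-blocking accept returned null: " + nonBlockingAcceptReturnsNull());
    }
}
```

In both cases the application is still the one asking for the connection, which is why both count as synchronous; only the waiting behavior differs.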

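Suppose the kernel supported only blocking I/O. What options would we have when writing application code? Letting the requirements drive the design is a good way to deepen our understanding of network models.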

I. BIO

1 Single-threaded version

package debug.io.model;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

/**
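 * <p>The job is to accept an incoming connection, read the messages sent over it, and run the business logic</p>
 * <p>The blocking operations involved are <tt>accept</tt> and <tt>read</tt></p>
 * <p>Once the loop has obtained a <tt>socket</tt>, everything after it is blocked. Connections keep arriving in the meantime, but because the main thread stays blocked, those requests cannot be handled</p>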
 * @since 2022/5/20
 * @author dingrui
 */
public class BIOModelByMainThread {

    private static final int PORT = 9991;

    public static void main(String[] args) throws IOException {
        // Performs bind and listen at the OS level
        ServerSocket server = new ServerSocket(PORT);
        while (true) {
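            /**
             * Blocking point 1: wait for a client connection
             */
            Socket socket = server.accept();
            InputStream in = socket.getInputStream();
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            while (true) {
                /**
                 * Blocking point 2: wait for the client to send a line
                 */
                String msg = reader.readLine();
                // readLine() returns null once the client closes the connection
                if (msg == null) break;
                // TODO: 2022/5/21 business logic
            }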
        }
    }
}

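The problem with this version is obvious: blocking prevents other requests from being handled. The natural next step is to move toward multithreading and assign one thread to each incoming connection.

2 Multi-threaded version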

package debug.io.model;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

/**
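 * <p>Multi-threaded</p>
 * <p>A thread is created for every connection, and the <tt>read</tt> in each thread is a blocking point<ul>
 *     <li>If the read hardly blocks at all, the thread runs as soon as it gets a CPU time slice and is destroyed once it finishes</li>
 *     <li>If the read blocks almost indefinitely, the thread stays blocked after it is created</li>
 * </ul>
 * Those are the two extremes. Even when reality is less severe, the problems are obvious<ul>
 *     <li>Creating and destroying threads is a relatively heavy OS cost</li>
 *     <li>Too many threads take up a lot of memory</li>
 *     <li>Context switches between threads consume OS resources</li>
 * </ul>
 * </p>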
 * @since 2022/5/21
 * @author dingrui
 */
public class BIOModelMultipleThread {

    private static final int PORT = 9991;

    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(PORT);
        while (true) {
            Socket socket = server.accept();
            new Thread(() -> {
                try {
                    InputStream in = socket.getInputStream();
                    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
                    while (true) {
                        String msg = reader.readLine();
                        // readLine() returns null once the client closes the connection
                        if (msg == null) break;
                        // TODO: 2022/5/21 business logic
                    }
                } catch (Exception ignored) {
                }
            }).start();
        }
    }
}

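Given the problems of the multi-threaded version, the next idea is a thread pool: it removes the heavy cost of creating and destroying threads by letting them be reused.

3 Thread-pool version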

package debug.io.model;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;

/**
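 * <p>Thread pool</p>
 * <p>Each connection is wrapped in a task and run by a pooled thread; the <tt>read</tt> in each task is still a blocking point<ul>
 *     <li>If the read hardly blocks, the worker finishes quickly and is reused for the next task</li>
 *     <li>If the read blocks almost indefinitely, the worker stays occupied and later tasks wait in the queue</li>
 * </ul>
 * Compared with the multi-threaded version<ul>
 *     <li>Threads are reused, which avoids repeated creation and destruction</li>
 *     <li>The number of threads is bounded, which caps memory use and context switching</li>
 *     <li>A blocked task still ties up a worker for as long as it blocks</li>
 * </ul>
 * </p>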
 * @since 2022/5/21
 * @author dingrui
 */
public class BIOModelByThreadPool {

    private static final int PORT = 9991;

    private static final ExecutorService myTP = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(Integer.MAX_VALUE));

    /**
     * Task object
     */
    private static class MyTask implements Runnable {

        private Socket socket;

        public MyTask(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            InputStream in = null;
            try {
                in = this.socket.getInputStream();
                BufferedReader reader = new BufferedReader(new InputStreamReader(in));
                while (true) {
                    String msg = reader.readLine();
                    // readLine() returns null once the client closes the connection
                    if (msg == null) break;
                    // TODO: 2022/5/21 business logic
                }
            } catch (Exception ignored) {
            }
        }
    }

    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(PORT);
        while (true) {
            Socket socket = server.accept();
            // Wrap the socket in a task and hand it to the thread pool
            myTP.submit(new MyTask(socket));
        }
    }
}

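Even with multiple threads managed by a pool, there are still problems that cannot be ignored

  1. Thread-pool resources are limited
  2. Each task contains a blocking point, so a single task may run inefficiently
  3. If tasks stay blocked, the capacity of the pool's task queue becomes an issue; in the worst case the rejection policy is triggered, or the process runs out of memory (OOM)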
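Item 3 can be reproduced in isolation. The sketch below is an assumption-laden illustration rather than the article's server: it swaps the unbounded queue for a small `ArrayBlockingQueue`, uses a latch to stand in for a read that never returns, and relies on the default `AbortPolicy`. The class name `PoolSaturationSketch` is made up.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article
class PoolSaturationSketch {

    /** Submits tasks that all block and returns how many the pool rejected. */
    public static int countRejected(int submissions) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1); // stands in for a blocking read
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 5, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));
        int rejected = 0;
        try {
            for (int i = 0; i < submissions; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // the task is stuck, like readLine() with a silent client
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // default AbortPolicy: queue full and max threads reached
                }
            }
        } finally {
            release.countDown(); // unblock everything so the pool can shut down
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected: " + countRejected(10));
    }
}
```

With these numbers the pool absorbs 2 core threads, then 3 queued tasks, then 3 extra threads, so anything beyond 8 blocked tasks is rejected. With an effectively unbounded queue, as in the listing above, nothing is rejected; the tasks simply pile up in memory behind the 2 core threads.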

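To sum up: as long as the system calls offered by the OS kernel are blocking only, the business code will always have problems and drawbacks of one kind or another, however it is written. The root cause is the kernel's blocking calls.

In other words, changing this style of coding requires matching support from the kernel.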

II. NIO

At this point, the OS kernel changes the behavior of several functions.

Here are the manual pages for a few of them.

1 socket

man 2 socket

NAME
     socket -- create an endpoint for communication

SYNOPSIS
     #include <sys/socket.h>

     int
     socket(int domain, int type, int protocol);

RETURN VALUES
     A -1 is returned if an error occurs, otherwise the return value is a descriptor referencing the socket.

2 bind

man 2 bind

NAME
     bind -- bind a name to a socket

SYNOPSIS
     #include <sys/socket.h>

     int
     bind(int socket, const struct sockaddr *address, socklen_t address_len);

DESCRIPTION
     bind() assigns a name to an unnamed socket.  When a socket is created with socket(2) it exists in a name space
     (address family) but has no name assigned.  bind() requests that address be assigned to the socket.

NOTES
     Binding a name in the UNIX domain creates a socket in the file system that must be deleted by the caller when it is no
     longer needed (using unlink(2)).

     The rules used in name binding vary between communication domains.  Consult the manual entries in section 4 for
     detailed information.

RETURN VALUES
     Upon successful completion, a value of 0 is returned.  Otherwise, a value of -1 is returned and the global integer
     variable errno is set to indicate the error.

3 listen

man 2 listen

LISTEN(2)                   BSD System Calls Manual                  LISTEN(2)

NAME
     listen -- listen for connections on a socket

SYNOPSIS
     #include <sys/socket.h>

     int
     listen(int socket, int backlog);

DESCRIPTION
     Creation of socket-based connections requires several operations.  First, a socket is created with socket(2).  Next, a
     willingness to accept incoming connections and a queue limit for incoming connections are specified with listen().
     Finally, the connections are accepted with accept(2).  The listen() call applies only to sockets of type SOCK_STREAM.

     The backlog parameter defines the maximum length for the queue of pending connections.  If a connection request
     arrives with the queue full, the client may receive an error with an indication of ECONNREFUSED.  Alternatively, if
     the underlying protocol supports retransmission, the request may be ignored so that retries may succeed.

RETURN VALUES
     The listen() function returns the value 0 if successful; otherwise the value -1 is returned and the global variable
     errno is set to indicate the error.

4 accept

man 2 accept

ACCEPT(2)                   BSD System Calls Manual                  ACCEPT(2)

NAME
     accept -- accept a connection on a socket

SYNOPSIS
     #include <sys/socket.h>

     int
     accept(int socket, struct sockaddr *restrict address, socklen_t *restrict address_len);

DESCRIPTION
     The argument socket is a socket that has been created with socket(2), bound to an address with bind(2), and is listen-
     ing for connections after a listen(2).  accept() extracts the first connection request on the queue of pending connec-
     tions, creates a new socket with the same properties of socket, and allocates a new file descriptor for the socket.
     If no pending connections are present on the queue, and the socket is not marked as non-blocking, accept() blocks the
     caller until a connection is present.  If the socket is marked non-blocking and no pending connections are present on
     the queue, accept() returns an error as described below.  The accepted socket may not be used to accept more connec-
     tions.  The original socket socket, remains open.

     The argument address is a result parameter that is filled in with the address of the connecting entity, as known to
     the communications layer.  The exact format of the address parameter is determined by the domain in which the communi-
     cation is occurring.  The address_len is a value-result parameter; it should initially contain the amount of space
     pointed to by address; on return it will contain the actual length (in bytes) of the address returned.  This call is
     used with connection-based socket types, currently with SOCK_STREAM.

     It is possible to select(2) a socket for the purposes of doing an accept() by selecting it for read.

     For certain protocols which require an explicit confirmation, such as ISO or DATAKIT, accept() can be thought of as
     merely dequeuing the next connection request and not implying confirmation.  Confirmation can be implied by a normal
     read or write on the new file descriptor, and rejection can be implied by closing the new socket.

     One can obtain user connection request data without confirming the connection by issuing a recvmsg(2) call with an
     msg_iovlen of 0 and a non-zero msg_controllen, or by issuing a getsockopt(2) request.  Similarly, one can provide user
     connection rejection information by issuing a sendmsg(2) call with providing only the control information, or by call-
     ing setsockopt(2).

RETURN VALUES
     The call returns -1 on error and the global variable errno is set to indicate the error.  If it succeeds, it returns a
     non-negative integer that is a descriptor for the accepted socket.

5 recv

man 2 recv

RECV(2)                     BSD System Calls Manual                    RECV(2)

NAME
     recv, recvfrom, recvmsg -- receive a message from a socket

LIBRARY
     Standard C Library (libc, -lc)

SYNOPSIS
     #include <sys/socket.h>

     ssize_t
     recv(int socket, void *buffer, size_t length, int flags);

     ssize_t
     recvfrom(int socket, void *restrict buffer, size_t length, int flags, struct sockaddr *restrict address,
         socklen_t *restrict address_len);

     ssize_t
     recvmsg(int socket, struct msghdr *message, int flags);
     
RETURN VALUES
     These calls return the number of bytes received, or -1 if an error occurred.

     For TCP sockets, the return value 0 means the peer has closed its half side of the connection.

6 send

man 2 send

SEND(2)                     BSD System Calls Manual                    SEND(2)

NAME
     send, sendmsg, sendto -- send a message from a socket

SYNOPSIS
     #include <sys/socket.h>

     ssize_t
     send(int socket, const void *buffer, size_t length, int flags);

     ssize_t
     sendmsg(int socket, const struct msghdr *message, int flags);

     ssize_t
     sendto(int socket, const void *buffer, size_t length, int flags, const struct sockaddr *dest_addr,
         socklen_t dest_len);
         
RETURN VALUES
     Upon successful completion, the number of bytes which were sent is returned.  Otherwise, -1 is returned and the global
     variable errno is set to indicate the error.

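All of this means that when Java-level code finally reaches the OS system call (sc), it no longer has to block waiting for the result. Instead it always gets back a return value with well-defined semantics. Java wraps that value in objects, and user code applies a few rules to decide whether it obtained a connection, whether the socket is readable or writable, and so on.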
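Seen from Java, those return values surface as the result of `SocketChannel.read`: 0 when nothing has arrived yet, a positive count when data was read, and -1 once the peer has closed. The following is a minimal loopback sketch; `NonBlockingReadSemantics` and `pollRead` are invented names, and the short polling loop exists only because delivery over loopback is not instantaneous.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article
class NonBlockingReadSemantics {

    /** Returns {read with no data pending, read after 4 bytes were sent, read after peer closed}. */
    public static int[] observe() throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false);
                ByteBuffer buffer = ByteBuffer.allocate(64);

                int idle = accepted.read(buffer); // nothing sent yet, returns without waiting

                client.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));
                int data = pollRead(accepted, buffer);

                client.close(); // peer closes its side
                buffer.clear();
                int eof = pollRead(accepted, buffer);

                return new int[]{idle, data, eof};
            }
        }
    }

    /** Retries the non-blocking read until it reports something other than 0, for at most 5 seconds. */
    private static int pollRead(SocketChannel channel, ByteBuffer buffer) throws Exception {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        int n;
        while ((n = channel.read(buffer)) == 0 && System.nanoTime() < deadline) {
            Thread.sleep(1);
        }
        return n;
    }

    public static void main(String[] args) throws Exception {
        int[] r = observe();
        System.out.println("idle=" + r[0] + " data=" + r[1] + " eof=" + r[2]);
    }
}
```

Each of the three results is something user code can branch on, which is exactly the rule-based checking described above.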

NIO single-channel (polling) version

package debug.io.model;

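import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * <p>NIO non-blocking, single-channel model</p>
 *
 * <p>Working backwards toward multiplexing: accepting a connection no longer blocks, so a large number of connections can be accepted continuously, but we do not know whether each one is readable or writable</p>
 * <p>Now there is a collection full of sockets<ul>
 *     <li>User code can poll them one by one, issuing a system call per socket to ask the OS for its state and acting on the answer. Each query costs one system call, so learning the state of the whole collection costs N system calls</li>
 *     <li>The OS provides a function that takes the collection as its argument. We hand over all sockets at once and the OS reports the read/write state of every connection, at the cost of a single system call</li>
 * </ul></p>
 *
 * <p>The second approach is called multiplexing. Its three implementations are<ul>
 *     <li>select</li>
 *     <li>poll</li>
 *     <li>epoll</li>
 * </ul></p>
 * @since 2022/5/21
 * @author dingrui
 */
public class NIOModelSingle {

    private static final List<SocketChannel> SOCKETS = new ArrayList<>();

    public static void main(String[] args) throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        // Non-blocking mode
        channel.configureBlocking(false);
        channel.socket().bind(new InetSocketAddress(9090));
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while (true) {
            // Accept through the channel: in non-blocking mode it returns null when no connection is pending
            SocketChannel socket = channel.accept();
            if (socket != null) {
                socket.configureBlocking(false);
                SOCKETS.add(socket);
            }
            // Poll every known connection, whether or not a new one just arrived
            Iterator<SocketChannel> it = SOCKETS.iterator();
            while (it.hasNext()) {
                SocketChannel s = it.next();
                buffer.clear();
                int n;
                try {
                    n = s.read(buffer); // 0: nothing to read yet, -1: peer closed
                } catch (IOException e) {
                    n = -1;
                }
                if (n < 0) {
                    s.close();
                    it.remove();
                    continue;
                }
                if (n == 0) continue;
                buffer.flip();
                // TODO: 2022/5/21 business logic
            }
        }
    }
}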

NIO multiplexing version

package debug.io.model;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/**
 * <p>NIO non-blocking, multiplexing model</p>
 *
 * @since 2022/5/21
 * @author dingrui
 */
public class NIOModelMultiple {

    private static final List<Socket> SOCKETS = new ArrayList<>();

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel channel = ServerSocketChannel.open();
        // Non-blocking mode
        channel.configureBlocking(false);
        ServerSocket server = channel.socket();
        server.bind(new InetSocketAddress(9090));
        channel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            selector.select();
            // Keys of the channels the multiplexer reported as ready
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectionKeys.iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    // TODO: 2022/5/21
                } else if (key.isReadable()) {
                    // TODO: 2022/5/21
                } else if (key.isWritable()) {
                    // TODO: 2022/5/21
                }
            }
        }
    }
}

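Comparing the two versions: both use NIO, but with multiplexing the application no longer has to probe every socket itself, so the concurrency it can sustain at the application layer improves substantially.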
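The TODO branches in the multiplexing listing could be filled in roughly as follows. This is a sketch under assumptions, not the article's implementation: it echoes back whatever it reads, binds to an OS-chosen loopback port, runs the loop on a daemon thread, and writes in a simple spin loop instead of registering for OP_WRITE. `SelectorEchoSketch`, `port` and `roundTrip` are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical demo class, not part of the original article
class SelectorEchoSketch implements AutoCloseable {

    private final Selector selector = Selector.open();
    private final ServerSocketChannel server = ServerSocketChannel.open();

    SelectorEchoSketch() throws IOException {
        server.configureBlocking(false);
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        server.register(selector, SelectionKey.OP_ACCEPT);
        Thread loop = new Thread(this::run, "selector-loop");
        loop.setDaemon(true);
        loop.start();
    }

    int port() {
        return server.socket().getLocalPort();
    }

    private void run() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            while (true) {
                selector.select(); // one call reports readiness for all registered channels
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid()) continue;
                    if (key.isAcceptable()) {
                        SocketChannel client = server.accept();
                        if (client != null) {
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        buffer.clear();
                        int n = client.read(buffer);
                        if (n < 0) {        // peer closed the connection
                            client.close(); // closing the channel also cancels its key
                            continue;
                        }
                        buffer.flip();
                        // Simplification: spin until the echo is fully written
                        while (buffer.hasRemaining()) client.write(buffer);
                    }
                }
            }
        } catch (IOException | ClosedSelectorException e) {
            // Selector closed or I/O failure: stop the loop
        }
    }

    @Override
    public void close() throws IOException {
        selector.close(); // wakes up a blocked select()
        server.close();
    }

    /** Connects with a plain blocking client, sends one line and returns the echoed line. */
    static String roundTrip(String line) throws IOException {
        try (SelectorEchoSketch echo = new SelectorEchoSketch();
             Socket socket = new Socket(InetAddress.getLoopbackAddress(), echo.port())) {
            socket.setSoTimeout(5000);
            socket.getOutputStream().write((line + "\n").getBytes(StandardCharsets.UTF_8));
            socket.getOutputStream().flush();
            return new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)).readLine();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("hello"));
    }
}
```

Accepted channels are registered from the selector thread itself, so the loop never has to coordinate registration with a blocked `select()`.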

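What follows is my own speculation. I have not studied the source code yet, so this is a gap to fill in later (todo).

As I understand it, multiplexing is simply a mechanism the OS provides to reduce the number of system calls. To use it cleanly, it needs to be wrapped in an implementation such as the Selector above, which effectively gives user code a multiplexer.

For the OS, multiplexing is a system-call implementation. As far as I know there are three of them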

  1. select
  2. poll
  3. epoll

III. I/O multiplexing

Set aside for now. I have not studied it yet and will come back to write up notes later (todo).