1. 程式人生 > 實用技巧 >java的nio 之 select,poll和epoll

java的nio 之 select,poll和epoll

我不生產知識,我只是知識的搬運工。努力通過實踐與各位博友交流一些自己的見解。

引文:

  由於cpu和磁碟等儲存裝置的處理速度的差異,巧妙的io設計能夠極大的提升工作效率。從硬體設計角度包括 SPOOLING(假離線)技術(實現獨佔裝置的共享),DMA(通過中斷的方式實現記憶體到磁碟的傳輸通道)大大降低了io傳輸到cpu的呼叫和阻塞,通道IO(有自己的指令和程式,相比DMA有更強的獨立處理資料能力。並且可以控制多臺同類或不同類的裝置)。————來自王道考研作業系統

  總結:硬體實現cpu與磁碟的儘可能獨立執行,磁碟讀取儘可能少的通過中斷程式來獲取cpu的執行權。

  解決了單個IO的CPU和磁碟獨立執行,我們來看下多個IO連線時,作業系統如何優化? 也就是多個io連結如何管理的問題。IO作為計算機的核心功能,使用者只能通過系統呼叫實現使用者態到核心態的切換來讀寫磁碟資料。傳統io 每建立一個io連結就要新建一個執行緒阻塞在當前操作中,在IO密集型任務中會大大降低CPU的利用率。通過IO複用監聽多個檔案描述符來提升程式的效能。

  注意:IO複用雖然能同時監聽多個檔案描述符,但它本身是阻塞的。並且當多個檔案描述符同時就緒時,如果不採取額外的措施,程式就只能按順序依次處理其中的每一個檔案描述符,這使得程式看起來就像是序列工作。如果要實現併發,只能使用多執行緒和多程序等程式設計手段。————來自Linux高效能伺服器程式設計

理論知識:

select,poll和epoll的區別
系統呼叫 select poll  epoll
事件集合  使用者通過3個引數分別傳入感興趣的可讀,可寫和異常等事件,核心通過對這些引數的線上修改來反饋其中的就緒事件。這使得使用者每次呼叫select都要重置這3個引數 統一處理所有事件型別,因此只需一個事件集引數。使用者通過pollfd.events傳入感興趣的事件,核心通過修改polld.revents反饋其中就緒的事件 核心通過一個時間表直接管理使用者感興趣的所有事件。因此每次呼叫epoll_wait時,無需反覆傳入使用者感興趣的事件。epoll_wait系統呼叫的引數events僅用來反饋就緒的事件
應用程式索引就緒檔案描述符的事件複雜度 O(n) O(n) O(1)
最大支援檔案描述符數   有最大值限制(核心預設值為1024) 65535 65535
工作模式 LT LT 支援ET高效模式
核心實現和工作效率 採用輪詢方法來檢測就緒事件,演算法事件複雜度為O(n) 採用輪詢方法來檢測就緒事件,演算法事件複雜度為O(n) 採用回撥方法來檢測就緒事件,演算法事件複雜度為O(n)

系統呼叫API的演進路線:select————》poll(事件處理統一,程式設計介面更簡潔。不需要重置引數)————》epoll(使用回撥替換輪詢機制,降低事件複雜度為O(1))

注意:當活動連結比較多的時候,epoll_wait的效率未必比selelct和poll高,因為此時回撥函數出發得過於頻繁。所有epoll_wait適用於連線數量多,但活動連結少的情況。

實踐程式碼:

聊天室程式:

服務端:

/**
 * @author: ljf
 * @date: 2020/12/29 19:25
 * @description: 聊天室服務端
 * 功能:
 * 1.資料轉發broadcast
 * 功能推導類屬性:
 * 1.userNames
 * 2.userThreads
 * 3.port 監聽埠
 * <p>
 * accept阻塞監聽客戶端的連結,將連結新增到userNames,userThreads並啟動使用者執行緒接收資料
 * 服務端的userThread只用來轉發資料,read and write
 * TODO:多個客戶端同時退出的併發問題, 客戶端正常段開時給伺服器能夠檢測到。同時給其他客戶端發信息
 * socket全雙工通訊
 * @modified By:
 * @version: $ 1.0
 */
public class ChatServer {
    private final int port;
    private final HashSet<String> userNames = new HashSet<>();
    private final HashSet<UserThread> userThreads = new HashSet<>();
    public AtomicInteger connectedCount = new AtomicInteger(0);

    public ChatServer(int port) {
        this.port = port;
    }

    public void execute() {
        //監聽埠
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            System.out.println("chat server listening on port:" + port);
            while (true) {
                //建立服務端程序,accept阻塞監聽
                Socket clientSocket = serverSocket.accept();

                UserThread newUser = new UserThread(clientSocket, this);
                userThreads.add(newUser);
                newUser.start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        int port = 12345;
        ChatServer chatServer = new ChatServer(port);
        chatServer.execute();
    }

    public void broadMessage(String serverMessage, UserThread userThread) {
        for (UserThread user : userThreads) {
            if (user != null && user != userThread) { //除當前使用者
                user.sendMessage(serverMessage);
            }
        }
    }

    public Set<String> getUserNames() {
        return this.userNames;
    }

    /**
     * 刪除使用者,刪除userName和userThread
     */
    public void removeUser(String user, UserThread userThread) {
        boolean removed = userNames.remove(user);
        if (removed) {
            userThreads.remove(userThread);
            System.out.println(user + " quit group chat");
        }
    }

    public boolean hasUsers() {
        return !this.userNames.isEmpty();
    }

    public void addUserName(String userName) {
        this.userNames.add(userName);
    }
}
View Code

服務端每建立一個socket連結,就建立一個使用者執行緒:

import java.io.*;
import java.net.Socket;

/**
 * @author: ljf
 * @date: 2020/12/29 19:34
 * @description: 服務端socket執行緒,用來向客戶端轉發訊息
 * @modified By:
 * @version: $ 1.0
 */
public class UserThread extends Thread {
    private final Socket clientSocket;
    private PrintWriter printWriter;
    private String userName; //客戶端建立時輸入使用者名稱,網路傳輸獲取
    private final ChatServer chatServer;

    public UserThread(Socket clientSocket, ChatServer chatServer) {
        this.clientSocket = clientSocket;
        this.chatServer = chatServer;
    }

    public String getUserName() {
        return this.userName;
    }

    @Override
    public void run() {
        String serverMessage;
        try {
            OutputStream outputStream = clientSocket.getOutputStream();
            printWriter = new PrintWriter(outputStream,true);

            //阻塞監聽客戶端發來的訊息,然後轉發給其他客戶端
            InputStream inputStream = clientSocket.getInputStream();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));

            printUsers();
            //首條訊息是 客戶端姓名
            userName = bufferedReader.readLine();
            chatServer.addUserName(userName);
            System.out.println("connectedCount: " + chatServer.connectedCount.getAndIncrement()
                    + " new user connected: " + userName);

            serverMessage = "new user " + userName + " connected";
            chatServer.broadMessage(serverMessage, this);

            //read 阻塞,直到客戶端發來"bye"訊息,斷開連線
            String clientMessage;
            while (!(clientMessage = bufferedReader.readLine()).equals("bye")) {
                //拼接上當前socket的使用者,轉發給其他使用者
                serverMessage = "[" + userName + "]: " + clientMessage;
                chatServer.broadMessage(serverMessage, this);
            }

        } catch (IOException e) {
//            e.printStackTrace();
            System.err.println(e.getMessage());
        } finally {
            //與客戶端socket斷開連線
            chatServer.removeUser(userName, this);
            if(clientSocket!=null && clientSocket.isConnected()){
                try {
                    clientSocket.shutdownOutput();//立即關閉輸出流
                    clientSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            //轉發我離開的訊息
            serverMessage = userName + " has quited";
            chatServer.broadMessage(serverMessage, this);
        }
    }

    /**
     * 向新連結的客戶端傳送當前伺服器的使用者列表
     */
    public void printUsers() {
        if (chatServer.hasUsers()) {
            printWriter.println("connected users: " + chatServer.getUserNames());
        } else {
            printWriter.println("no other users connected");
        }
    }

    /**
     * 向客戶端傳送訊息
     *
     * @param message:訊息內容
     */
    public void sendMessage(String message) {
        printWriter.println(message);
    }
}
View Code

客戶端:

public class ChatClient {
    private volatile String userName;
    private final String hostName;
    private final int port;
    private volatile boolean closed = false;

    public ChatClient(String hostName,int port){
        this.hostName = hostName;
        this.port = port;
    }

    public static void main(String[] args) {
        String hostName = "localhost";
        int port =  12345;

        ChatClient chatClient = new ChatClient(hostName, port);
        chatClient.execute();
    }
    /**
     * 與服務端建立連線
     */
    public void execute(){
        try {
            //必須輸入使用者名稱字後才能建立socket
            Scanner scanner = new Scanner(System.in);
            System.out.print("\nEnter your name: ");
            userName = scanner.nextLine();

            Socket clientSocket = new Socket(hostName, port);
            System.out.println("connected to chat server");
            new ReadThread(clientSocket,this).start();
            new WriteThread(clientSocket,this).start();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void setClosed(){
        closed = true;
    }
    public boolean isClosed(){
        return closed;
    }

    public String getUserName() {
        return userName;
    }
}

客戶端socket讀執行緒

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
import java.net.SocketException;

/**
 * @author: ljf
 * @date: 2020/12/29 20:55
 * @description:
 * @modified By:
 * @version: $ 1.0
 */
public class ReadThread extends Thread{
    private final ChatClient chatClient;
    private final Socket clientSocket;

    public ReadThread(Socket clientSocket,ChatClient chatClient){
        this.chatClient = chatClient;
        this.clientSocket = clientSocket;
    }

    @Override
    public void run() {
        while (!chatClient.isClosed()) {
            try {
                InputStream inputStream = clientSocket.getInputStream();
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
                String response = bufferedReader.readLine();
                System.out.println("\n" + response);

                // prints the username after displaying the server's message
                if (chatClient.getUserName() != null) {
                    System.out.print("[" + chatClient.getUserName() + "]: ");
                }
            } catch (SocketException se){ //TODO:正常退出替代這裡
                System.out.println("quit");
                System.err.println(se.getMessage());
                break;
            } catch (IOException ex) {
                System.out.println("Error reading from server: " + ex.getMessage());
                ex.printStackTrace();
                break;
            }
        }
    }
}

客戶端socket寫執行緒

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Scanner;

/**
 * @author: ljf
 * @date: 2020/12/29 20:59
 * @description:
 * @modified By:
 * @version: $ 1.0
 */
public class WriteThread extends Thread {
    private final ChatClient chatClient;
    private final Socket clientSocket;
    private PrintWriter printWriter;

    public WriteThread(Socket clientSocket, ChatClient chatClient) {
        this.chatClient = chatClient;
        this.clientSocket = clientSocket;

        try {
            OutputStream outputStream = clientSocket.getOutputStream();
            printWriter = new PrintWriter(outputStream,true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        Scanner scanner = new Scanner(System.in);

        String userName = chatClient.getUserName();
        printWriter.println(userName);

        String text;
//        System.out.print("[" + userName + "]: ");
        while (!(text = scanner.nextLine()).equals("bye")) {
            printWriter.println(text);
            System.out.print("[" + userName + "]: ");
        }

        try {
            printWriter.println(text);
            clientSocket.close();
        } catch (IOException ex) {

            System.out.println("Error writing to server: " + ex.getMessage());
        }
    }
}

TODO:當前是用傳統io流實現,後期加入nio,零拷貝。