1. 程式人生 > 其它 >Vert.X 介紹

Vert.X 介紹

Vert.X簡介

Vert.X 是一個基於JVM的響應式應用/工具包(Reactive applications)。Vert.X是事件驅動,非阻塞純非同步IO : 極少執行緒資源處理大量併發請求,高併發系統的優選

官網上Vert.X呈現的3大特點:1. 節約資源/高併發處理優秀 2. 非同步程式設計變得簡單 3. 靈活,它不是一個框架,你可以只用它的某些模組之類

事件驅動


事件驅動 - 框架,軟體設計模式/模型。與傳統的函式呼叫區別:傳統函式呼叫一定要有一個呼叫者並輸入引數最後得到結果。事件驅動沒有呼叫者,只關心響應(如何處理),它有一個事件收集器,誰產生了不是很重要,也不知道,收集到了這個事件就響應。事件驅動的核心是事件,關注的是響應(我關心的事件發生瞭如何處理)。經常使用在I/O框架裡面,可以很好的實現I/O複用。

Reactor模式

在Web服務中,處理Web請求通常有兩種體系結構,一個是基於執行緒的架構(Thread-based architecture),一個是事件驅動模型(Event-driven architecture)

基於執行緒

缺點:

  • 執行緒本身佔用記憶體
  • 建立銷燬執行緒需要代價(執行緒池)
  • 作業系統執行緒切換的開銷
  • 執行緒中處理I/O,等待過程造成CPU浪費

事件驅動

Reactor單執行緒

單執行緒:針對I/O的accept(),read(),write(),connect()是一個執行緒完成的,把非I/O業務邏輯從Reactor執行緒摘除,加速Reactor執行緒響應I/O請求

Reactor工作者執行緒池

與單執行緒的唯一區別是非I/O業務邏輯使用了一個工作執行緒池

Reactor 多執行緒

把Reactor拆分成mainReactor和subReactor。mainReactor(一個執行緒)負責監聽server socket, 處理網路新連線,將建立的socketChannel指定註冊給subReactor。subReactor維護自己的selector, 基於mainReactor 註冊的socketChannel多路分離I/O讀寫事件,讀寫網路資料,通常使用多執行緒

  • mainReactor對應Netty中配置的BossGroup執行緒組,主要負責接受客戶端連線的建立。一般只暴露一個服務埠,BossGroup執行緒組一般一個執行緒工作即可
  • subReactor對應Netty中配置的WorkerGroup執行緒組,BossGroup執行緒組接受並建立完客戶端的連線後,將網路socket轉交給WorkerGroup執行緒組,然後在WorkerGroup執行緒組內選擇一個執行緒,進行I/O的處理。WorkerGroup執行緒組主要處理I/O,一般設定2*CPU核數個執行緒
package reactor.pattern;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * An Actor class which accepts the incoming connections
 */
public class Acceptor {

    private ServerSocketChannel serverSocketChannel;
    private Reactor[] reactors;

    private Selector selector;
    private int noOfReactors;

    public Acceptor(String host, int port, int noOfWorkerThreads) {
        try {
            selector = Selector.open();
            reactors = new Reactor[noOfWorkerThreads];
            this.noOfReactors = noOfWorkerThreads;
            for (int i = 0; i < noOfWorkerThreads; i++) {
                reactors[i] = new Reactor();
                Thread thread = new Thread(reactors[i]);
                thread.start();
            }

            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(host, 9000));
            serverSocketChannel.configureBlocking(false);

            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        } catch (IOException e) {
            //handle exceptions
        }
    }

    public void start() throws IOException {

        int i = 0;

        while (true) {

            int readyChannels = selector.select();
            if (readyChannels == 0)
                continue;

            Set<SelectionKey> selectedKeys = selector.selectedKeys();

            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

            while (keyIterator.hasNext()) {

                SelectionKey key = keyIterator.next();

                if (key.isAcceptable()) {

                    ServerSocketChannel serverSocket = (ServerSocketChannel) key.channel();
                    SocketChannel socket = serverSocket.accept();
                    reactors[i % noOfReactors].addChannel(socket);
                    i++;

                }

                keyIterator.remove();
            }
        }
    }

    public static void main(String[] args) {
        Acceptor acceptor = new Acceptor("localhost",9000,4);
        try {
            acceptor.start();
        } catch (IOException e) {
           // e.printStackTrace();
        }
    }
}
package reactor.pattern;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Reactor implements Runnable {

    private Queue queue = new ConcurrentLinkedQueue<SocketChannel>();

    private Selector selector;

    public Reactor() {
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            // e.printStackTrace();
        }
    }

    public void addChannel(SocketChannel socketChannel) {
        queue.add(socketChannel);
    }

    @Override
    public void run() {

        while (true) {

            try {
                SocketChannel socketChannel = (SocketChannel) queue.poll();
                if (socketChannel == null)
                    continue;

                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);

                int readyChannels = selector.select();

                if (readyChannels == 0)
                    continue;

                Set<SelectionKey> selectedKeys = selector.selectedKeys();

                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

                while (keyIterator.hasNext()) {

                    SelectionKey key = keyIterator.next();

                    if (key.isReadable()) {
                        // Read the channel

                    }

                    keyIterator.remove();
                }
            } catch (IOException e) {
                //handle IOExceptions
            }
        }
    }
}

參考: