1. 程式人生 > 程式設計 >聊聊dubbo的NettyServer

聊聊dubbo的NettyServer

本文主要研究一下dubbo的NettyServer

AbstractServer

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java

public abstract class AbstractServer extends AbstractEndpoint implements Server {

    protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler"
; private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class); ExecutorService executor; private InetSocketAddress localAddress; private InetSocketAddress bindAddress; private int accepts; private int idleTimeout; public AbstractServer(URL url,ChannelHandler handler) throws RemotingException { super(url,handler); local
Address = getUrl().toInetSocketAddress(); String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY,getUrl().getHost()); int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY,getUrl().getPort()); if (url.getParameter(ANYHOST_KEY,false) || NetUtils.isInvalidLocalHost(bind
Ip)) { bindIp = ANYHOST_VALUE; } bindAddress = new InetSocketAddress(bindIp,bindPort); this.accepts = url.getParameter(ACCEPTS_KEY,DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY,DEFAULT_IDLE_TIMEOUT); try { doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ",export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(),null,"Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ",cause: " + t.getMessage(),t); } //fixme replace this with better method DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY,Integer.toString(url.getPort())); } protected abstract void doOpen() throws Throwable; protected abstract void doClose() throws Throwable; @Override public void reset(URL url) { if (url == null) { return; } try { if (url.hasParameter(ACCEPTS_KEY)) { int a = url.getParameter(ACCEPTS_KEY,0); if (a > 0) { this.accepts = a; } } } catch (Throwable t) { logger.error(t.getMessage(),t); } try { if (url.hasParameter(IDLE_TIMEOUT_KEY)) { int t = url.getParameter(IDLE_TIMEOUT_KEY,0); if (t > 0) { this.idleTimeout = t; } } } catch (Throwable t) { logger.error(t.getMessage(),t); } try { if (url.hasParameter(THREADS_KEY) && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; int threads = url.getParameter(THREADS_KEY,0); int max = threadPoolExecutor.getMaximumPoolSize(); int core = threadPoolExecutor.getCorePoolSize(); if (threads > 0 && (threads != max || threads != core)) { if (threads < core) { threadPoolExecutor.setCorePoolSize(threads); if (core == max) { threadPoolExecutor.setMaximumPoolSize(threads); } } else { threadPoolExecutor.setMaximumPoolSize(threads); if (core == max) { 
threadPoolExecutor.setCorePoolSize(threads); } } } } } catch (Throwable t) { logger.error(t.getMessage(),t); } super.setUrl(getUrl().addParameters(url.getParameters())); } @Override public void send(Object message,boolean sent) throws RemotingException { Collection<Channel> channels = getChannels(); for (Channel channel : channels) { if (channel.isConnected()) { channel.send(message,sent); } } } @Override public void close() { if (logger.isInfoEnabled()) { logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ",export " + getLocalAddress()); } ExecutorUtil.shutdownNow(executor,100); try { super.close(); } catch (Throwable e) { logger.warn(e.getMessage(),e); } try { doClose(); } catch (Throwable e) { logger.warn(e.getMessage(),e); } } @Override public void close(int timeout) { ExecutorUtil.gracefulShutdown(executor,timeout); close(); } @Override public InetSocketAddress getLocalAddress() { return localAddress; } public InetSocketAddress getBindAddress() { return bindAddress; } public int getAccepts() { return accepts; } public int getIdleTimeout() { return idleTimeout; } @Override public void connected(Channel ch) throws RemotingException { // If the server has entered the shutdown process,reject any new connection if (this.isClosing() || this.isClosed()) { logger.warn("Close new channel " + ch + ",cause: server is closing or has been closed. For example,receive a new connect request while in shutdown process."); ch.close(); return; } Collection<Channel> channels = getChannels(); if (accepts > 0 && channels.size() > accepts) { logger.error("Close channel " + ch + ",cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts); ch.close(); return; } super.connected(ch); } @Override public void disconnected(Channel ch) throws RemotingException { Collection<Channel> channels = getChannels(); if (channels.isEmpty()) { logger.warn("All clients has disconnected from " + ch.getLocalAddress() + ". 
You can graceful shutdown now."); } super.disconnected(ch); } } 複製程式碼
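  • AbstractServer的構造器會從url讀取bindAddress、accepts、idleTimeout,然後執行doOpen方法,最後從DataStore取得executor;close方法會關閉executor,執行父類close方法,然後執行doClose方法;connected方法會先判斷server是否正在關閉或已關閉,是則直接close該channel,接著判斷channels數量是否超出accepts值,超過同樣直接close,否則執行父類connected方法;disconnected方法在channels為空時會記錄warn日誌,然後執行父類disconnected方法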

NettyServer

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java

/**
 * Netty 4 based {@link Server} implementation.
 *
 * <p>{@link #doOpen()} builds the Netty {@code ServerBootstrap} with one boss thread and a
 * configurable number of worker threads and binds it to {@code getBindAddress()};
 * {@link #doClose()} tears everything down again.
 */
public class NettyServer extends AbstractServer implements Server {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    /**
     * the cache for alive worker channel.
     * <ip:port,dubbo channel>
     */
    private Map<String, Channel> channels;
    /**
     * netty server bootstrap.
     */
    private ServerBootstrap bootstrap;
    /**
     * the boss channel that receive connections and dispatch these to worker channel.
     */
    private io.netty.channel.Channel channel;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    /**
     * Creates the server; the parent constructor opens it via {@link #doOpen()}.
     *
     * @param url     the server URL
     * @param handler the business handler, wrapped by {@code ChannelHandlers.wrap}
     * @throws RemotingException if the parent constructor fails to open the server
     */
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
        // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

    /**
     * Init and start netty server
     *
     * @throws Throwable if the bootstrap cannot be created or bound
     */
    @Override
    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        // A single handler instance is shared by all child pipelines; its channel map
        // doubles as this server's channel cache.
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                // IdleStateHandler(readerIdle, writerIdle, allIdle, unit): reader-only and
                                // writer-only detection are disabled (0); only the all-idle timeout is used.
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

    /**
     * Closes the boss channel, every connected worker channel and both event loop groups,
     * then clears the channel cache. Each step is best-effort: a failure is logged as a
     * warning and the remaining steps still run.
     *
     * @throws Throwable declared by the parent contract; every step here catches and logs
     */
    @Override
    protected void doClose() throws Throwable {
        try {
            if (channel != null) {
                // unbind.
                channel.close();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            Collection<org.apache.dubbo.remoting.Channel> connected = getChannels();
            if (connected != null && connected.size() > 0) {
                for (org.apache.dubbo.remoting.Channel worker : connected) {
                    try {
                        worker.close();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            // Guard each group on its own so a missing group cannot prevent the
            // other one from being shut down.
            if (bossGroup != null) {
                bossGroup.shutdownGracefully();
            }
            if (workerGroup != null) {
                workerGroup.shutdownGracefully();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (channels != null) {
                channels.clear();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    /**
     * Returns a snapshot of the channels that are currently connected and evicts
     * disconnected ones from the cache.
     *
     * <p>NOTE(review): entries are removed from the map while its values are being
     * iterated; this presumably relies on the map returned by
     * {@code NettyServerHandler.getChannels()} being a concurrent map - confirm.
     *
     * @return a new set holding the connected channels
     */
    @Override
    public Collection<Channel> getChannels() {
        Collection<Channel> chs = new HashSet<Channel>();
        for (Channel channel : this.channels.values()) {
            if (channel.isConnected()) {
                chs.add(channel);
            } else {
                channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
            }
        }
        return chs;
    }

    /**
     * Looks up the cached channel for a remote address.
     *
     * @param remoteAddress the remote peer address
     * @return the cached channel, or {@code null} if the map has no entry for it
     */
    @Override
    public Channel getChannel(InetSocketAddress remoteAddress) {
        return channels.get(NetUtils.toAddressString(remoteAddress));
    }

    @Override
    public boolean canHandleIdle() {
        return true;
    }

    /**
     * @return whether the boss channel is active
     */
    @Override
    public boolean isBound() {
        return channel.isActive();
    }

}
複製程式碼
  • NettyServer繼承了AbstractServer,其實現了doOpen、doClose方法;doOpen方法會建立netty的ServerBootstrap、bossGroup、workerGroup,配置childHandler的pipeline後bind到bindAddress;doClose方法會依次關閉boss channel、關閉所有已連接的worker channel、關閉bossGroup與workerGroup,最後清空channels快取

NettyTransporter

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyTransporter.java

/**
 * {@link Transporter} implementation backed by Netty 4: servers are {@link NettyServer}
 * instances and clients are {@link NettyClient} instances.
 */
public class NettyTransporter implements Transporter {

    /** Name of this transporter. */
    public static final String NAME = "netty";

    /**
     * Creates a Netty based client for the given URL.
     *
     * @param url      the remote URL
     * @param listener the handler receiving channel events
     * @return a new {@link NettyClient}
     * @throws RemotingException if the client cannot be created
     */
    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        final Client client = new NettyClient(url, listener);
        return client;
    }

    /**
     * Creates a Netty based server for the given URL.
     *
     * @param url      the server URL
     * @param listener the handler receiving channel events
     * @return a new {@link NettyServer}
     * @throws RemotingException if the server cannot be created
     */
    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        final Server server = new NettyServer(url, listener);
        return server;
    }

}
複製程式碼
  • NettyTransporter實現了Transporter介面,其bind方法建立的是NettyServer

小結

NettyServer繼承了AbstractServer,其實現了doOpen、doClose方法;doOpen方法會建立netty的ServerBootstrap、bossGroup、workerGroup,配置childHandler的pipeline後bind到bindAddress;doClose方法會依次關閉boss channel、關閉所有已連接的worker channel、關閉bossGroup與workerGroup,最後清空channels快取

doc