聊聊dubbo的NettyServer
阿新 • • 發佈:2019-12-31
序
本文主要研究一下dubbo的NettyServer
AbstractServer
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
public abstract class AbstractServer extends AbstractEndpoint implements Server {
protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler" ;
private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
ExecutorService executor;
private InetSocketAddress localAddress;
private InetSocketAddress bindAddress;
private int accepts;
private int idleTimeout;
public AbstractServer(URL url,ChannelHandler handler) throws RemotingException {
super(url,handler);
local Address = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY,getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY,getUrl().getPort());
if (url.getParameter(ANYHOST_KEY,false) || NetUtils.isInvalidLocalHost(bind Ip)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp,bindPort);
this.accepts = url.getParameter(ACCEPTS_KEY,DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY,DEFAULT_IDLE_TIMEOUT);
try {
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ",export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(),null,"Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ",cause: " + t.getMessage(),t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY,Integer.toString(url.getPort()));
}
protected abstract void doOpen() throws Throwable;
protected abstract void doClose() throws Throwable;
@Override
public void reset(URL url) {
if (url == null) {
return;
}
try {
if (url.hasParameter(ACCEPTS_KEY)) {
int a = url.getParameter(ACCEPTS_KEY,0);
if (a > 0) {
this.accepts = a;
}
}
} catch (Throwable t) {
logger.error(t.getMessage(),t);
}
try {
if (url.hasParameter(IDLE_TIMEOUT_KEY)) {
int t = url.getParameter(IDLE_TIMEOUT_KEY,0);
if (t > 0) {
this.idleTimeout = t;
}
}
} catch (Throwable t) {
logger.error(t.getMessage(),t);
}
try {
if (url.hasParameter(THREADS_KEY)
&& executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
int threads = url.getParameter(THREADS_KEY,0);
int max = threadPoolExecutor.getMaximumPoolSize();
int core = threadPoolExecutor.getCorePoolSize();
if (threads > 0 && (threads != max || threads != core)) {
if (threads < core) {
threadPoolExecutor.setCorePoolSize(threads);
if (core == max) {
threadPoolExecutor.setMaximumPoolSize(threads);
}
} else {
threadPoolExecutor.setMaximumPoolSize(threads);
if (core == max) {
threadPoolExecutor.setCorePoolSize(threads);
}
}
}
}
} catch (Throwable t) {
logger.error(t.getMessage(),t);
}
super.setUrl(getUrl().addParameters(url.getParameters()));
}
@Override
public void send(Object message,boolean sent) throws RemotingException {
Collection<Channel> channels = getChannels();
for (Channel channel : channels) {
if (channel.isConnected()) {
channel.send(message,sent);
}
}
}
@Override
public void close() {
if (logger.isInfoEnabled()) {
logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ",export " + getLocalAddress());
}
ExecutorUtil.shutdownNow(executor,100);
try {
super.close();
} catch (Throwable e) {
logger.warn(e.getMessage(),e);
}
try {
doClose();
} catch (Throwable e) {
logger.warn(e.getMessage(),e);
}
}
@Override
public void close(int timeout) {
ExecutorUtil.gracefulShutdown(executor,timeout);
close();
}
@Override
public InetSocketAddress getLocalAddress() {
return localAddress;
}
public InetSocketAddress getBindAddress() {
return bindAddress;
}
public int getAccepts() {
return accepts;
}
public int getIdleTimeout() {
return idleTimeout;
}
@Override
public void connected(Channel ch) throws RemotingException {
// If the server has entered the shutdown process,reject any new connection
if (this.isClosing() || this.isClosed()) {
logger.warn("Close new channel " + ch + ",cause: server is closing or has been closed. For example,receive a new connect request while in shutdown process.");
ch.close();
return;
}
Collection<Channel> channels = getChannels();
if (accepts > 0 && channels.size() > accepts) {
logger.error("Close channel " + ch + ",cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
ch.close();
return;
}
super.connected(ch);
}
@Override
public void disconnected(Channel ch) throws RemotingException {
Collection<Channel> channels = getChannels();
if (channels.isEmpty()) {
logger.warn("All clients has disconnected from " + ch.getLocalAddress() + ". You can graceful shutdown now.");
}
super.disconnected(ch);
}
}
複製程式碼
- AbstractServer的構造器會從url讀取bindAddress、accepts、idleTimeout,然後執行doOpen方法;close方法會關閉executor,執行父類close方法,然後執行doClose方法;connected方法會先判斷channels是否超出accepts值,超過則直接close;disconnected則執行父類disconnected方法
NettyServer
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java
public class NettyServer extends AbstractServer implements Server {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
/**
* the cache for alive worker channel.
* <ip:port,dubbo channel>
*/
private Map<String,Channel> channels;
/**
* netty server bootstrap.
*/
private ServerBootstrap bootstrap;
/**
* the boss channel that receive connections and dispatch these to worker channel.
*/
private io.netty.channel.Channel channel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
public NettyServer(URL url,ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url,ChannelHandlers.wrap(handler,ExecutorUtil.setThreadName(url,SERVER_THREAD_POOL_NAME)));
}
/**
* Init and start netty server
*
* @throws Throwable
*/
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1,new DefaultThreadFactory("NettyServerBoss",true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY,Constants.DEFAULT_IO_THREADS),new DefaultThreadFactory("NettyServerWorker",true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(),this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY,Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR,Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(),getUrl(),NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder",adapter.getDecoder())
.addLast("encoder",adapter.getEncoder())
.addLast("server-idle-handler",new IdleStateHandler(0,idleTimeout,MILLISECONDS))
.addLast("handler",nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
@Override
protected void doClose() throws Throwable {
try {
if (channel != null) {
// unbind.
channel.close();
}
} catch (Throwable e) {
logger.warn(e.getMessage(),e);
}
try {
Collection<org.apache.dubbo.remoting.Channel> channels = getChannels();
if (channels != null && channels.size() > 0) {
for (org.apache.dubbo.remoting.Channel channel : channels) {
try {
channel.close();
} catch (Throwable e) {
logger.warn(e.getMessage(),e);
}
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(),e);
}
try {
if (bootstrap != null) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
} catch (Throwable e) {
logger.warn(e.getMessage(),e);
}
try {
if (channels != null) {
channels.clear();
}
} catch (Throwable e) {
logger.warn(e.getMessage(),e);
}
}
@Override
public Collection<Channel> getChannels() {
Collection<Channel> chs = new HashSet<Channel>();
for (Channel channel : this.channels.values()) {
if (channel.isConnected()) {
chs.add(channel);
} else {
channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
}
}
return chs;
}
@Override
public Channel getChannel(InetSocketAddress remoteAddress) {
return channels.get(NetUtils.toAddressString(remoteAddress));
}
@Override
public boolean canHandleIdle() {
return true;
}
@Override
public boolean isBound() {
return channel.isActive();
}
}
複製程式碼
- NettyServer繼承了AbstractServer,其實現了doOpen、doClose方法;doOpen方法會建立netty的ServerBootstrap、bossGroup、workerGroup;doClose方法會關閉channel,關閉bossGroup、workerGroup
NettyTransporter
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyTransporter.java
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public Server bind(URL url,ChannelHandler listener) throws RemotingException {
return new NettyServer(url,listener);
}
@Override
public Client connect(URL url,ChannelHandler listener) throws RemotingException {
return new NettyClient(url,listener);
}
}
複製程式碼
- NettyTransporter實現了Transporter介面,其bind方法建立的是NettyServer
小結
NettyServer繼承了AbstractServer,其實現了doOpen、doClose方法;doOpen方法會建立netty的ServerBootstrap、bossGroup、workerGroup;doClose方法會關閉channel,關閉bossGroup、workerGroup