聊聊dubbo的ExecutionDispatcher
阿新 • • 發佈:2019-12-31
序
本文主要研究一下dubbo的ExecutionDispatcher
ExecutionDispatcher
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionDispatcher.java
/**
 * {@link Dispatcher} implementation identified by the name constant {@link #NAME}.
 *
 * <p>Its single responsibility is to wrap a handler in an
 * {@link ExecutionChannelHandler}.
 */
public class ExecutionDispatcher implements Dispatcher {

    /** Name constant of this dispatcher. */
    public static final String NAME = "execution";

    /**
     * Wraps the given handler in an {@link ExecutionChannelHandler}.
     *
     * @param handler the handler to wrap
     * @param url     the URL handed to the wrapper's constructor
     * @return a newly created {@link ExecutionChannelHandler}
     */
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        final ChannelHandler wrapped = new ExecutionChannelHandler(handler, url);
        return wrapped;
    }
}
複製程式碼
- ExecutionDispatcher實現了Dispatcher介面,其dispatch方法返回的是ExecutionChannelHandler
ExecutionChannelHandler
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
public class ExecutionChannelHandler extends WrappedChannelHandler {
public ExecutionChannelHandler(ChannelHandler handler,URL url) {
super(handler,url);
}
@Override
public void received(Channel channel,Object message) throws RemotingException {
ExecutorService executor = getExecutorService();
if (message instanceof Request) {
try {
executor.execute(new ChannelEventRunnable(channel,handler,ChannelState.RECEIVED,message));
} catch (Throwable t) {
// FIXME: when the thread pool is full,SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,// therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
// this scenario from happening,but a better solution should be considered later.
if (t instanceof RejectedExecutionException) {
Request request = (Request) message;
if (request.isTwoWay()) {
String msg = "Server side(" + url.getIp() + "," + url.getPort()
+ ") thread pool is exhausted,detail msg:" + t.getMessage();
Response response = new Response(request.getId(),request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message,channel,getClass() + " error when process received event.",t);
}
} else {
handler.received(channel,message);
}
}
}
複製程式碼
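- ExecutionChannelHandler繼承了WrappedChannelHandler,其received方法判斷message是否是Request型別,如果是則建立ChannelEventRunnable提交到執行緒池執行;提交時若丟擲RejectedExecutionException且request.isTwoWay()為true,則直接透過channel.send返回狀態為SERVER_THREADPOOL_EXHAUSTED_ERROR的Response,其餘情況則包裝成ExecutionException丟擲;如果message不是Request型別,則直接執行handler.received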
PerformanceServerTest
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceServerTest.java
public class PerformanceServerTest {
private static final Logger logger = LoggerFactory.getLogger(PerformanceServerTest.class);
private static ExchangeServer server = null;
private static void restartServer(int times,int alive,int sleep) throws Exception {
if (server != null && !server.isClosed()) {
server.close();
Thread.sleep(100);
}
for (int i = 0; i < times; i++) {
logger.info("restart times:" + i);
server = statServer();
if (alive > 0) Thread.sleep(alive);
server.close();
if (sleep > 0) Thread.sleep(sleep);
}
server = statServer();
}
private static ExchangeServer statServer() throws Exception {
final int port = PerformanceUtils.getIntProperty("port",9911);
final String transporter = PerformanceUtils.getProperty(Constants.TRANSPORTER_KEY,Constants.DEFAULT_TRANSPORTER);
final String serialization = PerformanceUtils.getProperty(Constants.SERIALIZATION_KEY,Constants.DEFAULT_REMOTING_SERIALIZATION);
final String threadpool = PerformanceUtils.getProperty(THREADPOOL_KEY,DEFAULT_THREADPOOL);
final int threads = PerformanceUtils.getIntProperty(THREADS_KEY,DEFAULT_THREADS);
final int iothreads = PerformanceUtils.getIntProperty(IO_THREADS_KEY,Constants.DEFAULT_IO_THREADS);
final int buffer = PerformanceUtils.getIntProperty(BUFFER_KEY,DEFAULT_BUFFER_SIZE);
final String channelHandler = PerformanceUtils.getProperty(Constants.DISPATCHER_KEY,ExecutionDispatcher.NAME);
// Start server
ExchangeServer server = Exchangers.bind("exchange://0.0.0.0:" + port + "?transporter="
+ transporter + "&serialization="
+ serialization + "&threadpool=" + threadpool
+ "&threads=" + threads + "&iothreads=" + iothreads + "&buffer=" + buffer + "&channel.handler=" + channelHandler,new ExchangeHandlerAdapter() {
public String telnet(Channel channel,String message) throws RemotingException {
return "echo: " + message + "\r\ntelnet> ";
}
public CompletableFuture<Object> reply(ExchangeChannel channel,Object request) throws RemotingException {
if ("environment".equals(request)) {
return CompletableFuture.completedFuture(PerformanceUtils.getEnvironment());
}
if ("scene".equals(request)) {
List<String> scene = new ArrayList<String>();
scene.add("Transporter: " + transporter);
scene.add("Service Threads: " + threads);
return CompletableFuture.completedFuture(scene);
}
return CompletableFuture.completedFuture(request);
}
});
return server;
}
private static ExchangeServer statTelnetServer(int port) throws Exception {
// Start server
ExchangeServer telnetserver = Exchangers.bind("exchange://0.0.0.0:" + port,String message) throws RemotingException {
if (message.equals("help")) {
return "support cmd: \r\n\tstart \r\n\tstop \r\n\tshutdown \r\n\trestart times [alive] [sleep] \r\ntelnet>";
} else if (message.equals("stop")) {
logger.info("server closed:" + server);
server.close();
return "stop server\r\ntelnet>";
} else if (message.startsWith("start")) {
try {
restartServer(0,0);
} catch (Exception e) {
e.printStackTrace();
}
return "start server\r\ntelnet>";
} else if (message.startsWith("shutdown")) {
System.exit(0);
return "start server\r\ntelnet>";
} else if (message.startsWith("channels")) {
return "server.getExchangeChannels():" + server.getExchangeChannels().size() + "\r\ntelnet>";
} else if (message.startsWith("restart ")) { //r times [sleep] r 10 or r 10 100
String[] args = message.split(" ");
int times = Integer.parseInt(args[1]);
int alive = args.length > 2 ? Integer.parseInt(args[2]) : 0;
int sleep = args.length > 3 ? Integer.parseInt(args[3]) : 100;
try {
restartServer(times,alive,sleep);
} catch (Exception e) {
e.printStackTrace();
}
return "restart server,times:" + times + " stop alive time: " + alive + ",sleep time: " + sleep + " usage:r times [alive] [sleep] \r\ntelnet>";
} else {
return "echo: " + message + "\r\ntelnet> ";
}
}
});
return telnetserver;
}
@Test
public void testServer() throws Exception {
// Read port from property
if (PerformanceUtils.getProperty("port",null) == null) {
logger.warn("Please set -Dport=9911");
return;
}
final int port = PerformanceUtils.getIntProperty("port",9911);
final boolean telnet = PerformanceUtils.getBooleanProperty("telnet",true);
if (telnet) statTelnetServer(port + 1);
server = statServer();
synchronized (PerformanceServerTest.class) {
while (true) {
try {
PerformanceServerTest.class.wait();
} catch (InterruptedException e) {
}
}
}
}
}
複製程式碼
- PerformanceServerTest的statServer方法使用PerformanceUtils.getProperty(Constants.DISPATCHER_KEY,ExecutionDispatcher.NAME)獲取channelHandler,找不到則使用ExecutionDispatcher.NAME,並將其作為channel.handler引數拼接到Exchangers.bind所使用的url中
小結
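ExecutionDispatcher實現了Dispatcher介面,其dispatch方法返回ExecutionChannelHandler;ExecutionChannelHandler繼承了WrappedChannelHandler,其received方法判斷message是否是Request型別,如果是則建立ChannelEventRunnable放到執行緒池裡頭執行(執行緒池拒絕且request為twoWay時返回SERVER_THREADPOOL_EXHAUSTED_ERROR的Response),如果不是則直接執行handler.received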