A look at nacos's TcpSuperSenseProcessor
阿新 • Published: 2019-12-31
Preface
This article takes a look at nacos's TcpSuperSenseProcessor.
TcpSuperSenseProcessor
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.java
@Component
public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable {

    @Autowired
    private HealthCheckCommon healthCheckCommon;

    @Autowired
    private SwitchDomain switchDomain;

    public static final int CONNECT_TIMEOUT_MS = 500;

    private Map<String, BeatKey> keyMap = new ConcurrentHashMap<>();

    private BlockingQueue<Beat> taskQueue = new LinkedBlockingQueue<Beat>();

    /**
     * this value has been carefully tuned, do not modify unless you're confident
     */
    private static final int NIO_THREAD_COUNT = Runtime.getRuntime().availableProcessors() <= 1 ?
        1 : Runtime.getRuntime().availableProcessors() / 2;

    /**
     * because some hosts doesn't support keep-alive connections, disabled temporarily
     */
    private static final long TCP_KEEP_ALIVE_MILLIS = 0;

    private static ScheduledExecutorService TCP_CHECK_EXECUTOR
        = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("nacos.naming.tcp.check.worker");
                t.setDaemon(true);
                return t;
            }
        });

    private static ScheduledExecutorService NIO_EXECUTOR
        = Executors.newScheduledThreadPool(NIO_THREAD_COUNT, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("nacos.supersense.checker");
                return thread;
            }
        }
    );

    private Selector selector;

    public TcpSuperSenseProcessor() {
        try {
            selector = Selector.open();
            TCP_CHECK_EXECUTOR.submit(this);
        } catch (Exception e) {
            throw new IllegalStateException("Error while initializing SuperSense(TM).");
        }
    }

    @Override
    public void process(HealthCheckTask task) {
        List<Instance> ips = task.getCluster().allIPs(false);
        if (CollectionUtils.isEmpty(ips)) {
            return;
        }
        for (Instance ip : ips) {
            if (ip.isMarked()) {
                if (SRV_LOG.isDebugEnabled()) {
                    SRV_LOG.debug("tcp check, ip is marked as to skip health check, ip:" + ip.getIp());
                }
                continue;
            }
            if (!ip.markChecking()) {
                SRV_LOG.warn("tcp check started before last one finished, service: "
                    + task.getCluster().getService().getName() + ":"
                    + task.getCluster().getName() + ":"
                    + ip.getIp() + ":"
                    + ip.getPort());
                healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2, task, switchDomain.getTcpHealthParams());
                continue;
            }
            Beat beat = new Beat(ip, task);
            taskQueue.add(beat);
            MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
        }
    }

    private void processTask() throws Exception {
        Collection<Callable<Void>> tasks = new LinkedList<>();
        do {
            Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
            if (beat == null) {
                return;
            }
            tasks.add(new TaskProcessor(beat));
        } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);
        for (Future<?> f : NIO_EXECUTOR.invokeAll(tasks)) {
            f.get();
        }
    }

    @Override
    public void run() {
        while (true) {
            try {
                processTask();
                int readyCount = selector.selectNow();
                if (readyCount <= 0) {
                    continue;
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    NIO_EXECUTOR.execute(new PostProcessor(key));
                }
            } catch (Throwable e) {
                SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);
            }
        }
    }

    //......

    @Override
    public String getType() {
        return "TCP";
    }
}
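- TcpSuperSenseProcessor implements the HealthCheckProcessor and Runnable interfaces.
- Its process method iterates over the cluster's instances. A marked instance is skipped. If markChecking() returns false, the previous check has not finished, so it logs a warning and calls healthCheckCommon.reEvaluateCheckRT with twice the normalized check RT. Otherwise it creates a Beat and adds it to taskQueue.
- Its constructor opens a Selector and submits the processor itself to TCP_CHECK_EXECUTOR. The run method loops forever: it calls processTask, then calls selector.selectNow() and hands each ready key to NIO_EXECUTOR wrapped in a PostProcessor. processTask polls Beats from taskQueue, wraps each one in a TaskProcessor, and keeps collecting until the queue is empty or the batch reaches NIO_THREAD_COUNT * 64. It then runs the batch with NIO_EXECUTOR.invokeAll(tasks) and waits for every task to finish.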
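The batching pattern in processTask can be tried in isolation. The following is a minimal sketch rather than nacos code: the class BatchDrainSketch, the MAX_BATCH cap of 4, and the string payloads are all made up for illustration, standing in for Beat, TaskProcessor and NIO_THREAD_COUNT * 64. Unlike the original, it returns whatever has been collected when the poll times out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class BatchDrainSketch {
    // Hypothetical cap; the nacos listing uses NIO_THREAD_COUNT * 64.
    static final int MAX_BATCH = 4;

    /** Collects queued items into tasks until the queue is empty or the cap is reached. */
    static List<Callable<String>> collectBatch(BlockingQueue<String> queue)
            throws InterruptedException {
        List<Callable<String>> tasks = new ArrayList<>();
        do {
            // Wait briefly for an item, like CONNECT_TIMEOUT_MS / 2 in the listing.
            String item = queue.poll(250, TimeUnit.MILLISECONDS);
            if (item == null) {
                return tasks;
            }
            tasks.add(() -> "checked:" + item);
        } while (!queue.isEmpty() && tasks.size() < MAX_BATCH);
        return tasks;
    }

    /** Runs one batch and blocks until every task in it has completed. */
    static List<String> runBatch(BlockingQueue<String> queue) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<String> results = new ArrayList<>();
            // invokeAll returns the futures in the same order as the submitted tasks.
            for (Future<String> f : pool.invokeAll(collectBatch(queue))) {
                results.add(f.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

Because invokeAll returns only after every task has completed, the calling thread is blocked for the duration of each batch. In the listing, that is the same thread that polls the selector in run.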
PostProcessor
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.java
public class PostProcessor implements Runnable {
    SelectionKey key;

    public PostProcessor(SelectionKey key) {
        this.key = key;
    }

    @Override
    public void run() {
        Beat beat = (Beat) key.attachment();
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            if (!beat.isHealthy()) {
                //invalid beat means this server is no longer responsible for the current service
                key.cancel();
                key.channel().close();
                beat.finishCheck();
                return;
            }
            if (key.isValid() && key.isConnectable()) {
                //connected
                channel.finishConnect();
                beat.finishCheck(true, false, System.currentTimeMillis() - beat.getTask().getStartTime(), "tcp:ok+");
            }
            if (key.isValid() && key.isReadable()) {
                //disconnected
                ByteBuffer buffer = ByteBuffer.allocate(128);
                if (channel.read(buffer) == -1) {
                    key.cancel();
                    key.channel().close();
                } else {
                    // not terminate request, ignore
                }
            }
        } catch (ConnectException e) {
            // unable to connect, possibly port not opened
            beat.finishCheck(false, true, switchDomain.getTcpHealthParams().getMax(), "tcp:unable2connect:" + e.getMessage());
        } catch (Exception e) {
            beat.finishCheck(false, "tcp:error:" + e.getMessage());
            try {
                key.cancel();
                key.channel().close();
            } catch (Exception ignore) {
            }
        }
    }
}
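- PostProcessor implements the Runnable interface. Its run method reads the Beat attached to the key and reports the result through beat.finishCheck. If the beat is no longer valid, it cancels the key and closes the channel. If the key is connectable, it calls channel.finishConnect() and reports success with "tcp:ok+". If the key is readable and read returns -1, the peer has disconnected, so it cancels the key and closes the channel. A ConnectException is reported as "tcp:unable2connect", and any other exception as "tcp:error".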
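The connect handling above can be reproduced with plain java.nio. The sketch below is an illustration under stated assumptions, not the nacos implementation: ConnectProbeSketch and probe are hypothetical names, it uses one short-lived Selector per probe instead of a shared one, and it returns a status string instead of calling beat.finishCheck.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class ConnectProbeSketch {
    /** Probes host:port with a non-blocking connect and reports a status string. */
    static String probe(String host, int port, long timeoutMs) throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            try {
                // A non-blocking connect may complete immediately, e.g. on loopback.
                if (channel.connect(new InetSocketAddress(host, port))) {
                    return "tcp:ok";
                }
                channel.register(selector, SelectionKey.OP_CONNECT);
                if (selector.select(timeoutMs) == 0) {
                    return "tcp:timeout";
                }
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isValid() && key.isConnectable()) {
                        // Completes the handshake; throws ConnectException if refused.
                        return channel.finishConnect() ? "tcp:ok" : "tcp:error";
                    }
                }
                return "tcp:error";
            } catch (ConnectException e) {
                // Same case PostProcessor reports as "tcp:unable2connect".
                return "tcp:unable2connect";
            }
        }
    }
}
```

A call such as ConnectProbeSketch.probe("127.0.0.1", 8848, 500) would probe a local port; the port number here is only an example value.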
TaskProcessor
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.java
private class TaskProcessor implements Callable<Void> {

    private static final int MAX_WAIT_TIME_MILLISECONDS = 500;
    Beat beat;

    public TaskProcessor(Beat beat) {
        this.beat = beat;
    }

    @Override
    public Void call() {
        long waited = System.currentTimeMillis() - beat.getStartTime();
        if (waited > MAX_WAIT_TIME_MILLISECONDS) {
            Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");
        }
        SocketChannel channel = null;
        try {
            Instance instance = beat.getIp();
            Cluster cluster = beat.getTask().getCluster();
            BeatKey beatKey = keyMap.get(beat.toString());
            if (beatKey != null && beatKey.key.isValid()) {
                if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {
                    instance.setBeingChecked(false);
                    return null;
                }
                beatKey.key.cancel();
                beatKey.key.channel().close();
            }
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            // only by setting this can we make the socket close event asynchronous
            channel.socket().setSoLinger(false, -1);
            channel.socket().setReuseAddress(true);
            channel.socket().setKeepAlive(true);
            channel.socket().setTcpNoDelay(true);
            int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();
            channel.connect(new InetSocketAddress(instance.getIp(), port));
            SelectionKey key
                = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
            key.attach(beat);
            keyMap.put(beat.toString(), new BeatKey(key));
            beat.setStartTime(System.currentTimeMillis());
            NIO_EXECUTOR.schedule(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            beat.finishCheck(false, "tcp:error:" + e.getMessage());
            if (channel != null) {
                try {
                    channel.close();
                } catch (Exception ignore) {
                }
            }
        }
        return null;
    }
}
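- TaskProcessor implements the Callable<Void> interface. Its call method starts the TCP probe for the target instance: it closes any previous connection recorded in keyMap, opens a non-blocking SocketChannel, connects to the instance, registers the channel with the selector for OP_CONNECT | OP_READ with the Beat as attachment, and schedules a TimeOutTask on NIO_EXECUTOR to run after CONNECT_TIMEOUT_MS.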
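One detail in call is easy to miss: TCP_KEEP_ALIVE_MILLIS is 0 in the listing, so the branch that would reuse an existing connection is effectively disabled, as the source comment says. A minimal sketch of that comparison follows; KeepAliveDecisionSketch and canReuse are hypothetical names, and the timestamps are passed in explicitly instead of being read from System.currentTimeMillis().

```java
class KeepAliveDecisionSketch {
    /**
     * Mirrors the condition "now - birthTime < TCP_KEEP_ALIVE_MILLIS" from call():
     * true means the existing connection is young enough to be reused.
     */
    static boolean canReuse(long birthTimeMs, long nowMs, long keepAliveMs) {
        return nowMs - birthTimeMs < keepAliveMs;
    }
}
```

With keepAliveMs set to 0 the result is false for any connection whose age is zero or more, so every check cancels the old key, closes the old channel and opens a new connection.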
TimeOutTask
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.java
private static class TimeOutTask implements Runnable {
    SelectionKey key;

    public TimeOutTask(SelectionKey key) {
        this.key = key;
    }

    @Override
    public void run() {
        if (key != null && key.isValid()) {
            SocketChannel channel = (SocketChannel) key.channel();
            Beat beat = (Beat) key.attachment();
            if (channel.isConnected()) {
                return;
            }
            try {
                channel.finishConnect();
            } catch (Exception ignore) {
            }
            try {
                beat.finishCheck(false, beat.getTask().getCheckRTNormalized() * 2, "tcp:timeout");
                key.cancel();
                key.channel().close();
            } catch (Exception ignore) {
            }
        }
    }
}
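- TimeOutTask implements the Runnable interface. Its run method does nothing if the key is no longer valid or the channel is already connected. Otherwise it calls channel.finishConnect(), ignoring any exception, then calls beat.finishCheck with success set to false and the message "tcp:timeout", cancels the key and closes the channel.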
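The idea of a delayed task that fails the check only if it has not completed yet can be sketched without any sockets. The example below is an assumption-laden illustration: TimeoutTaskSketch and checkWithTimeout are hypothetical names, the connect is simulated by a second scheduled task, and an AtomicReference with compareAndSet replaces the channel.isConnected() test used by nacos.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class TimeoutTaskSketch {
    /** Races a simulated connect (done after workMs) against a timeout task. */
    static String checkWithTimeout(long workMs, long timeoutMs) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        AtomicReference<String> result = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            // Timeout task: records a failure only if no result has been set yet.
            executor.schedule(() -> {
                if (result.compareAndSet(null, "tcp:timeout")) {
                    done.countDown();
                }
            }, timeoutMs, TimeUnit.MILLISECONDS);
            // Simulated connect completion, standing in for PostProcessor.
            executor.schedule(() -> {
                if (result.compareAndSet(null, "tcp:ok")) {
                    done.countDown();
                }
            }, workMs, TimeUnit.MILLISECONDS);
            done.await();
            return result.get();
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Whichever task runs first sets the result, and the other one becomes a no-op. This matches the early return in TimeOutTask when the channel is already connected.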
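Summary
- TcpSuperSenseProcessor implements the HealthCheckProcessor and Runnable interfaces.
- Its process method iterates over the cluster's instances. A marked instance is skipped. If markChecking() returns false, the previous check has not finished, so it logs a warning and calls healthCheckCommon.reEvaluateCheckRT with twice the normalized check RT. Otherwise it creates a Beat and adds it to taskQueue.
- Its constructor opens a Selector and submits the processor itself to TCP_CHECK_EXECUTOR. The run method loops forever: it calls processTask, then calls selector.selectNow() and hands each ready key to NIO_EXECUTOR wrapped in a PostProcessor. processTask polls Beats from taskQueue, wraps each one in a TaskProcessor, and keeps collecting until the queue is empty or the batch reaches NIO_THREAD_COUNT * 64. It then runs the batch with NIO_EXECUTOR.invokeAll(tasks) and waits for every task to finish.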
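The three-way decision that process makes for each instance can be sketched as follows. This is a simplified model, not nacos code: CheckGuardSketch, Target and Decision are hypothetical names, and the AtomicBoolean behind markChecking is an assumption about how such a guard could be built, since the article does not show how Instance implements it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class CheckGuardSketch {
    enum Decision { SKIP_MARKED, STILL_RUNNING, ENQUEUE }

    /** Hypothetical stand-in for Instance, with only the state process() looks at. */
    static class Target {
        final boolean marked;
        // Assumption: an atomic flag guards against overlapping checks.
        final AtomicBoolean beingChecked = new AtomicBoolean(false);

        Target(boolean marked) {
            this.marked = marked;
        }

        /** Returns true only for the caller that flips the flag from false to true. */
        boolean markChecking() {
            return beingChecked.compareAndSet(false, true);
        }

        void finishCheck() {
            beingChecked.set(false);
        }
    }

    /** Same branch order as process(): marked first, then the in-flight guard. */
    static Decision decide(Target target) {
        if (target.marked) {
            return Decision.SKIP_MARKED;
        }
        if (!target.markChecking()) {
            return Decision.STILL_RUNNING;
        }
        return Decision.ENQUEUE;
    }
}
```

In the listing, the STILL_RUNNING case is where reEvaluateCheckRT is called with a doubled RT, and the ENQUEUE case is where a Beat is added to taskQueue.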