
A Look at nacos's MysqlHealthCheckProcessor

This article takes a look at nacos's MysqlHealthCheckProcessor.

MysqlHealthCheckProcessor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/MysqlHealthCheckProcessor.java

@Component
public class MysqlHealthCheckProcessor implements HealthCheckProcessor {

    @Autowired
    private HealthCheckCommon healthCheckCommon;

    @Autowired
    private SwitchDomain switchDomain;

    public static final int CONNECT_TIMEOUT_MS = 500;

    private static final String CHECK_MYSQL_MASTER_SQL = "show global variables where variable_name='read_only'";

    private static final String MYSQL_SLAVE_READONLY = "ON";

    private static ConcurrentMap<String,Connection> CONNECTION_POOL
        = new ConcurrentHashMap<String,Connection>();

    private static ExecutorService EXECUTOR;

    static {

        int processorCount = Runtime.getRuntime().availableProcessors();
        EXECUTOR
            = Executors.newFixedThreadPool(processorCount <= 1 ? 1 : processorCount / 2,
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.nacos.mysql.checker");
                    return thread;
                }
            }
        );
    }

    public MysqlHealthCheckProcessor() {
    }

    @Override
    public String getType() {
        return "MYSQL";
    }

    @Override
    public void process(HealthCheckTask task) {
        List<Instance> ips = task.getCluster().allIPs(false);

        SRV_LOG.debug("mysql check,ips:" + ips);
        if (CollectionUtils.isEmpty(ips)) {
            return;
        }

        for (Instance ip : ips) {
            try {

                if (ip.isMarked()) {
                    if (SRV_LOG.isDebugEnabled()) {
                        SRV_LOG.debug("mysql check,ip is marked as to skip health check,ip: {}",ip.getIp());
                    }
                    continue;
                }

                if (!ip.markChecking()) {
                    SRV_LOG.warn("mysql check started before last one finished,service: {}:{}:{}",
                        task.getCluster().getService().getName(),task.getCluster().getName(),ip.getIp());

                    healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2,task,switchDomain.getMysqlHealthParams());
                    continue;
                }

                EXECUTOR.execute(new MysqlCheckTask(ip,task));
                MetricsMonitor.getMysqlHealthCheckMonitor().incrementAndGet();
            } catch (Exception e) {
                ip.setCheckRT(switchDomain.getMysqlHealthParams().getMax());
                healthCheckCommon.checkFail(ip,"mysql:error:" + e.getMessage());
                healthCheckCommon.reEvaluateCheckRT(switchDomain.getMysqlHealthParams().getMax(),switchDomain.getMysqlHealthParams());
            }
        }
    }

    //......
}
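  • MysqlHealthCheckProcessor implements the HealthCheckProcessor interface, and its getType method returns MYSQL. Its process method iterates over the cluster's instances: an instance that is marked is skipped outright; an instance for which markChecking() returns false (the previous check has not finished yet) triggers healthCheckCommon.reEvaluateCheckRT and is then skipped; for every remaining instance a MysqlCheckTask is created and submitted to EXECUTOR.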
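
The three branches of process can be illustrated with a small standalone sketch. Everything here is hypothetical: Node stands in for the nacos Instance class, and backing markChecking() with an AtomicBoolean compare-and-set is an assumption about how such a guard is typically built, not a statement about the nacos implementation. The sketch only records which branch each instance takes; it does not run any real check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class CheckDispatchSketch {

    /** Hypothetical stand-in for a nacos Instance; not the real class. */
    static class Node {
        final String ip;
        final boolean marked; // true means "skip health checks for this node"
        final AtomicBoolean checking = new AtomicBoolean(false);

        Node(String ip, boolean marked) {
            this.ip = ip;
            this.marked = marked;
        }

        /** Returns true only for the caller that flips the flag from false to true. */
        boolean markChecking() {
            return checking.compareAndSet(false, true);
        }
    }

    /** Mirrors the three branches of process(): skip, back off, or submit. */
    static List<String> dispatch(List<Node> nodes) {
        List<String> decisions = new ArrayList<>();
        for (Node node : nodes) {
            if (node.marked) {
                decisions.add(node.ip + ":skipped");
                continue;
            }
            if (!node.markChecking()) {
                // A previous check is still in flight; the quoted code
                // re-evaluates the check RT with a doubled value here.
                decisions.add(node.ip + ":still-checking");
                continue;
            }
            // The quoted code calls EXECUTOR.execute(new MysqlCheckTask(ip,task)) here.
            decisions.add(node.ip + ":submitted");
        }
        return decisions;
    }

    public static void main(String[] args) {
        List<Node> nodes = new ArrayList<>();
        nodes.add(new Node("10.0.0.1", true));
        nodes.add(new Node("10.0.0.2", false));
        System.out.println(dispatch(nodes)); // first pass: second node is submitted
        System.out.println(dispatch(nodes)); // second pass: its flag was never reset
    }
}
```

In this sketch nothing ever resets the checking flag, so the second pass reports the unmarked node as still being checked. In a real checker the flag would have to be cleared once the check completes.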

MysqlCheckTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/MysqlHealthCheckProcessor.java

    private class MysqlCheckTask implements Runnable {
        private Instance ip;
        private HealthCheckTask task;
        private long startTime = System.currentTimeMillis();

        public MysqlCheckTask(Instance ip,HealthCheckTask task) {
            this.ip = ip;
            this.task = task;
        }

        @Override
        public void run() {

            Statement statement = null;
            ResultSet resultSet = null;

            try {

                Cluster cluster = task.getCluster();
                String key = cluster.getService().getName() + ":" + cluster.getName() + ":" + ip.getIp() + ":" + ip.getPort();
                Connection connection = CONNECTION_POOL.get(key);
                AbstractHealthChecker.Mysql config = (AbstractHealthChecker.Mysql) cluster.getHealthChecker();

                if (connection == null || connection.isClosed()) {
                    MysqlDataSource dataSource = new MysqlDataSource();
                    dataSource.setConnectTimeout(CONNECT_TIMEOUT_MS);
                    dataSource.setSocketTimeout(CONNECT_TIMEOUT_MS);
                    dataSource.setUser(config.getUser());
                    dataSource.setPassword(config.getPwd());
                    dataSource.setLoginTimeout(1);

                    dataSource.setServerName(ip.getIp());
                    dataSource.setPort(ip.getPort());

                    connection = dataSource.getConnection();
                    CONNECTION_POOL.put(key,connection);
                }

                statement = connection.createStatement();
                statement.setQueryTimeout(1);

                resultSet = statement.executeQuery(config.getCmd());
                int resultColumnIndex = 2;

                if (CHECK_MYSQL_MASTER_SQL.equals(config.getCmd())) {
                    resultSet.next();
                    if (MYSQL_SLAVE_READONLY.equals(resultSet.getString(resultColumnIndex))) {
                        throw new IllegalStateException("current node is slave!");
                    }
                }

                healthCheckCommon.checkOK(ip,"mysql:+ok");
                healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime,switchDomain.getMysqlHealthParams());
            } catch (SQLException e) {
                // fail immediately
                healthCheckCommon.checkFailNow(ip,"mysql:" + e.getMessage());
                healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(),switchDomain.getMysqlHealthParams());
            } catch (Throwable t) {
                Throwable cause = t;
                int maxStackDepth = 50;
                for (int deepth = 0; deepth < maxStackDepth && cause != null; deepth++) {
                    if (cause instanceof SocketTimeoutException
                            || cause instanceof ConnectTimeoutException
                            || cause instanceof TimeoutException
                            || cause.getCause() instanceof TimeoutException) {

                        healthCheckCommon.checkFail(ip,"mysql:timeout:" + cause.getMessage());
                        healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2,switchDomain.getMysqlHealthParams());
                        return;
                    }

                    cause = cause.getCause();
                }

                // connection error,probably not reachable
                healthCheckCommon.checkFail(ip,"mysql:error:" + t.getMessage());
                healthCheckCommon.reEvaluateCheckRT(switchDomain.getMysqlHealthParams().getMax(),switchDomain.getMysqlHealthParams());
            } finally {
                ip.setCheckRT(System.currentTimeMillis() - startTime);
                if (statement != null) {
                    try {
                        statement.close();
                    } catch (SQLException e) {
                        Loggers.SRV_LOG.error("[MYSQL-CHECK] failed to close statement:" + statement,e);
                    }
                }
                if (resultSet != null) {
                    try {
                        resultSet.close();
                    } catch (SQLException e) {
                        Loggers.SRV_LOG.error("[MYSQL-CHECK] failed to close resultSet:" + resultSet,e);
                    }
                }
            }
        }
    }
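  • MysqlCheckTask implements the Runnable interface. Its run method looks up the connection for the given instance in CONNECTION_POOL; if the connection is null or closed, it creates a new MysqlDataSource, calls getConnection(), and puts the result into CONNECTION_POOL. With a connection in hand, it runs the query specified by config.getCmd() and then, depending on the result or on the exception thrown, calls healthCheckCommon.checkOK, healthCheckCommon.checkFailNow, or healthCheckCommon.checkFail.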
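
The timeout branch in the catch (Throwable t) block walks the exception's cause chain, bounded by maxStackDepth, to decide whether a failure counts as a timeout. Below is a minimal standalone sketch of that idea. The class and method names are hypothetical, and the ConnectTimeoutException check from the quoted code is left out because that type is not part of the JDK.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeoutException;

class TimeoutCauseSketch {

    // Same bound as maxStackDepth in the quoted code.
    static final int MAX_DEPTH = 50;

    /** Returns true if a timeout exception appears within MAX_DEPTH levels of the cause chain. */
    static boolean isTimeout(Throwable t) {
        Throwable cause = t;
        for (int depth = 0; depth < MAX_DEPTH && cause != null; depth++) {
            if (cause instanceof SocketTimeoutException || cause instanceof TimeoutException) {
                return true;
            }
            cause = cause.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("query failed",
            new SocketTimeoutException("Read timed out"));
        System.out.println(isTimeout(wrapped)); // true
        System.out.println(isTimeout(new IllegalStateException("current node is slave!"))); // false
    }
}
```

Note that in the quoted run method a SQLException is caught first and fails the instance immediately via checkFailNow, so the cause-chain walk only applies to non-SQL exceptions such as the IllegalStateException thrown for a read-only node.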

Summary

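MysqlHealthCheckProcessor implements the HealthCheckProcessor interface, and its getType method returns MYSQL. Its process method iterates over the cluster's instances: a marked instance is skipped outright; an instance for which markChecking() returns false triggers healthCheckCommon.reEvaluateCheckRT and is then skipped; for every remaining instance a MysqlCheckTask is created and submitted to EXECUTOR.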
