1. 程式人生 > 程式設計 >Dubbo原始碼解析(十二)遠端通訊——Telnet

Dubbo原始碼解析(十二)遠端通訊——Telnet

遠端通訊——Telnet

目標:介紹telnet的相關實現邏輯、介紹dubbo-remoting-api中的telnet包內的原始碼解析。

前言

從dubbo 2.0.5開始,dubbo開始支援通過 telnet 命令來進行服務治理。本文就是講解一些公用的telnet命令的實現。下面來看一下telnet實現的類圖:

telnet類圖

可以看到,實現了TelnetHandler介面的有六個類,除了TelnetHandlerAdapter是以外,其他五個分別對應了clear、exit、help、log、status命令的實現,具體用來幹嘛,請看官方檔案的介紹。

原始碼分析

(一)TelnetHandler

@SPI
public interface TelnetHandler { /** * telnet. * 處理對應的telnet命令 * @param channel * @param message telnet命令 */ String telnet(Channel channel,String message) throws RemotingException; } 複製程式碼

該介面上telnet命令處理器介面,是一個可擴充套件介面。它定義了一個方法,就是處理相關的telnet命令。

(二)TelnetHandlerAdapter

該類繼承了ChannelHandlerAdapter,實現了TelnetHandler介面,是TelnetHandler的介面卡類,負責在接收到HeaderExchangeHandler發來的telnet命令後分發給對應的TelnetHandler實現類去實現,並且返回命令結果。

public class TelnetHandlerAdapter extends ChannelHandlerAdapter implements TelnetHandler {

    /**
     * 擴充套件載入器
     */
    private final ExtensionLoader<TelnetHandler> extensionLoader = ExtensionLoader.getExtensionLoader(TelnetHandler.class);

    @Override
public String telnet(Channel channel,String message) throws RemotingException { // 獲得提示鍵配置,用於nc獲取資訊時不顯示提示符 String prompt = channel.getUrl().getParameterAndDecoded(Constants.PROMPT_KEY,Constants.DEFAULT_PROMPT); boolean noprompt = message.contains("--no-prompt"); message = message.replace("--no-prompt",""); StringBuilder buf = new StringBuilder(); // 刪除頭尾空白符的字串 message = message.trim(); String command; // 獲得命令 if (message.length() > 0) { int i = message.indexOf(' '); if (i > 0) { // 獲得命令 command = message.substring(0,i).trim(); // 獲得引數 message = message.substring(i + 1).trim(); } else { command = message; message = ""; } } else { command = ""; } if (command.length() > 0) { // 如果有該命令的擴充套件實現類 if (extensionLoader.hasExtension(command)) { try { // 執行相應命令的實現類的telnet String result = extensionLoader.getExtension(command).telnet(channel,message); if (result == null) { return null; } // 返回結果 buf.append(result); } catch (Throwable t) { buf.append(t.getMessage()); } } else { buf.append("Unsupported command: "); buf.append(command); } } if (buf.length() > 0) { buf.append("\r\n"); } // 新增 telnet 提示語 if (prompt != null && prompt.length() > 0 && !noprompt) { buf.append(prompt); } return buf.toString(); } } 複製程式碼

該類只實現了telnet方法,其中的邏輯還是比較清晰,就是根據對應的命令去讓對應的實現類產生命令結果。

(三)ClearTelnetHandler

該類實現了TelnetHandler介面,封裝了clear命令的實現。

@Activate
@Help(parameter = "[lines]",summary = "Clear screen.",detail = "Clear screen.")
public class ClearTelnetHandler implements TelnetHandler {

    @Override
    public String telnet(Channel channel,String message) {
        // 清除螢幕上的內容行數
        int lines = 100;
        if (message.length() > 0) {
            // 如果不是一個數字
            if (!StringUtils.isInteger(message)) {
                return "Illegal lines " + message + ",must be integer.";
            }
            lines = Integer.parseInt(message);
        }
        StringBuilder buf = new StringBuilder();
        // 一行一行清除
        for (int i = 0; i < lines; i++) {
            buf.append("\r\n");
        }
        return buf.toString();
    }

}
複製程式碼

(四)ExitTelnetHandler

該類實現了TelnetHandler介面,封裝了exit命令的實現。

@Activate
@Help(parameter = "",summary = "Exit the telnet.",detail = "Exit the telnet.")
public class ExitTelnetHandler implements TelnetHandler {

    @Override
    public String telnet(Channel channel,String message) {
        // 關閉通道
        channel.close();
        return null;
    }

}
複製程式碼

(五)HelpTelnetHandler

該類實現了TelnetHandler介面,封裝了help命令的實現。

@Activate
@Help(parameter = "[command]",summary = "Show help.",detail = "Show help.")
public class HelpTelnetHandler implements TelnetHandler {

    /**
     * 擴充套件載入器
     */
    private final ExtensionLoader<TelnetHandler> extensionLoader = ExtensionLoader.getExtensionLoader(TelnetHandler.class);

    @Override
    public String telnet(Channel channel,String message) {
        // 如果需要檢視某一個命令的幫助
        if (message.length() > 0) {
            if (!extensionLoader.hasExtension(message)) {
                return "No such command " + message;
            }
            // 獲得對應的擴充套件實現類
            TelnetHandler handler = extensionLoader.getExtension(message);
            Help help = handler.getClass().getAnnotation(Help.class);
            StringBuilder buf = new StringBuilder();
            // 生成命令和幫助資訊
            buf.append("Command:\r\n    ");
            buf.append(message + " " + help.parameter().replace("\r\n"," ").replace("\n"," "));
            buf.append("\r\nSummary:\r\n    ");
            buf.append(help.summary().replace("\r\n"," "));
            buf.append("\r\nDetail:\r\n    ");
            buf.append(help.detail().replace("\r\n","    \r\n").replace("\n","    \n"));
            return buf.toString();
            // 如果檢視所有命令的幫助
        } else {
            List<List<String>> table = new ArrayList<List<String>>();
            // 獲得所有命令的提示資訊
            List<TelnetHandler> handlers = extensionLoader.getActivateExtension(channel.getUrl(),"telnet");
            if (handlers != null && !handlers.isEmpty()) {
                for (TelnetHandler handler : handlers) {
                    Help help = handler.getClass().getAnnotation(Help.class);
                    List<String> row = new ArrayList<String>();
                    String parameter = " " + extensionLoader.getExtensionName(handler) + " " + (help != null ? help.parameter().replace("\r\n"," ") : "");
                    row.add(parameter.length() > 50 ? parameter.substring(0,50) + "..." : parameter);
                    String summary = help != null ? help.summary().replace("\r\n"," ") : "";
                    row.add(summary.length() > 50 ? summary.substring(0,50) + "..." : summary);
                    table.add(row);
                }
            }
            return "Please input \"help [command]\" show detail.\r\n" + TelnetUtils.toList(table);
        }
    }

}
複製程式碼

help分為了需要檢視某一個命令的幫助還是檢視全部命令的幫助。

(六)LogTelnetHandler

該類實現了TelnetHandler介面,封裝了log命令的實現。

@Activate
@Help(parameter = "level",summary = "Change log level or show log ",detail = "Change log level or show log")
public class LogTelnetHandler implements TelnetHandler {

    public static final String SERVICE_KEY = "telnet.log";

    @Override
    public String telnet(Channel channel,String message) {
        long size = 0;
        File file = LoggerFactory.getFile();
        StringBuffer buf = new StringBuffer();
        if (message == null || message.trim().length() == 0) {
            buf.append("EXAMPLE: log error / log 100");
        } else {
            String str[] = message.split(" ");
            if (!StringUtils.isInteger(str[0])) {
                // 設定日誌級別
                LoggerFactory.setLevel(Level.valueOf(message.toUpperCase()));
            } else {
                // 獲得日誌長度
                int SHOW_LOG_LENGTH = Integer.parseInt(str[0]);

                if (file != null && file.exists()) {
                    try {
                        FileInputStream fis = new FileInputStream(file);
                        try {
                            FileChannel filechannel = fis.getChannel();
                            try {
                                size = filechannel.size();
                                ByteBuffer bb;
                                if (size <= SHOW_LOG_LENGTH) {
                                    // 分配緩衝區
                                    bb = ByteBuffer.allocate((int) size);
                                    // 讀日誌資料
                                    filechannel.read(bb,0);
                                } else {
                                    int pos = (int) (size - SHOW_LOG_LENGTH);
                                    // 分配緩衝區
                                    bb = ByteBuffer.allocate(SHOW_LOG_LENGTH);
                                    // 讀取日誌資料
                                    filechannel.read(bb,pos);
                                }
                                bb.flip();
                                String content = new String(bb.array()).replace("<","&lt;")
                                        .replace(">","&gt;").replace("\n","<br/><br/>");
                                buf.append("\r\ncontent:" + content);

                                buf.append("\r\nmodified:" + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                        .format(new Date(file.lastModified()))));
                                buf.append("\r\nsize:" + size + "\r\n");
                            } finally {
                                filechannel.close();
                            }
                        } finally {
                            fis.close();
                        }
                    } catch (Exception e) {
                        buf.append(e.getMessage());
                    }
                } else {
                    size = 0;
                    buf.append("\r\nMESSAGE: log file not exists or log appender is console .");
                }
            }
        }
        buf.append("\r\nCURRENT LOG LEVEL:" + LoggerFactory.getLevel())
                .append("\r\nCURRENT LOG APPENDER:" + (file == null ? "console" : file.getAbsolutePath()));
        return buf.toString();
    }

}
複製程式碼

log命令實現原理就是從日誌檔案中把日誌資訊讀取出來。

(七)StatusTelnetHandler

該類實現了TelnetHandler介面,封裝了status命令的實現。

@Activate
@Help(parameter = "[-l]",summary = "Show status.",detail = "Show status.")
public class StatusTelnetHandler implements TelnetHandler {

    private final ExtensionLoader<StatusChecker> extensionLoader = ExtensionLoader.getExtensionLoader(StatusChecker.class);

    @Override
    public String telnet(Channel channel,String message) {
        // 顯示狀態列表
        if (message.equals("-l")) {
            List<StatusChecker> checkers = extensionLoader.getActivateExtension(channel.getUrl(),"status");
            String[] header = new String[]{"resource","status","message"};
            List<List<String>> table = new ArrayList<List<String>>();
            Map<String,Status> statuses = new HashMap<String,Status>();
            if (checkers != null && !checkers.isEmpty()) {
                // 遍歷各個資源的狀態,如果一個當全部 OK 時則顯示 OK,只要有一個 ERROR 則顯示 ERROR,只要有一個 WARN 則顯示 WARN
                for (StatusChecker checker : checkers) {
                    String name = extensionLoader.getExtensionName(checker);
                    Status stat;
                    try {
                        stat = checker.check();
                    } catch (Throwable t) {
                        stat = new Status(Status.Level.ERROR,t.getMessage());
                    }
                    statuses.put(name,stat);
                    if (stat.getLevel() != null && stat.getLevel() != Status.Level.UNKNOWN) {
                        List<String> row = new ArrayList<String>();
                        row.add(name);
                        row.add(String.valueOf(stat.getLevel()));
                        row.add(stat.getMessage() == null ? "" : stat.getMessage());
                        table.add(row);
                    }
                }
            }
            Status stat = StatusUtils.getSummaryStatus(statuses);
            List<String> row = new ArrayList<String>();
            row.add("summary");
            row.add(String.valueOf(stat.getLevel()));
            row.add(stat.getMessage());
            table.add(row);
            return TelnetUtils.toTable(header,table);
        } else if (message.length() > 0) {
            return "Unsupported parameter " + message + " for status.";
        }
        String status = channel.getUrl().getParameter("status");
        Map<String,Status>();
        if (status != null && status.length() > 0) {
            String[] ss = Constants.COMMA_SPLIT_PATTERN.split(status);
            for (String s : ss) {
                StatusChecker handler = extensionLoader.getExtension(s);
                Status stat;
                try {
                    stat = handler.check();
                } catch (Throwable t) {
                    stat = new Status(Status.Level.ERROR,t.getMessage());
                }
                statuses.put(s,stat);
            }
        }
        Status stat = StatusUtils.getSummaryStatus(statuses);
        return String.valueOf(stat.getLevel());
    }

}
複製程式碼

(八)Help

該介面是幫助檔案介面

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Help {

    String parameter() default "";

    String summary();

    String detail() default "";

}
複製程式碼

可以看上在每個命令的實現類上都加上了@Help註解,為了新增一些幫助文案。

(九)TelnetUtils

該類是Telnet命令的工具類,其中邏輯我就不介紹了。

(十)TelnetCodec

該類繼承了TransportCodec,是telnet的編解碼類。

1.屬性

private static final Logger logger = LoggerFactory.getLogger(TelnetCodec.class);

/**
 * 歷史命令列表
 */
private static final String HISTORY_LIST_KEY = "telnet.history.list";

/**
 * 歷史命令位置,就是用上下鍵來找歷史命令
 */
private static final String HISTORY_INDEX_KEY = "telnet.history.index";

/**
 * 向上鍵
 */
private static final byte[] UP = new byte[]{27,91,65};

/**
 * 向下鍵
 */
private static final byte[] DOWN = new byte[]{27,66};

/**
 * 回車
 */
private static final List<?> ENTER = Arrays.asList(new Object[]{new byte[]{'\r','\n'} /* Windows Enter */,new byte[]{'\n'} /* Linux Enter */});

/**
 * 退出
 */
private static final List<?> EXIT = Arrays.asList(new Object[]{new byte[]{3} /* Windows Ctrl+C */,new byte[]{-1,-12,-1,-3,6} /* Linux Ctrl+C */,-19,6} /* Linux Pause */});
複製程式碼

2.getCharset

private static Charset getCharset(Channel channel) {
    if (channel != null) {
        // 獲得屬性設定
        Object attribute = channel.getAttribute(Constants.CHARSET_KEY);
        // 返回指定字符集的charset物件。
        if (attribute instanceof String) {
            try {
                return Charset.forName((String) attribute);
            } catch (Throwable t) {
                logger.warn(t.getMessage(),t);
            }
        } else if (attribute instanceof Charset) {
            return (Charset) attribute;
        }
        URL url = channel.getUrl();
        if (url != null) {
            String parameter = url.getParameter(Constants.CHARSET_KEY);
            if (parameter != null && parameter.length() > 0) {
                try {
                    return Charset.forName(parameter);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(),t);
                }
            }
        }
    }
    // 預設的編碼是utf-8
    try {
        return Charset.forName(Constants.DEFAULT_CHARSET);
    } catch (Throwable t) {
        logger.warn(t.getMessage(),t);
    }
    return Charset.defaultCharset();
}
複製程式碼

該方法是獲得通道的字符集,根據url中編碼來獲得字符集,預設是utf-8。

3.encode

@Override
public void encode(Channel channel,ChannelBuffer buffer,Object message) throws IOException {
    // 如果需要編碼的是 telnet 命令結果
    if (message instanceof String) {
        //如果為客戶端側的通道message直接返回
        if (isClientSide(channel)) {
            message = message + "\r\n";
        }
        // 獲得位元組陣列
        byte[] msgData = ((String) message).getBytes(getCharset(channel).name());
        // 寫入緩衝區
        buffer.writeBytes(msgData);
    } else {
        super.encode(channel,buffer,message);
    }
}
複製程式碼

該方法是編碼方法。

4.decode

@Override
public Object decode(Channel channel,ChannelBuffer buffer) throws IOException {
    // 獲得緩衝區可讀的位元組
    int readable = buffer.readableBytes();
    byte[] message = new byte[readable];
    // 從緩衝區讀資料
    buffer.readBytes(message);
    return decode(channel,readable,message);
}

@SuppressWarnings("unchecked")
protected Object decode(Channel channel,int readable,byte[] message) throws IOException {
    // 如果是客戶端側,直接返回結果
    if (isClientSide(channel)) {
        return toString(message,getCharset(channel));
    }
    // 檢驗訊息長度
    checkPayload(channel,readable);
    if (message == null || message.length == 0) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // 如果回退
    if (message[message.length - 1] == '\b') { // Windows backspace echo
        try {
            boolean doublechar = message.length >= 3 && message[message.length - 3] < 0; // double byte char
            channel.send(new String(doublechar ? new byte[]{32,32,8,8} : new byte[]{32,8},getCharset(channel).name()));
        } catch (RemotingException e) {
            throw new IOException(StringUtils.toString(e));
        }
        return DecodeResult.NEED_MORE_INPUT;
    }

    // 如果命令是退出
    for (Object command : EXIT) {
        if (isEquals(message,(byte[]) command)) {
            if (logger.isInfoEnabled()) {
                logger.info(new Exception("Close channel " + channel + " on exit command: " + Arrays.toString((byte[]) command)));
            }
            // 關閉通道
            channel.close();
            return null;
        }
    }
    boolean up = endsWith(message,UP);
    boolean down = endsWith(message,DOWN);
    // 如果用上下鍵找歷史命令
    if (up || down) {
        LinkedList<String> history = (LinkedList<String>) channel.getAttribute(HISTORY_LIST_KEY);
        if (history == null || history.isEmpty()) {
            return DecodeResult.NEED_MORE_INPUT;
        }
        Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY);
        Integer old = index;
        if (index == null) {
            index = history.size() - 1;
        } else {
            // 向上
            if (up) {
                index = index - 1;
                if (index < 0) {
                    index = history.size() - 1;
                }
            } else {
                // 向下
                index = index + 1;
                if (index > history.size() - 1) {
                    index = 0;
                }
            }
        }
        // 獲得歷史命令,併傳送給客戶端
        if (old == null || !old.equals(index)) {
            // 設定當前命令位置
            channel.setAttribute(HISTORY_INDEX_KEY,index);
            // 獲得歷史命令
            String value = history.get(index);
            // 清除客戶端原有命令,用查到的歷史命令替代
            if (old != null && old >= 0 && old < history.size()) {
                String ov = history.get(old);
                StringBuilder buf = new StringBuilder();
                for (int i = 0; i < ov.length(); i++) {
                    buf.append("\b");
                }
                for (int i = 0; i < ov.length(); i++) {
                    buf.append(" ");
                }
                for (int i = 0; i < ov.length(); i++) {
                    buf.append("\b");
                }
                value = buf.toString() + value;
            }
            try {
                channel.send(value);
            } catch (RemotingException e) {
                throw new IOException(StringUtils.toString(e));
            }
        }
        // 返回,需要更多指令
        return DecodeResult.NEED_MORE_INPUT;
    }
    // 關閉命令
    for (Object command : EXIT) {
        if (isEquals(message,(byte[]) command)) {
            if (logger.isInfoEnabled()) {
                logger.info(new Exception("Close channel " + channel + " on exit command " + command));
            }
            channel.close();
            return null;
        }
    }
    byte[] enter = null;
    // 如果命令是回車
    for (Object command : ENTER) {
        if (endsWith(message,(byte[]) command)) {
            enter = (byte[]) command;
            break;
        }
    }
    if (enter == null) {
        return DecodeResult.NEED_MORE_INPUT;
    }
    LinkedList<String> history = (LinkedList<String>) channel.getAttribute(HISTORY_LIST_KEY);
    Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY);
    // 移除歷史命令
    channel.removeAttribute(HISTORY_INDEX_KEY);
    // 將歷史命令拼接
    if (history != null && !history.isEmpty() && index != null && index >= 0 && index < history.size()) {
        String value = history.get(index);
        if (value != null) {
            byte[] b1 = value.getBytes();
            byte[] b2 = new byte[b1.length + message.length];
            System.arraycopy(b1,0,b2,b1.length);
            System.arraycopy(message,b1.length,message.length);
            message = b2;
        }
    }
    // 將命令位元組陣列,轉成具體的一條命令
    String result = toString(message,getCharset(channel));
    if (result.trim().length() > 0) {
        if (history == null) {
            history = new LinkedList<String>();
            channel.setAttribute(HISTORY_LIST_KEY,history);
        }
        if (history.isEmpty()) {
            history.addLast(result);
        } else if (!result.equals(history.getLast())) {
            history.remove(result);
            // 新增當前命令到歷史尾部
            history.addLast(result);
            // 超過上限,移除歷史的頭部
            if (history.size() > 10) {
                history.removeFirst();
            }
        }
    }
    return result;
}
複製程式碼

該方法是編碼。

後記

該部分相關的原始碼解析地址:github.com/CrazyHZM/in…

該文章講解了telnet的相關實現邏輯,本文有興趣的朋友可以看看。下一篇我會講解基於grizzly實現遠端通訊部分。