1. 程式人生 > 其它 >python-ssh連結linux查詢日誌,並按日誌等級在控制檯分顏色輸出日誌

python-ssh連結linux查詢日誌,並按日誌等級在控制檯分顏色輸出日誌

import paramiko
# unicode_utils.py
def to_str(bytes_or_str):
    """
    把byte型別轉換為str
    :param bytes_or_str:
    :return:
    """
    if isinstance(bytes_or_str, bytes):
        value = bytes_or_str.decode('utf-8')
    else:
        value = bytes_or_str
    return value

class SSHConnection(object):

    def __init__(self, host_dict):
        self.host = host_dict['host']
        self.port = host_dict['port']
        self.username = host_dict['username']
        self.pwd = host_dict['pwd']
        self.__k = None

    def connect(self):
        transport = paramiko.Transport((self.host, self.port))
        transport.connect(username=self.username, password=self.pwd)
        self.__transport = transport

    def close(self):
        self.__transport.close()

    def run_cmd(self, command):
        """
         執行shell命令,返回字典
         return {'color': 'red','res':error}或
         return {'color': 'green', 'res':res}
        :param command:
        :return:
        """
        ssh = paramiko.SSHClient()
        ssh._transport = self.__transport
        # 執行命令
        stdin, stdout, stderr = ssh.exec_command(command)
        # 獲取命令結果
        res = to_str(stdout.read())
        # 獲取錯誤資訊
        error = to_str(stderr.read())
        # 如果有錯誤資訊,返回error
        # 否則返回res
        #這裡程式碼寫了根據狀態控制檯列印不同顏色,正常顯示綠色,錯誤顯示紅色
        if error.strip():
            return f"\033[1;31;40m'res': {error}\033[0m"
        else:
            return f"\033[1;32;40m'res': {res}\033[0m"

    def upload(self, local_path, target_path):
        # 連線,上傳
        sftp = paramiko.SFTPClient.from_transport(self.__transport)
        # 將location.py 上傳至伺服器 /tmp/test.py
        sftp.put(local_path, target_path, confirm=True)
        # print(os.stat(local_path).st_mode)
        # 增加許可權
        # sftp.chmod(target_path, os.stat(local_path).st_mode)
        sftp.chmod(target_path, 0o755)  # 注意這裡的許可權是八進位制的,八進位制需要使用0o作為字首

    def download(self, target_path, local_path):
        # 連線,下載
        sftp = paramiko.SFTPClient.from_transport(self.__transport)
        # 將location.py 下載至伺服器 /tmp/test.py
        sftp.get(target_path, local_path)

    # 銷燬
    def __del__(self):
        self.close()