1. 程式人生 > 實用技巧 >python ssh 執行shell命令

python ssh 執行shell命令

# -*- coding: utf-8 -*-

import paramiko
import threading

def run(host_ip, username, password, command):
    ssh = paramiko.SSHClient()
    try:
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(host_ip, 22, username, password)

        print('===================exec on [%s]=====================
' % host_ip) stdin, stdout, stderr = ssh.exec_command(command, timeout=300) out = stdout.readlines() for o in out: print (o.strip('\n')) except Exception as ex: print('error, host is [%s], msg is [%s]' % (host_ip, ex.message)) finally: ssh.close()
if __name__ == '__main__': # 將需要批量執行命令的host ip地址填到這裡 # eg: host_ip_list = ['IP1', 'IP2'] host_ip_list = ['147.116.20.19'] for _host_ip in host_ip_list: # 使用者名稱,密碼,執行的命令填到這裡 run(_host_ip, 'tzgame', 'tzgame@1234', 'df -h') run(_host_ip, 'tzgame', 'tzgame@1234'
, 'ping -c 5 220.181.38.148')

下面是多執行緒執行版本

#!/usr/bin/python
#coding:utf-8
import threading
import subprocess
import os
import sys


sshport = 13131
log_path = 'update_log'
output = {}

def execute(s, ip, cmd, log_path_today):
    with s:      
        cmd = '''ssh -p%s root@%s -n "%s" ''' % (sshport, ip, cmd)

        ret = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        output[ip] = ret.stdout.readlines()




if __name__ == "__main__":
    if len(sys.argv) != 3:
        print "Usage: %s config.ini cmd" % sys.argv[0]
        sys.exit(1)
                                     
    if not os.path.isfile(sys.argv[1]):
        print "Usage: %s is not file!" % sys.argv[1]
        sys.exit(1)
                                         
    cmd = sys.argv[2]
                                     
    f = open(sys.argv[1],'r')
    list = f.readlines()
    f.close()
    today = datetime.date.today()
    log_path_today = '%s/%s' % (log_path,today)
    if not os.path.isdir(log_path_today):
        os.makedirs(log_path_today)
                                     
    threading_num = 100
    if threading_num > len(list):
        threading_num = len(list)

    s = threading.Semaphore(threading_num)
                                     
    for line in list:
        ip = line.strip()
        t = threading.Thread(target=execute,args=(s, ip,cmd,log_path_today))
        t.setDaemon(True)
        t.start()
                                         
    main_thread = threading.currentThread()
    for t in threading.enumerate():
        if t is main_thread:
            continue
        t.join()
                                         
    for ip,result in output.items():
        print "%s: " % ip
        for line in result:
            print "    %s" % line.strip()
                                     
    print "Done!"

以上指令碼讀取兩個引數,第一個為存放IP的文字,第二個為shell命令

執行效果如下

# -*- coding:utf-8 -*-
import requests
from requests.exceptions import RequestException
import os, time
import re
from lxml import etree
import threading

lock = threading.Lock()
def get_html(url):

    response = requests.get(url, timeout=10)
    # print(response.status_code)
    try:
        if response.status_code == 200:

            # print(response.text)
            return response.text
        else:
             return None
    except RequestException:
        print("請求失敗")
        # return None


def parse_html(html_text):

    html = etree.HTML(html_text)

    if len(html) > 0:
        img_src = html.xpath("//img[@class='photothumb lazy']/@data-original")  # 元素提取方法
        # print(img_src)
        return img_src

    else:
        print("解析頁面元素失敗")

def get_image_pages(url):
    html_text = get_html(url)  # 獲取搜尋url響應內容
    # print(html_text)
    if html_text is not None:
        html = etree.HTML(html_text)  # 生成XPath解析物件
        last_page = html.xpath("//div[@class='pages']//a[last()]/@href")  # 提取最後一頁所在href連結
        print(last_page)
        if last_page:
            max_page = re.compile(r'(\d+)', re.S).search(last_page[0]).group()  # 使用正則表示式提取連結中的頁碼數字
            print(max_page)
            print(type(max_page))
            return int(max_page)  # 將字串頁碼轉為整數並返回
        else:
            print("暫無資料")
            return None
    else:
        print("查詢結果失敗")


def get_all_image_url(page_number):
    base_url = 'https://imgbin.com/free-png/naruto/'
    image_urls = []

    x = 1  # 定義一個標識,用於給每個圖片url編號,從1遞增
    for i in range(1, page_number):
        url = base_url + str(i)  # 根據頁碼遍歷請求url
        try:
            html = get_html(url)  # 解析每個頁面的內容
            if html:
                data = parse_html(html)  # 提取頁面中的圖片url
                # print(data)
                # time.sleep(3)
                if data:
                    for j in data:
                        image_urls.append({
                            'name': x,
                            'value': j
                        })
                        x += 1  # 每提取一個圖片url,標識x增加1
        except RequestException as f:
            print("遇到錯誤:", f)
            continue
    # print(image_urls)
    return image_urls

def get_image_content(url):
    try:
        r = requests.get(url, timeout=15)
        if r.status_code == 200:
            return r.content
        return None
    except RequestException:
        return None

def main(url, image_name):
    semaphore.acquire()  # 加鎖,限制執行緒數
    print('當前子執行緒: {}'.format(threading.current_thread().name))
    save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'
    try:
        file_path = '{0}/{1}.jpg'.format(save_path, image_name)
        if not os.path.exists(file_path):  # 判斷是否存在檔案,不存在則爬取
            with open(file_path, 'wb') as f:
                f.write(get_image_content(url))
                f.close()

                print('第{}個檔案儲存成功'.format(image_name))

        else:
            print("第{}個檔案已存在".format(image_name))

        semaphore.release()  # 解鎖imgbin-多執行緒-重寫run方法.py

    except FileNotFoundError as f:
        print("第{}個檔案下載時遇到錯誤,url為:{}:".format(image_name, url))
        print("報錯:", f)
        raise

    except TypeError as e:
        print("第{}個檔案下載時遇到錯誤,url為:{}:".format(image_name, url))
        print("報錯:", e)

class MyThread(threading.Thread):
    """繼承Thread類重寫run方法建立新程序"""
    def __init__(self, func, args):
        """

        :param func: run方法中要呼叫的函式名
        :param args: func函式所需的引數
        """
        threading.Thread.__init__(self)
        self.func = func
        self.args = args

    def run(self):
        print('當前子執行緒: {}'.format(threading.current_thread().name))
        self.func(self.args[0], self.args[1])
        # 呼叫func函式
        # 因為這裡的func函式其實是上述的main()函式,它需要2個引數;args傳入的是個引數元組,拆解開來傳入


if __name__ == '__main__':
    start = time.time()
    print('這是主執行緒:{}'.format(threading.current_thread().name))

    urls = get_all_image_url(5)  # 獲取所有圖片url列表
    thread_list = []  # 定義一個列表,向裡面追加執行緒
    semaphore = threading.BoundedSemaphore(5) # 或使用Semaphore方法
    for t in urls:
        # print(i)

        m = MyThread(main, (t["value"], t["name"]))  # 呼叫MyThread類,得到一個例項

        thread_list.append(m)

    for m in thread_list:

        m.start()  # 呼叫start()方法,開始執行

    for m in thread_list:
        m.join()  # 子執行緒呼叫join()方法,使主執行緒等待子執行緒執行完畢之後才退出


    end = time.time()
    print(end-start)
    # get_image_pages("https://imgbin.com/free-png/Naruto")