1. 程式人生 > >Python系統學習-26

Python系統學習-26

  1. 執行本地命令:
import subprocess

# Directory in which both commands below are executed.
static_dir = r'D:\PycharmProjects\Homework\admins\static'

# Run the command and keep its captured standard output in `ret`.
ret = subprocess.check_output('dir', cwd=static_dir, shell=True)

# Run the same command without capturing its output.
subprocess.check_call('dir', cwd=static_dir, shell=True)
  2. 面向物件的繼承
    以此種方式模擬抽象基類,在父類中丟擲一個錯誤
class Base(object):
    """Stand-in for an abstract base class: ``f1`` must be overridden."""

    def f1(self):
        """Placeholder that subclasses are expected to override.

        Raises:
            NotImplementedError: always, when this base implementation runs.
        """
        raise NotImplementedError('未實現的異常')


class Foo(Base):
    """Subclass that does not override ``f1`` and so inherits the raising stub."""
    pass


if __name__ == '__main__':
    # Demonstration: Foo did not implement f1, so this call raises
    # NotImplementedError. Guarded so that importing the code does not raise.
    obj = Foo()
    obj.f1()
  3. 解壓縮+移動檔案+刪除檔案
import shutil

# Create the archive test1.tar from the contents of the templates directory.
# make_archive appends the format's extension to base_name itself.
shutil.make_archive(
    base_name=r'D:\PycharmProjects\Homework\admins\static\test1',
    format='tar',
    root_dir=r'D:\PycharmProjects\Homework\admins\templates',
)

# Extract the archive into the ext directory.
shutil.unpack_archive(
    r'D:\PycharmProjects\Homework\admins\static\test1.tar',
    extract_dir=r'D:\PycharmProjects\Homework\admins\static\ext',
    format='tar',
)

# Move a single file out of the extracted tree.
# NOTE(review): presumably the ext1 directory must already exist — confirm.
shutil.move(
    r'D:\PycharmProjects\Homework\admins\static\ext\test.py',
    r'D:\PycharmProjects\Homework\admins\static\ext1\test.py',
)

# Recursively delete the ext directory and everything under it.
shutil.rmtree(r'D:\PycharmProjects\Homework\admins\static\ext')
  4. 上傳檔案
import requests

# Open the archive in binary mode; the with-block closes it even if the
# upload raises.
with open('xxxxx.zip', 'rb') as f:
    requests.post(
        url='上傳檔案的地址',
        # Fix: `files` must map a form-field name to (filename, fileobj);
        # the original bare tuple cannot be unpacked into (field, file) pairs.
        # NOTE(review): the field name 'file' is an assumption — use the name
        # the upload endpoint actually expects.
        files={'file': ('xxxxx.zip', f)},
    )

5.paramiko

"""
   pip3 install paramiko
   管理工具:
       - saltstack
           - salt-ssh
           - agent(RPC)
       - ansible
           - ssh
       - fabric
           - 給開發提供介面
"""

# ################# 基於paramiko遠端連線伺服器並執行命令【使用者名稱密碼】 #################
"""
import paramiko
# 建立SSH物件
ssh = paramiko.SSHClient()
# 允許連線不在 known_hosts 檔案中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連線伺服器
ssh.connect(hostname='192.168.16.175', port=22, username='root', password='123')
# 執行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 獲取命令結果
result = stdout.read()
# 關閉連線
ssh.close()
print(result)
"""
# ################# 基於paramiko遠端連線伺服器並執行命令【祕鑰】 #################
"""
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('本地私鑰') # 前提要把公鑰推送到伺服器
# 建立SSH物件
ssh = paramiko.SSHClient()
# 允許連線不在 known_hosts 檔案中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連線伺服器
ssh.connect(hostname='c1.salt.com', port=22, username='使用者名稱', pkey=private_key)
# 執行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 獲取命令結果
result = stdout.read()
# 關閉連線
ssh.close()

"""

# ################# 基於paramiko遠端連線伺服器並上傳下載檔案【使用者名稱密碼】 #################
"""
import paramiko
transport = paramiko.Transport(('192.168.16.175', 22,))
transport.connect(username='root', password='123')
sftp = paramiko.SFTPClient.from_transport(transport)
# 將本地的 x1.py 上傳至伺服器 /data/x1.py
sftp.put(r'D:\s22\s22day25\x1.py', '/data/x1.py')
# 將伺服器上的 /data/x1.py 下載到本地 x2.py
# sftp.get('/data/x1.py', r'D:\s22\s22day25\x2.py')
transport.close()
"""
# ################# 基於paramiko遠端連線伺服器並上傳下載檔案【祕鑰】 #################
"""
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', pkey=private_key)

sftp = paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至伺服器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 將 remote_path 下載到本地 local_path
sftp.get('remote_path', 'local_path')
transport.close()
"""

# ################################# 基於祕鑰字串 ########################
import paramiko
from io import StringIO
# PEM-formatted RSA private key text held in a string literal.
# NOTE(review): a private key embedded in source is a security risk. This one
# looks like scrambled sample data — confirm, and load real keys from a
# protected file or secret store instead.
key_str = """-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAq7gLsqYArAFco02/55IgNg0r7NXOtEM3qXpb/dabJ5Uyky/8
NEHhFiQ7deHIRIuTW5Zb0kD6h6EBbVlUMBmwJrC2oSzySLU1w+ZNfH0PE6W6fans
H80whhuc/YgP+fjiO+VR/gFcqib8Rll5UfYzf5H8uuOnDeIXGCVgyHQSmt8if1+e
7hn1MVO1Lrm9Fco8ABI7dyv8/ZEwoSfh2C9rGYgA58LT1FkBRkOePbHD43xNfAYC
tfLvz6LErMnwdOW4sNMEWWAWv1fsTB35PAm5CazfKzmam9n5IQXhmUNcNvmaZtvP
c4f4g59mdsaWNtNaY96UjOfx83Om86gmdkKcnwIDAQABAoIBAQCnDBGFJuv8aA7A
ZkBLe+GN815JtOyye7lIS1n2I7En3oImoUWNaJEYwwJ8+LmjxMwDCtAkR0XwbvY+
c+nsKPEtkjb3sAu6I148RmwWsGncSRqUaJrljOypaW9dS+GO4Ujjz3/lw1lrxSUh
IqVc0E7kyRW8kP3QCaNBwArYteHreZFFp6XmtKMtXaEA3saJYILxaaXlYkoRi4k8
S2/K8aw3ZMR4tDCOfB4o47JaeiA/e185RK3A+mLn9xTDhTdZqTQpv17/YRPcgmwz
zu30fhVXQT/SuI0sO+bzCO4YGoEwoBX718AWhdLJFoFq1B7k2ZEzXTAtjEXQEWm6
01ndU/jhAasdfasdasdfasdfa3eraszxqwefasdfadasdffsFIfAsjQb4HdkmHuC
OeJrJOd+CYvdEeqJJNnF6AbHyYHIECkj0Qq1kEfLOEsqzd5nDbtkKBte6M1trbjl
HtJ2Yb8w6o/q/6Sbj7wf/cW3LIYEdeVCjScozVcQ9R83ea05J+QOAr4nAoGBAMaq
UzLJfLNWZ5Qosmir2oHStFlZpxspax/ln7DlWLW4wPB4YJalSVovF2Buo8hr8X65
lnPiE41M+G0Z7icEXiFyDBFDCtzx0x/RmaBokLathrFtI81UCx4gQPLaSVNMlvQA
539GsubSrO4LpHRNGg/weZ6EqQOXvHvkUkm2bDDJAoGATytFNxen6GtC0ZT3SRQM
WYfasdf3xbtuykmnluiofasd2sfmjnljkt7khghmghdasSDFGQfgaFoKfaawoYeH
C2XasVUsVviBn8kPSLSVBPX4JUfQmA6h8HsajeVahxN1U9e0nYJ0sYDQFUMTS2t8
RT57+WK/0ONwTWHdu+KnaJECgYEAid/ta8LQC3p82iNAZkpWlGDSD2yb/8rH8NQg
9tjEryFwrbMtfX9qn+8srx06B796U3OjifstjJQNmVI0qNlsJpQK8fPwVxRxbJS/
pMbNICrf3sUa4sZgDOFfkeuSlgACh4cVIozDXlR59Z8Y3CoiW0uObEgvMDIfenAj
98pl3ZkCgYEAj/UCSni0dwX4pnKNPm6LUgiS7QvIgM3H9piyt8aipQuzBi5LUKWw
DlQC4Zb73nHgdREtQYYXTu7p27Bl0Gizz1sW2eSgxFU8eTh+ucfVwOXKAXKU5SeI
+MbuBfUYQ4if2N/BXn47+/ecf3A4KgB37Le5SbLDddwCNxGlBzbpBa0=
-----END RSA PRIVATE KEY-----"""
# Wrap the PEM text in a file-like object so the key can be parsed from
# memory instead of from a file on disk.
file_obj = StringIO(key_str)
# Fix: actually load the key from file_obj. The original built an empty
# RSAKey() and never used file_obj, so no key material was supplied.
private_key = paramiko.RSAKey.from_private_key(file_obj)

transport = paramiko.Transport(('10.0.1.40', 22))
try:
    transport.connect(username='wupeiqi', pkey=private_key)

    # Attach the already-connected transport to an SSHClient so that
    # exec_command can be used on it.
    # NOTE(review): _transport is a private attribute of SSHClient.
    ssh = paramiko.SSHClient()
    ssh._transport = transport

    stdin, stdout, stderr = ssh.exec_command('df')
    result = stdout.read()
finally:
    # Close the connection even if connecting or running the command fails.
    transport.close()

print(result)
  6. 執行緒池
import time
from concurrent.futures import ThreadPoolExecutor


def task(num):
    """Block for two seconds, then print ``num``."""
    pause_seconds = 2
    time.sleep(pause_seconds)
    print(num)


# Pool of 5 worker threads. Leaving the with-block shuts the pool down and
# waits for queued tasks, replacing the explicit pool.shutdown() call.
with ThreadPoolExecutor(5) as pool:
    for i in range(32):
        # Hand task(i) to the pool.
        # Fix: pass i itself. The original passed the tuple (i,), so task
        # received and printed "(0,)", "(1,)", ... instead of 0, 1, ...
        pool.submit(task, i)
  7. 殺程序
import os
import signal

# PID of the process to signal (hard-coded example value).
target_pid = 123
# Send SIGKILL to that process.
# NOTE(review): signal.SIGKILL is documented as Unix-only — confirm the target
# platform, since other snippets here use Windows paths.
os.kill(target_pid, signal.SIGKILL)