Python系統學習-26
阿新 • • 發佈:2019-01-11
- 執行本地命令:
import subprocess
# Run the command and keep what it wrote to stdout in `ret`
# (check_output raises CalledProcessError on a non-zero exit status).
ret = subprocess.check_output(
    'dir',
    cwd=r'D:\PycharmProjects\Homework\admins\static',
    shell=True,
)
# Run the same command without capturing its output; check_call only
# verifies that the exit status is zero.
subprocess.check_call(
    'dir',
    cwd=r'D:\PycharmProjects\Homework\admins\static',
    shell=True,
)
- 面向物件的繼承
以此種方式模擬抽象基類,在父類中丟擲一個錯誤
class Base(object):
    """Base class that emulates an abstract interface.

    Subclasses are expected to override ``f1``; calling the inherited
    version raises ``NotImplementedError``.
    """

    def f1(self):
        """Placeholder that subclasses must override.

        Raises:
            NotImplementedError: always, because no implementation is
                provided at this level.
        """
        raise NotImplementedError('未實現的異常')
class Foo(Base):
    """Subclass that deliberately does not override ``f1``."""
    pass


# Foo inherits f1 unchanged from Base, so this call raises
# NotImplementedError -- that failure is the point of the demonstration.
obj = Foo()
obj.f1()
- 解壓縮+移動檔案+刪除檔案
import shutil
# Pack the contents of the "templates" directory into a tar archive;
# make_archive appends the ".tar" suffix to the base name itself.
shutil.make_archive(
    r'D:\PycharmProjects\Homework\admins\static\test1',
    'tar',
    r'D:\PycharmProjects\Homework\admins\templates',
)
# Unpack that tar archive into the "ext" directory.
shutil.unpack_archive(
    r'D:\PycharmProjects\Homework\admins\static\test1.tar',
    r'D:\PycharmProjects\Homework\admins\static\ext',
    'tar',
)
import shutil
# Move test.py out of the extracted "ext" directory into "ext1".
shutil.move(
    src=r'D:\PycharmProjects\Homework\admins\static\ext\test.py',
    dst=r'D:\PycharmProjects\Homework\admins\static\ext1\test.py',
)
# Recursively delete the "ext" directory together with everything inside it.
shutil.rmtree(path=r'D:\PycharmProjects\Homework\admins\static\ext')
- 上傳檔案
import requests
# Upload a local file as a multipart/form-data POST.
# requests expects `files` to map a form-field name to the file object or
# to a (filename, fileobj) tuple; a bare tuple is not a valid value.
# NOTE(review): 'file' is an assumed field name -- use whatever name the
# receiving endpoint expects.
# The with-block closes the file even if the request raises.
with open('xxxxx.zip', 'rb') as f:
    requests.post(
        url='上傳檔案的地址',
        files={'file': ('xxxxx.zip', f)},
    )
5.paramiko
"""
pip3 install paramiko
管理工具:
- saltstack
- salt-ssh
- agent(RPC)
- ansible
- ssh
- fabric
- 給開發提供介面
"""
# ################# 基於paramiko遠端連線伺服器並執行命令【使用者名稱密碼】 #################
"""
import paramiko
# 建立SSH物件
ssh = paramiko.SSHClient()
# 允許連線不在 known_hosts 檔案中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連線伺服器
ssh.connect(hostname='192.168.16.175', port=22, username='root', password='123')
# 執行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 獲取命令結果
result = stdout.read()
# 關閉連線
ssh.close()
print(result)
"""
# ################# 基於paramiko遠端連線伺服器並執行命令【祕鑰】 #################
"""
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('本地私鑰') # 前提要把公鑰推送到伺服器
# 建立SSH物件
ssh = paramiko.SSHClient()
# 允許連線不在 known_hosts 檔案中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連線伺服器
ssh.connect(hostname='c1.salt.com', port=22, username='使用者名稱', pkey=private_key)
# 執行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 獲取命令結果
result = stdout.read()
# 關閉連線
ssh.close()
"""
# ####### Disabled example: SFTP upload/download via paramiko [username/password] #######
# Kept as a raw string literal: the embedded Windows paths contain "\x1" and
# "\x2", which a normal (non-raw) string literal rejects as malformed \x escapes.
r"""
import paramiko
transport = paramiko.Transport(('192.168.16.175', 22,))
transport.connect(username='root', password='123')
sftp = paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至伺服器 /tmp/test.py
sftp.put(r'D:\s22\s22day25\x1.py', '/data/x1.py')
# 將remove_path 下載到本地 local_path
# sftp.get('/data/x1.py', r'D:\s22\s22day25\x2.py')
transport.close()
"""
# ################# 基於paramiko遠端連線伺服器並上傳下載檔案【祕鑰】 #################
"""
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', pkey=private_key)
sftp = paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至伺服器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 將remove_path 下載到本地 local_path
sftp.get('remove_path', 'local_path')
transport.close()
"""
# ################################# 基於祕鑰字串 ########################
import paramiko
from io import StringIO
# PEM text of an RSA private key, embedded as a string for the in-memory
# key-loading example that follows.
# NOTE(review): a private key committed in source is a credential leak; real
# code should load it from a protected file or a secrets store. Parts of the
# body look like placeholder text, so this sample may not parse -- confirm.
key_str = """-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAq7gLsqYArAFco02/55IgNg0r7NXOtEM3qXpb/dabJ5Uyky/8
NEHhFiQ7deHIRIuTW5Zb0kD6h6EBbVlUMBmwJrC2oSzySLU1w+ZNfH0PE6W6fans
H80whhuc/YgP+fjiO+VR/gFcqib8Rll5UfYzf5H8uuOnDeIXGCVgyHQSmt8if1+e
7hn1MVO1Lrm9Fco8ABI7dyv8/ZEwoSfh2C9rGYgA58LT1FkBRkOePbHD43xNfAYC
tfLvz6LErMnwdOW4sNMEWWAWv1fsTB35PAm5CazfKzmam9n5IQXhmUNcNvmaZtvP
c4f4g59mdsaWNtNaY96UjOfx83Om86gmdkKcnwIDAQABAoIBAQCnDBGFJuv8aA7A
ZkBLe+GN815JtOyye7lIS1n2I7En3oImoUWNaJEYwwJ8+LmjxMwDCtAkR0XwbvY+
c+nsKPEtkjb3sAu6I148RmwWsGncSRqUaJrljOypaW9dS+GO4Ujjz3/lw1lrxSUh
IqVc0E7kyRW8kP3QCaNBwArYteHreZFFp6XmtKMtXaEA3saJYILxaaXlYkoRi4k8
S2/K8aw3ZMR4tDCOfB4o47JaeiA/e185RK3A+mLn9xTDhTdZqTQpv17/YRPcgmwz
zu30fhVXQT/SuI0sO+bzCO4YGoEwoBX718AWhdLJFoFq1B7k2ZEzXTAtjEXQEWm6
01ndU/jhAasdfasdasdfasdfa3eraszxqwefasdfadasdffsFIfAsjQb4HdkmHuC
OeJrJOd+CYvdEeqJJNnF6AbHyYHIECkj0Qq1kEfLOEsqzd5nDbtkKBte6M1trbjl
HtJ2Yb8w6o/q/6Sbj7wf/cW3LIYEdeVCjScozVcQ9R83ea05J+QOAr4nAoGBAMaq
UzLJfLNWZ5Qosmir2oHStFlZpxspax/ln7DlWLW4wPB4YJalSVovF2Buo8hr8X65
lnPiE41M+G0Z7icEXiFyDBFDCtzx0x/RmaBokLathrFtI81UCx4gQPLaSVNMlvQA
539GsubSrO4LpHRNGg/weZ6EqQOXvHvkUkm2bDDJAoGATytFNxen6GtC0ZT3SRQM
WYfasdf3xbtuykmnluiofasd2sfmjnljkt7khghmghdasSDFGQfgaFoKfaawoYeH
C2XasVUsVviBn8kPSLSVBPX4JUfQmA6h8HsajeVahxN1U9e0nYJ0sYDQFUMTS2t8
RT57+WK/0ONwTWHdu+KnaJECgYEAid/ta8LQC3p82iNAZkpWlGDSD2yb/8rH8NQg
9tjEryFwrbMtfX9qn+8srx06B796U3OjifstjJQNmVI0qNlsJpQK8fPwVxRxbJS/
pMbNICrf3sUa4sZgDOFfkeuSlgACh4cVIozDXlR59Z8Y3CoiW0uObEgvMDIfenAj
98pl3ZkCgYEAj/UCSni0dwX4pnKNPm6LUgiS7QvIgM3H9piyt8aipQuzBi5LUKWw
DlQC4Zb73nHgdREtQYYXTu7p27Bl0Gizz1sW2eSgxFU8eTh+ucfVwOXKAXKU5SeI
+MbuBfUYQ4if2N/BXn47+/ecf3A4KgB37Le5SbLDddwCNxGlBzbpBa0=
-----END RSA PRIVATE KEY-----"""
# Parse the in-memory PEM text into a key object. A bare RSAKey() would
# carry no key material, so the key must be loaded from file_obj.
file_obj = StringIO(key_str)
private_key = paramiko.RSAKey.from_private_key(file_obj)
transport = paramiko.Transport(('10.0.1.40', 22))
try:
    transport.connect(username='wupeiqi', pkey=private_key)
    # Reuse the already-authenticated transport instead of calling
    # ssh.connect(); note that _transport is a private paramiko attribute.
    ssh = paramiko.SSHClient()
    ssh._transport = transport
    stdin, stdout, stderr = ssh.exec_command('df')
    result = stdout.read()
finally:
    # Release the connection even if connecting or the command fails.
    transport.close()
print(result)
- 執行緒池
import time
from concurrent.futures import ThreadPoolExecutor
def task(num, delay=2):
    """Simulate a slow job, then print its argument.

    Args:
        num: Value printed once the wait is over.
        delay: Seconds to sleep before printing; defaults to 2.
    """
    time.sleep(delay)
    print(num)
# Pool of five worker threads. Leaving the with-block waits for every
# submitted task to finish, exactly like an explicit pool.shutdown().
with ThreadPoolExecutor(5) as pool:
    for i in range(32):
        # Arguments after the callable are forwarded to task() unchanged,
        # so pass i itself; passing (i,) would make task print a tuple.
        pool.submit(task, i)
- 殺程序
import os
import signal
# Forcefully terminate the process whose PID is 123 (placeholder PID).
# signal.SIGKILL only exists on Unix; fall back to SIGTERM elsewhere
# (on Windows os.kill then ends the process via TerminateProcess).
os.kill(123, getattr(signal, 'SIGKILL', signal.SIGTERM))