1. 程式人生 > 實用技巧 >Python學習筆記Day09 - 多執行緒

Python學習筆記Day09 - 多執行緒

目錄

http://www.cnblogs.com/alex3714/articles/5230609.html

1.paramiko 基於SSH用於連線遠端伺服器並執行相關操作

# http://www.cnblogs.com/wupeiqi/articles/5095821.html
import paramiko

# 建立SSH物件
ssh = paramiko.SSHClient()
# 允許連線不在know_hosts檔案中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連線伺服器
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123')
# # 利用RSA私鑰登入
# private_key = paramiko.RSAKey.from_private_key_file('id_rsa31')
# ssh.connect(hostname='10.0.0.41', port=52113, username='gongli', pkey=private_key)

# 執行命令,返回三個結果:標準輸入,標準輸出,標準錯誤
stdin, stdout, stderr = ssh.exec_command('df')
# 獲取命令結果,輸出或者錯誤
res,err = stdout.read(),stderr.read()
result = res if res else err
print(result.decode())

# 關閉連線
ssh.close()

2.SSH與SFTP,通過paramiko執行命令,或傳輸檔案

# 上傳下載檔案

import paramiko
transport = paramiko.Transport(('10.0.0.31', 52113))    # 建立
transport.connect(username='root', password='123456')   # 建立連線
sftp = paramiko.SFTPClient.from_transport(transport)    # 將連線交給SFTP客戶端處理
# 將location.py 上傳至伺服器 /tmp/test.py
sftp.put('筆記', '/tmp/test_from_win')
# 將remove_path 下載到本地 local_path
sftp.get('/root/oldgirl.txt', 'fromlinux.txt')

transport.close()

3.ssh 金鑰

RSA -非對稱金鑰驗證

公鑰 public  key  (天王蓋地虎)
私鑰  private key (寶塔鎮河妖)
10.0.0.31    -----> 10.0.0.41
私鑰      公鑰

4.程序和執行緒

程序: qq 要以一個整體的形式暴露給作業系統管理,裡面包含對各種資源的呼叫,
    記憶體的管理,網路介面的呼叫等。。。對各種資源管理的集合 就可以稱為程序
執行緒: 是作業系統最小的排程單位, 是一串指令的集合
    執行緒可以建立同級執行緒
程序要操作cpu , 必須要先建立一個執行緒
程序是一系列執行緒的集合,可以建立子程序,同一程序內的執行緒共享同一塊記憶體空間的資料

程序與執行緒的區別?
    1.執行緒共享記憶體空間;程序的記憶體是獨立的
    2.同一個程序的執行緒之間可以直接交流;兩個程序想通訊,必須通過一箇中間代理來實現
    3.建立新執行緒很簡單;建立新程序需要對其父程序進行一次克隆,生成之後的二者獨立
    4.一個執行緒可以控制和操作同一程序裡的其他執行緒;但是程序只能操作子程序
    5.改變程序裡的一個執行緒可能對另一個執行緒有影響;但父程序修改對子程序無影響

5.threading 多執行緒

join 等待
Daemon 守護執行緒

# 建立多執行緒
# join
# daemon

import threading
import time

def run(n):
    print("task ", n)
    time.sleep(2)
    print("task done", n)

start_time = time.time()
# t1 = threading.Thread(target=run, args=("t-1",))
# t2 = threading.Thread(target=run, args=("t-2",))

# 批量啟動大量執行緒
t_objs = []  # 存執行緒例項
for i in range(50):
    t = threading.Thread(target=run, args=("t-%s" % i,))    # 迴圈建立執行緒例項
    # 守護執行緒,當主執行緒退出時,守護執行緒也會退出,由t啟動的其它子執行緒會同時退出,不管是否執行完任務
    t.setDaemon(True)   # 守護執行緒設定一定要在start之前
    t.start()   # 啟動執行緒
    t_objs.append(t)  # 為了不阻塞後面執行緒的啟動,不在這裡join,先放到一個列表裡

# 正常情況下,主執行緒啟動了子執行緒後,主執行緒獨立執行,不等待子執行緒執行完畢
# 利用join等待某執行緒執行完畢主執行緒再繼續(有些語言是wait())
for t in t_objs:  # 迴圈執行緒例項列表,等待所有執行緒執行完畢
    t.join()

print("----------all threads has finished...")
print("cost:", time.time() - start_time)

用類的形式建立執行緒,繼承式呼叫(一般不用)

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, n, sleep_time):
        super(MyThread, self).__init__()
        self.n = n
        self.sleep_time = sleep_time

    def run(self):
        print("runnint task ", self.n)
        time.sleep(self.sleep_time)
        print("task done,", self.n)

t1 = MyThread("t1", 2)
t2 = MyThread("t2", 4)

t1.start()
t2.start()

t1.join()  # =wait()
t2.join()

print("main thread....")

GIL vs lock

lock 執行緒鎖(mutex互斥鎖) 保證同一時間只有一個執行緒修改資料

# 執行緒lock,2.*上需要自己加,3.*上自動加
# 保證同一時間只有一個執行緒在修改資料

import threading
import time

def run(n):
    lock.acquire()  # 獲取鎖
    global num
    num += 1
    time.sleep(1)
    lock.release()  # 釋放執行緒鎖

lock = threading.Lock()  # 申請執行緒鎖

num = 0
t_objs = []  # 存執行緒例項
for i in range(50):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()
    t_objs.append(t)

for t in t_objs:  # 迴圈執行緒例項列表,等待所有執行緒執行完畢
    t.join()

print("----------all threads has finished...", threading.active_count())

print("num:", num)

RLock 遞迴鎖 說白了就是在一個大鎖中還要再包含子鎖,以防多把鎖時弄混,實際用處少

import threading, time

def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num

def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2

def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)

num, num2 = 0, 0
lock = threading.RLock()    # 生成RLock鎖
for i in range(1):
    t = threading.Thread(target=run3)
    t.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)

6.semaphore 訊號量 允許n個執行緒同時執行,多執行緒時加快執行速度

import threading, time

def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s\n" % n)
    semaphore.release()

if __name__ == '__main__':
    semaphore = threading.BoundedSemaphore(5)  # 最多允許5個執行緒同時執行
    for i in range(22):
        t = threading.Thread(target=run, args=(i,))
        t.start()
while threading.active_count() != 1:
    pass  # print threading.active_count()
else:
    print('----all threads done---')
    #print(num)

7.events 處理多個執行緒間的互動,例如紅綠燈,set,clear,wait,is_set

import time
import threading

event = threading.Event()  # 生成event物件

def lighter():
    count = 0
    event.set()  # 先設定標誌位(綠燈)
    while True:
        if 5 < count < 10:
            event.clear()  # 把標誌位清空(紅燈)
            print("\033[41;1mred light is on....\033[0m")
        elif count > 10:
            event.set()  # 重新設定標誌位(綠燈)
            count = 0
        else:
            print("\033[42;1mgreen light is on....\033[0m")
        time.sleep(1)
        count += 1


def car(name):
    while True:
        if event.is_set():  # 標誌位被設定,代表綠燈
            print("[%s] running..." % name)
            time.sleep(1)
        else:
            print("[%s] sees red light , waiting...." % name)
            event.wait()    # 等待event被設定
            print("\033[34;1m[%s] green light is on, start going...\033[0m" % name)


light = threading.Thread(target=lighter, )      # 生成一個執行緒處理紅綠燈
light.start()

car1 = threading.Thread(target=car, args=("Tesla",))    # 生成一個執行緒處理車
car1.start()

8.queue 佇列,put,get,qsize

執行緒queue,只能在當前程序訪問
Queue 模組中的常用方法:
Queue.qsize() 返回佇列的大小
Queue.empty() 如果佇列為空,返回True,反之False
Queue.full() 如果佇列滿了,返回True,反之False
Queue.full 與 maxsize 大小對應
Queue.get([block[, timeout]])獲取佇列,timeout等待時間
Queue.get_nowait() 相當Queue.get(False)
Queue.put(item) 寫入佇列,timeout等待時間
Queue.put_nowait(item) 相當Queue.put(item, False)
Queue.task_done() 在完成一項工作之後,Queue.task_done()函式向任務已經完成的佇列傳送一個訊號
Queue.join() 實際上意味著等到佇列為空,再執行別的操作

# 執行緒queue,只能在當前程序訪問
import queue
# 按優先順序排序
q = queue.PriorityQueue()

q.put((-1, "chenronghua"))
q.put((3, "hanyang"))
q.put((10, "alex"))
q.put((6, "wangsen"))

print(q.get())
print(q.get())
print(q.get())
print(q.get())

# # 後入先出
# q  = queue.LifoQueue()
#
# q.put(1)
# q.put(2)
# q.put(3)
# print(q.get())
# print(q.get())
# print(q.get())