python 執行緒
程序 執行緒的理解
- 程式: /bin/firefox一個二進位制程式, 也可以包含任意可執行的程式, 是一個真實存在的實體。
- 程序: 執行程式過程中產生一系列內容.(程序資訊儲存)
- 執行緒: 每個程序裡面至少包含一個主執行緒, (firefox裡面有多個table)
- 多程序: 瀏覽器,網易雲音樂以及pycharm 三個軟體只能順序執行是怎樣一種場景呢?本來在python程式設計時想要聽個古典音樂來點靈感, 結果發現,對於CPU來說, 一次只能執行一個程式, 太崩潰, 另外,假如有兩個程式A和B,程式A在執行到一半的過程中,需要讀取大量的資料輸入(I/O操作),而此時CPU只能靜靜 地等待任務A讀取完資料才能繼續執行,這樣就白白浪費了CPU資源。你是不是已經想到在程式A讀取資料的過程中,讓程式B去執行,當程式A讀取完資料之後,讓程式B暫停。這裡有一個關鍵詞:切換。
- 多執行緒:多執行緒就是指一個程序中同時有多個執行路徑(執行緒)正在執行。但在ptthon中,有一個GIL鎖:Global Interpreter Lock,任何Python執行緒執行前,必須先獲得GIL鎖。多執行緒程式設計,模型複雜,容易發生衝突,必須用鎖加以隔離,同時,又要小心死鎖的發生。 Python直譯器由於設計時有GIL全域性鎖,導致了多執行緒無法利用多核。多執行緒的併發在Python中就是一個美麗的夢。
建立執行緒及建立多執行緒
建立執行緒
import threading import time def job(): print("這是一個需要執行的任務") # 啟用的執行緒個數 print("當前執行緒的個數:",threading.active_count()) # 列印當前執行緒的詳細資訊 print("當前執行緒資訊:",threading.current_thread()) time.sleep(5) if __name__ == '__main__': job()
建立多執行緒
- _thread模組(不常用)
import time import _thread def job(name): print("這是一個需要執行的任務") print(name,time.ctime()) time.sleep(5) if __name__ == '__main__': # 建立多個執行緒, 但是沒有開始執行任務; _thread.start_new_thread(job,('thread1',)) _thread.start_new_thread(job, ('thread2',)) while True: pass
- threading模組 通過例項化物件實現
import threading
import time
def job(name):
print("這是一個需要執行的任務")
print("當前執行緒的個數:",threading.active_count())
print("當前執行緒資訊:",threading.current_thread())
time.sleep(5)
print(name,time.ctime())
if __name__ == '__main__':
job('job0')
# 建立多個執行緒
t1 = threading.Thread(target=job,name='job1',args=("job1-name",))
t2 = threading.Thread(target=job, name='job2', args=("job2-name",))
t1.start()
t2.start()
# 等待所有的子執行緒執行結束之後, 繼續執行主執行緒的內容;
t1.join()
t2.join()
print('hello')
- threading 通過繼承實現
import threading
import time
# 類的繼承
class Job(threading.Thread):
# 重寫構造方法;如果執行的任務需要傳遞引數, 那將引數通過建構函式與self繫結;
def __init__(self,jobname):
super(Job,self).__init__()
self.jobname=jobname
# 將多執行緒需要執行的任務重寫到run方法中;
def run(self):
print("這是一個需要執行的任務")
print("當前執行緒的個數:", threading.active_count())
print("當前執行緒資訊:", threading.current_thread())
time.sleep(5)
print(self.jobname,time.ctime())
if __name__=='__main__':
t1 = Job(jobname='job1')
t2 = Job(jobname='job2')
t1.start()
t2.start()
t1.join()
t2.join()
print('Suceess')
用多執行緒和不用多執行緒效率對比
import threading
import time
from mytimeit import timeit
def music(name):
for i in range(2):
print("正在聽音樂%s"%(name))
time.sleep(1)
def code(name):
for i in range(2):
print("正在編寫程式碼%s"%(name))
time.sleep(2)
@timeit
def use_thread():
t1 = threading.Thread(target=music,args=("去年夏天",))
t2 = threading.Thread(target=code, args=("爬蟲",))
t1.start()
t2.start()
t1.join()
t2.join()
@timeit
def not_use_thread():
music("去年夏天")
code("爬蟲")
if __name__=='__main__':
use_thread()
not_use_thread()
多執行緒的join方法
join方法: 會等待, 直到t1執行緒執行結束;阻塞正在呼叫的執行緒
def job(name):
time.sleep(1)
print(name)
t1 = threading.Thread(target=job,args=('job1',))
t1.start()
t1.join() # 會等待, 直到t1執行緒執行結束;阻塞正在呼叫的執行緒
t2 = threading.Thread(target=job,args=('job2',))
t2.start()
t2.join() # 會等待, 直到t2執行緒執行結束;阻塞正在呼叫的執行緒
print("main thread end")
守護執行緒
將t1執行緒生命為守護執行緒, 如果設定為True, 子執行緒啟動, 當主執行緒執行結束, 子執行緒也結束 t1.setDaemon(True)
# 當主執行緒執行結束, 讓沒有執行的執行緒強制結束;set_daemon
import threading
import time
def music(name):
for i in range(2):
print("正在聽音樂%s"%(name))
time.sleep(1)
def code(name):
for i in range(2):
print("正在編寫程式碼%s"%(name))
time.sleep(2)
if __name__=='__main__':
start_time=time.time()
t1 = threading.Thread(target=music,args=('去年夏天',))
t2 = threading.Thread(target=code,args=('爬蟲',))
# 將t1執行緒生命為守護執行緒, 如果設定為True, 子執行緒啟動, 當主執行緒執行結束, 子執行緒也結束
# 設定setDaemon必須在啟動執行緒之前進行設定;
t1.setDaemon(True)
t2.setDaemon(True)
t1.start()
t2.start()
print('花費時間:%s'%(time.time()-start_time))
執行緒鎖
為什麼會需要執行緒鎖? 多個執行緒對同一個資料進行修改時, 肯能出現不可預料的情況.
import threading
def add(lock):
# 操作變數之前進行加鎖
lock.acquire()
global money
for i in range(1000000):
money += 1
# 操作變數完成後進行解鎖
lock.release()
def reduce(lock):
# 操作變數之前進行加鎖
lock.acquire()
global money
for i in range(1000000):
money -= 1
# 操作變數完成後進行解鎖
lock.release()
if __name__=='__main__':
money=0
# 例項化一個鎖物件
lock = threading.Lock()
t1 = threading.Thread(target=add,args=(lock,))
t2 = threading.Thread(target=reduce, args=(lock,))
t1.start()
t2.start()
t1.join()
t2.join()
print("最終金額為:%s" %(money))
若不枷鎖: 枷鎖:
GIL全域性直譯器
python使用多執行緒 , 是個好主意麼? 為什麼? - GIL(全域性直譯器鎖) - python直譯器預設每次只允許一個執行緒執行 執行過程: 1). 設定GIL 2). 切換到執行緒去執行對應的任務; 3). 執行 - 執行完了 - time.sleep() - 獲取其他資訊才能繼續執行, eg: 從網路上獲取網頁資訊等; 3. 把執行緒設定為睡眠狀態 4. 解鎖GIL 5.再次重複執行上述內容; python直譯器:Cpython直譯器, Jpython直譯器, p-python直譯器
方法的選擇: Python並不支援真正意義上的多執行緒。Python中提供了多執行緒包,但是如果你想通過多執行緒提高程式碼的速度, 使用多執行緒包並不是個好主意。Python中有一個被稱為Global Interpreter Lock(GIL)的東西, 它會確保任何時候你的多個執行緒中,只有一個被執行。執行緒的執行速度非常之快,會讓你誤以為執行緒是並行執行的, 但是實際上都是輪流執行。經過GIL這一道關卡處理,會增加執行的開銷。這意味著,如果你想提高程式碼的執行速度, 使用threading包並不是一個很好的方法。 # I/O密集型操作: 多執行緒操作 # CPU/計算密集型:多程序操作
import threading
from mytimeit import timeit
def job(l):
sum(l)
@timeit
def use_thread():
li = range(1,10000)
for i in range(5):
t = threading.Thread(target=job,args=(li,))
t.start()
@timeit
def use_no_thread():
li = range(1,10000)
job(li)
if __name__=='__main__':
use_thread()
use_no_thread()
多執行緒資料的儲存與傳送 -----佇列
1). 理論上多執行緒執行任務, 會產生一些資料, 為其他程式執行作鋪墊; 2). 多執行緒是不能返回任務執行結果的, 因此需要一個容器來儲存多執行緒產生的資料 3). 這個容器如何選擇? list(棧, 佇列)
import threading
from queue import Queue
def job(l,queue):
# 將任務的結果儲存到佇列中;
queue.put(sum(l))
def use_thread():
# 例項化一個佇列, 用來儲存每個執行緒執行的結果;
q =Queue()
li= [[1,2,3,4,5], [2,3,4,5,6], [2,3,4,5,6,7,8], [2,3,4,5,6]]
threads = []
for i in li:
t = threading.Thread(target=job,args=(i,q))
threads.append(t)
t.start()
# join方法等待所有子執行緒之心結束
[thread.join() for thread in threads]
# 從佇列裡面拿出所有的執行結果
result = [q.get() for _ in li]
print(result)
if __name__=="__main__":
use_thread()
生產者消費者模型
給定200個ip地址, 可能開放埠為80, 443, 7001, 7002, 8000, 8080, 9000(flask), 9001 以http://ip:port形式訪問頁面以判斷是否正常訪問. 1). 任務1:構建所有的url地址;===儲存到一個數據結構中 2). 任務2:依次判斷url址是否可以成功訪問 用列表儲存:
import threading
from urllib.request import urlopen
def creat_data():
with open('ips.txt','w') as f:
for i in range(200):
f.write('172.25.254.%d\n'%(i+1))
def creat_url():
portlist = [80,443,7001,7002,8000,8080]
with open('ips.txt') as f:
ips = [ip.strip()for ip in f]
urls = ['http://%s:%s'%(ip,port) for ip in ips for port in portlist]
return urls
def job(url):
try:
urlobj=urlopen(url)
except Exception as e:
print("%s unknown url"%(url))
else:
print("%s is ok"%(url))
if __name__=='__main__':
creat_data()
urls = creat_url()
threads = []
for url in urls:
t = threading.Thread(target=job,args=(url,))
threads.append(t)
t.start()
[thread.join() for thread in threads]
用佇列:
import threading
from queue import Queue
from urllib.request import urlopen
class Producer(threading.Thread):
def __init__(self, queue):
super(Producer, self).__init__()
self.queue = queue
def run(self):
portlist = [80, 443, 7001, 7002, 8000, 8080]
with open('ips.txt') as f:
ips = [ip.strip() for ip in f]
# 每生產一個url地址, 就將生產的資料放到佇列裡面;
for ip in ips:
for port in portlist:
url = 'http://%s:%s' % (ip, port)
self.queue.put(url)
# urls = ['http://%s:%s' % (ip, port) for ip in ips for port in portlist]
# return urls
class Consumer(threading.Thread):
def __init__(self, queue):
super(Consumer, self).__init__()
self.queue = queue
def run(self):
try:
url = self.queue.get()
urlObj = urlopen(url)
except Exception as e:
print("%s unknown url" % (url))
else:
print("%s is ok" % (url))
if __name__ == '__main__':
queue = Queue()
# 一個執行緒物件, 生產者
p1 = Producer(queue)
p1.start()
# 消費者啟動多個執行緒(啟動20個)
for i in range(20):
c1 = Consumer(queue)
c1.start()
多執行緒獲取IP地址
import json
import threading
from urllib.request import urlopen
from mytimeit import timeit
def job(ip):
#API
url = "http://ip.taobao.com/service/getIpInfo.php?ip=%s" % (ip)
# 根據url獲取網頁的內容, 並且解碼為utf-8格式, 識別中文;
text = urlopen(url).read().decode('utf-8')
# 將獲取的字串型別轉換為字典, 方便處理
d = json.loads(text)['data']
country = d['country']
city = d['city']
print("%s:"%(ip),country,city)
@timeit
def has_many_thread():
threads = []
ips = ['172.25.254.78', '8.8.8.8',
'172.25.254.78', '8.8.8.8',
'172.25.254.78', '8.8.8.8']
for ip in ips:
# 例項化執行緒物件
t = threading.Thread(target=job,args=(ip,))
threads.append(t)
t.start()
[thread.join() for thread in threads]
@timeit
def has_no_thread():
ips = ['172.25.254.250', '8.8.8.8',
'172.25.254.250', '8.8.8.8',
'172.25.254.250', '8.8.8.8']
for ip in ips:
job(ip)
if __name__=='__main__':
has_many_thread()
has_no_thread()
ThreadPool執行緒池
注意: python3.2版本以後才可以使用;
from concurrent.futures import ThreadPoolExecutor
import time
def job():
print("this is a job")
return 'hello'
if __name__=='__main__':
# 例項化物件, 執行緒池包含10個執行緒來處理任務;
pool = ThreadPoolExecutor(max_workers=10)
# 往執行緒池裡面扔需要執行的任務, 返回一個物件,( _base.Future例項化出來的)
f1 = pool.submit(job)
f2 = pool.submit(job)
# 判斷任務是否執行結束
print(f1.done())
time.sleep(1)
print(f2.done())
# 獲取任務執行的結果
print(f1.result())
print(f2.result())
執行緒池與map函式
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.error import HTTPError
from urllib.request import urlopen
from mytimeit import timeit
URLS = ['http://httpbin.org', 'http://example.com/',
'https://api.github.com/'] * 10
def get_page(url,timeout=3):
try:
content = urlopen(url).read()
return {'url':url,'len':len(content)}
except HTTPError as e:
return {'url':url,'len':0}
@timeit
def method_1():
# 方法1: submit提交任務
pool =ThreadPoolExecutor(max_workers=20)
futuresobj = [pool.submit(get_page,url) for url in URLS]
# 注意: 傳遞的時包含futures物件的序列, as_complete, 返回已經執行完任務的future物件,
# 直到所有的future對應的任務執行完成, 迴圈結束;
# for finish_fs in as_completed(futuresobj):
# print(finish_fs.result())
for future in futuresobj:
print(future.result())
@timeit
def method_2():
# 方法2:通過map方式執行
pool = ThreadPoolExecutor(max_workers=20)
for res in pool.map(get_page,URLS):
print(res)
method_2()
多執行緒批量管理主機
# 基於ssh用於連線遠端伺服器做操作:遠端執行命令, 上傳檔案, 下載檔案
import threading
from concurrent.futures import ThreadPoolExecutor
import paramiko
from paramiko.ssh_exception import NoValidConnectionsError, AuthenticationException
def connect(cmd, hostname, port=22, user='root'):
# ssh [email protected]
# 建立一個ssh物件;
client = paramiko.SSHClient()
# 返回一個私鑰物件
private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
# 2. 解決問題:如果之前沒有;連線過的ip, 會出現
# Are you sure you want to continue connecting (yes/no)? yes
# 自動選擇yes
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
# 3. 連線伺服器
client.connect(hostname=hostname,
port=port,
username=user,
pkey=private_key
)
# 4. 執行操作
stdin, stdout, stderr = client.exec_command(cmd)
except NoValidConnectionsError as e:
print("%s連線失敗" %(hostname))
except AuthenticationException as e:
print("%s密碼錯誤" %(hostname))
else:
# 5. 獲取命令的執行結果;
result = stdout.read().decode('utf-8')
print("%s執行結果:" %(hostname), result)
finally:
# 6. 關閉連線
client.close()
#******方法1:例項化物件實現*********
def method_1():
threads = []
for count in range(254):
host = '172.25.254.%s' % (count + 1)
# print(host.center(50, '*'))
t = threading.Thread(target=connect, args=('uname', host))
threads.append(t)
t.start()
# join方法, 等待所有的子執行緒執行結束;
_ = [thread.join() for thread in threads]
print("任務執行結束........")
# ******方法2: 執行緒池只有50個執行緒處理所有的任務********
def method_2():
# 建立執行緒池物件
pool = ThreadPoolExecutor(max_workers=50)
# 依次向執行緒池提交任務
for count in range(254):
host = '172.25.254.%s' % (count + 1)
pool.submit(connect, 'uname', host)
method_2()
繼承的方式實現:
import threading
import paramiko
from paramiko.ssh_exception import NoValidConnectionsError, AuthenticationException
class IpThread(threading.Thread):
def __init__(self,cmd,hostname,port=22,user='root'):
super(IpThread, self).__init__()
self.cmd=cmd
self.hostname=hostname
self.port=port
self.user=user
def run(self):
client = paramiko.SSHClient()
private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(hostname=self.hostname,
port=self.port,
username=self.user,
pkey=private_key
)
stdin, stdout, stderr = client.exec_command(self.cmd)
except NoValidConnectionsError as e:
print("%s連線失敗" % (self.hostname))
except AuthenticationException as e:
print("%s密碼錯誤" % (self.hostname))
except TimeoutError as e:
print("%s連線超時" % (self.hostname))
else:
result = stdout.read().decode('utf-8')
print('%s' % (self.hostname), result)
finally:
client.close()
def use_thread():
threads = []
ips = ['172.25.254.%s' %(i) for i in range(1,254)]
for ip in ips:
t = IpThread(cmd='uname',hostname=ip)
threads.append(t)
t.start()
[thread.join() for thread in threads]
if __name__ == '__main__':
use_thread()