1. 程式人生 > 程式設計 >python Event事件、程序池與執行緒池、協程解析

python Event事件、程序池與執行緒池、協程解析

Event事件

用來控制執行緒的執行

出現e.wait(),就會把這個執行緒設定為False,就不能執行這個任務;

只要有一個執行緒出現e.set(),就會告訴Event物件,把有e.wait的使用者全部改為True,剩餘的任務就會立馬去執行。由一些執行緒去控制另一些執行緒,中間通過Event。

from threading import Event
from threading import Thread
import time
# 呼叫Event例項化出物件
e = Event()
#
# # 若該方法出現在任務中,則為False,阻塞
# e.wait() # False
# # 若該方法出現在任務中,則將其他執行緒的False改為True,進入就緒態和執行態
# e.set() # True
def light():
  print('紅燈亮...')
  time.sleep(5)
  # 應該發出訊號,告訴其他執行緒準備執行
  e.set()  # 將car中的False變為True
  print('綠燈亮...')
def car(name):
  print('正在等紅燈...')
  # 讓所有汽車任務進入阻塞態
  e.wait() # False
  print(f'{name}正在加速飄逸...')
# 讓一個light執行緒控制多個car執行緒
t = Thread(target=light)
t.start()

for i in range(10):
  t = Thread(target=car,args=(f'汽車{i}號',))
  t.start()

程序池與執行緒池

程序池與執行緒池是用來控制當前程式允許建立(程序/執行緒)的數量

作用:保證在硬體允許的範圍內建立(程序/執行緒)的數量

執行緒池使用一:

from concurrent.futures import ThreadPoolExecutor
import time
pool = ThreadPoolExecutor(5) # 5代表只能開啟5個程序,不加預設使用cpu的程序數
# ThreadPoolExecutor(5) # 5代表只能開啟5個執行緒
# pool.submit() #非同步提交任務, 括號裡傳函式地址
def task():
  print('執行緒任務開始了...')
  time.sleep(1)
  print('執行緒任務結束了...')
for line in range(5):
  pool.submit(task)

使用二:

from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(5) # 5代表只能開啟5個程序,不加預設使用cpu的程序數

# ThreadPoolExecutor(5) # 5代表只能開啟5個執行緒
# pool.submit() #非同步提交任務, 括號裡傳函式地址
def task():
  print('執行緒任務開始了...')
  time.sleep(1)
  print('執行緒任務結束了...')
  return 123
# 回撥函式
def call_back(res):
  print(type(res))
  res2 = res.result() # 注意:賦值操作不要與接收的res同名
  print(res2)
for line in range(5):
  pool.submit(task).add_done_callback(call_back)

pool.shutdown() 會讓所有執行緒池的任務結束後,才往下執行程式碼

多執行緒爬取梨視訊

利用requests模組,封裝底層socket套接字

  • 主頁中獲取所有視訊id號,拼接視訊詳情頁url
  • 在視訊詳情頁中獲取真實視訊url srcUrl=
  • 往真實視訊url地址傳送請求獲取 視訊 二進位制資料
  • 最後把視訊二進位制資料儲存到本地

協程

  • 程序: 資源單位
  • 執行緒: 執行單位
  • 協程: 在單執行緒下實現併發

注意: 協程不是作業系統資源,目的是讓單執行緒實現併發

協程目的

  • 作業系統:使用多道技術,切換 + 儲存狀態,一個是遇到IO, 另一個是CPU執行時間過長
  • 協程:通過手動模擬作業系統 “多道計數”, 實現 切換 + 儲存狀態
    • 手動實現,遇到IO切換,欺騙作業系統誤以為沒有IO操作
    • 單執行緒時,遇到IO,就切換 + 儲存狀態
    • 單執行緒時,對於計算密集型,來回切換 + 儲存狀態反而效率更低

優點:在IO密集型的情況下,會提高效率

缺點:若在計算密集型的情況下,來回切換,反而效率更低

import time

def func1():
  for i in range(10000000):
    i+1

def func2():
  for i in range(10000000):
    i+1
start = time.time()
func1()
func2()
stop = time.time()
print(stop - start)  # 1.0312113761901855


# 基於yield實現併發 在計算密集型的情況下效率更低

def func1():
  while True:
    10000000+1
    yield

def func2():
  g = func1()
  for i in range(10000000):
    i+1
    next(g)  # 每次執行next相當於切換到func1下面
start = time.time()
func2()
stop = time.time()
print(stop - start) # 1.3294126987457275

gevent

gevent是一個第三方模組,可以幫你監聽IO操作,並切換

使用gevent的目的:在單執行緒下實現,遇到IO就會 儲存狀態 + 切換

import time
from gevent import monkey

monkey.patch_all() # 可以監聽該程式下所有的IO操作
from gevent import spawn,joinall # 用於做切換 + 儲存狀態
def func1():
  print('1')
  time.sleep(1) # IO操作
def func2():
  print('2')
  time.sleep(3)
def func3():
  print('3')
  time.sleep(5)
start = time.time()
s1 = spawn(func1)
s2 = spawn(func2)
s3 = spawn(func3)

s1.join() # 傳送訊號,相當於等待自己(在單執行緒的情況下)
s2.join()
s3.join()

# joinall((s1,s2,s3)) # 一個個執行很麻煩,可以用joinall把這些全部裝進去

end = time.time()
print(end - start) # 5.006161451339722

TCP服務端socket套接字實現協程

服務端:

from gevent import monkey
from gevent import spawn
import socket

monkey.patch_all()
server = socket.socket()
server.bind(('127.0.0.1',9999))
server.listen(5)

def task(conn):
  while True:
    try:
      data = conn.recv(1024)
      if len(data) == 0:
        break
      print(data.decode('utf-8'))
      send_data = data.upper()
      conn.send(send_data)

    except Exception:
      break
  conn.close()
def server2():
  while True:
    conn,addr = server.accept()
    print(addr)
    spawn(task,conn)
if __name__ == '__main__':
  s = spawn(server2)
  s.join()

客戶端:

import socket
from threading import Thread,current_thread

def client():
  client = socket.socket()
  client.connect(('127.0.0.1',9999))

  number = 0
  while True:
    send_data = f'{current_thread().name} {number}'
    client.send(send_data.encode('utf-8'))

    data = client.recv(1024)
    print(data.decode('utf-8'))
    number += 1


for i in range(400):
  t = Thread(target=client)
  t.start()

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。