1. 程式人生 > 程式設計 >Python佇列、程序間通訊、執行緒案例

Python佇列、程序間通訊、執行緒案例

程序互斥鎖

多程序同時搶購餘票

# 併發執行,效率高,但競爭寫同一檔案,資料寫入錯亂
# data.json檔案內容為 {"ticket_num": 1}
import json
import time
from multiprocessing import Process
def search(user):
  with open('data.json','r',encoding='utf-8') as f:
    dic = json.load(f)
  print(f'使用者{user}檢視餘票,還剩{dic.get("ticket_num")}...')
def buy(user):
  with open('data.json',encoding='utf-8') as f:
    dic = json.load(f)

  time.sleep(0.1)
  if dic['ticket_num'] > 0:
    dic['ticket_num'] -= 1
    with open('data.json','w',encoding='utf-8') as f:
      json.dump(dic,f)
    print(f'使用者{user}搶票成功!')

  else:
    print(f'使用者{user}搶票失敗')
def run(user):
  search(user)
  buy(user)
if __name__ == '__main__':
  for i in range(10): # 模擬10個使用者搶票
    p = Process(target=run,args=(f'使用者{i}',))
    p.start()

使用鎖來保證資料安全

# data.json檔案內容為 {"ticket_num": 1}
import json
import time
from multiprocessing import Process,Lock
def search(user):
  with open('data.json',encoding='utf-8') as f:
    dic = json.load(f)

  time.sleep(0.2)
  if dic['ticket_num'] > 0:
    dic['ticket_num'] -= 1
    with open('data.json',f)
    print(f'使用者{user}搶票成功!')

  else:
    print(f'使用者{user}搶票失敗')
def run(user,mutex):
  search(user)
  mutex.acquire() # 加鎖
  buy(user)
  mutex.release() # 釋放鎖
if __name__ == '__main__':
  # 呼叫Lock()類得到一個鎖物件
  mutex = Lock()

  for i in range(10): # 模擬10個使用者搶票
    p = Process(target=run,mutex))
    p.start()

程序互斥鎖:

讓併發變成序列,犧牲了執行效率,保證了資料安全

在程式併發時,需要修改資料使用

佇列

佇列遵循的是先進先出

佇列:相當於記憶體中一個佇列空間,可以存放多個數據,但資料的順序是由先進去的排在前面。

q.put() 新增資料

q.get() 取資料,遵循佇列先進先出

q.get_nowait() 獲取佇列資料, 佇列中沒有就會報錯

q.put_nowait 新增資料,若佇列滿了也會報錯

q.full() 檢視佇列是否滿了

q.empty() 檢視佇列是否為空

from multiprocessing import Queue

# 呼叫佇列類,例項化佇列物件
q = Queue(5)  # 佇列中存放5個數據

# put新增資料,若佇列裡的資料滿了就會卡住
q.put(1)
print('進入資料1')
q.put(2)
print('進入資料2')
q.put(3)
print('進入資料3')
q.put(4)
print('進入資料4')
q.put(5)
print('進入資料5')

# 檢視佇列是否滿了
print(q.full())

# 新增資料,若佇列滿了也會報錯
q.put_nowait(6)

# q.get() 獲取的資料遵循先進先出
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# print(q.get())
print(q.get_nowait())  # 獲取佇列資料, 佇列中沒有就會報錯

# 判斷佇列是否為空
print(q.empty())
q.put(6)
print('進入資料6')

程序間通訊

IPC(Inter-Process Communication)

程序間資料是相互隔離的,若想實現程序間通訊,可以利用佇列

from multiprocessing import Process,Queue
def task1(q):
  data = 'hello 你好'
  q.put(data)
  print('程序1新增資料到佇列')
def task2(q):
  print(q.get())
  print('程序2從佇列中獲取資料')
if __name__ == '__main__':
  q = Queue()

  p1 = Process(target=task1,args=(q,))
  p2 = Process(target=task2,))
  p1.start()
  p2.start()
  print('主程序')

生產者與消費者

在程式中,通過佇列生產者把資料新增到佇列中,消費者從佇列中獲取資料

from multiprocessing import Process,Queue
import time


# 生產者
def producer(name,food,q):
  for i in range(10):
    data = food,i
    msg = f'使用者{name}開始製作{data}'
    print(msg)
    q.put(data)
    time.sleep(0.1)
# 消費者
def consumer(name,q):
  while True:
    data = q.get()
    if not data:
      break

    print(f'使用者{name}開始吃{data}')
if __name__ == '__main__':
  q = Queue()
  p1 = Process(target=producer,args=('neo','煎餅',q))
  p2 = Process(target=producer,args=('wick','肉包',q))

  c1 = Process(target=consumer,args=('cwz',q))
  c2 = Process(target=consumer,args=('woods',q))

  p1.start()
  p2.start()
  
  c1.daemon = True
  c2.daemon = True
  c1.start()
  c2.start()
  print('主')

執行緒

執行緒的概念

程序與執行緒都是虛擬單位

程序:資源單位

執行緒:執行單位

開啟一個程序,一定會有一個執行緒,執行緒才是真正執行者

開啟程序:

  • 開闢一個名稱空間,每開啟一個程序都會佔用一份記憶體資源
  • 會自帶一個執行緒

開啟執行緒:

  • 一個程序可以開啟多個執行緒
  • 執行緒的開銷遠小於程序

注意:執行緒不能實現並行,執行緒只能實現併發,程序可以實現並行

執行緒的兩種建立方式

from threading import Thread
import time
# 建立執行緒方式1
def task():
  print('執行緒開啟')
  time.sleep(1)
  print('執行緒結束')

if __name__ == '__main__':
  t = Thread(target=task)
  t.start()
# 建立執行緒方式2
class MyThread(Thread):
  def run(self):
    print('執行緒開啟...')
    time.sleep(1)
    print('執行緒結束...')
if __name__ == '__main__':
  t = MyThread()
  t.start()

執行緒物件的方法

from threading import Thread
from threading import current_thread
import time

def task():
  print(f'執行緒開啟{current_thread().name}')
  time.sleep(1)
  print(f'執行緒結束{current_thread().name}')
if __name__ == '__main__':
  t = Thread(target=task)
  print(t.isAlive())
  # t.daemon = True
  t.start()
  print(t.isAlive())

執行緒互斥鎖

執行緒之間資料是共享的

from threading import Thread
from threading import Lock
import time

mutex = Lock()
n = 100

def task(i):
  print(f'執行緒{i}啟動')
  global n
  mutex.acquire()
  temp = n
  time.sleep(0.1)
  n = temp - 1
  print(n)
  mutex.release()
  
if __name__ == '__main__':
  t_l = []
  for i in range(100):
    t = Thread(target=task,args=(i,))
    t_l.append(t)
    t.start()

  for t in t_l:
    t.join()

  print(n)

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。