python並發編程
操作系統基礎
- 操作系統的兩大功能:
- 封裝好硬件復雜的接口,提供良好的抽象接口,運行應用程序只需要調用這些接口即可啟動相應的硬件服務,例如啟動暴風音影
- 雙擊執行文件
- 獲取應用軟件在硬盤上的存儲地址
- 操作系統會直接將硬盤上的數據讀取到內存中
- 交給cpu運行
- 管理、調度進程,並且將多個進程對硬件的競爭變得有序;
- 封裝好硬件復雜的接口,提供良好的抽象接口,運行應用程序只需要調用這些接口即可啟動相應的硬件服務,例如啟動暴風音影
系統演化
- 第二代計算機:批處理系統
- 1701主要是負責I/O操作,批量輸入和批量輸出
- 7094主要是負責計算, 運行程序
第三代計算機: 多個聯機終端 + 多道技術(針對單核)
多道技術:多道指的是多個程序,多道技術的實現是為了解決多個程序競爭或者說共享同一個資源(cpu)的有序調度問題,解決方式為多路復用,復用分為時間上的復用和空間上的復用
時間上的復用:
當一個程序在等待I/O時,另一個程序可以使用cpu,如果內存中可以存放足夠多的作業,則cpu利用率可以達到100%
操作系統采用了多道技術後,可以控制進程的切換,或者說進程之間去爭搶cpu的執行權限。這種切換不僅會在一個進程遇到io時進行,一個進程占用cpu時間過長也會切換,或者說被操作系統奪走cpu的執行權限
空間上的復用
程序之間的內存必須分割,這種分割需要在硬件層面實現,由操作系統控制。如果內存彼此不分割,則一個程序可以訪問另外一個程序的內存,
首先喪失的是安全性,比如你的qq程序可以訪問操作系統的內存,這意味著你的qq可以拿到操作系統的所有權限。
其次喪失的是穩定性,某個程序崩潰時有可能把別的程序的內存也給回收了,比方說把操作系統的內存給回收了,則操作系統崩潰。
- 第二代計算機:批處理系統
開啟子進程
- 方法1(基礎版本)
from multiprocessing import Process
import time
def task(name):
print(‘%s starts running‘ % name)
time.sleep(3)
print(‘%s finish‘ % name)
if __name__ == ‘__main__‘:
p = Process(target=task, kwargs={‘name‘:‘子進程1‘})
p.start()
print(‘主進程‘)
join方法
在主進程的任務與子進程的任務彼此獨立的情況下,主進程的任務先執行完畢後,主進程還需要等待子進程執行完畢,然後統一回收資源。
join作用 :如果主進程的任務在執行到某個階段時,需要等待子進程執行完畢後才能繼續執行,就需要有一種機制能讓主進程檢測子進程是否運行完畢,在子進程執行完畢後才能繼續執行, 否則一直在原地阻塞
查看子進程
- 可以通過
os.getpid()
查看當前進程代號,os.getppid()
查看父進程代號;
from multiprocessing import Process
import time
import os
def task():
print(‘%s is running, parent id <%s>‘ % (os.getpid(), os.getppid()))
time.sleep(4)
print(‘%s finish, parent id <%s>‘ % (os.getpid(), os.getppid()))
print("==========")
if __name__ == ‘__main__‘:
p = Process(target=task)
p.start()
print(‘主進程<%s> is running, 我的父進程<%s>‘ % (os.getpid(), os.getppid()))
- 方法2:(升級版本)
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
"""方法名稱必須為run"""
print(‘%s starts running‘ % self.name)
time.sleep(3)
print(‘%s finish‘ % self.name)
if __name__ == ‘__main__‘:
p = MyProcess(‘子進程1‘)
p.start() # 會調用run方法
print(‘主進程!‘)
子進程和父進程是存儲在不同的內存空間的
- 驗證實例
from multiprocessing import Process
import time
import os
def task():
print(‘%s is running, parent id <%s>‘ % (os.getpid(), os.getppid()))
time.sleep(4)
print(‘%s finish, parent id <%s>‘ % (os.getpid(), os.getppid()))
print("==========")
if __name__ == ‘__main__‘:
p = Process(target=task)
p.start()
print(‘主進程<%s> is running, 我的父進程<%s>‘ % (os.getpid(), os.getppid()))
socket多進程代碼
# server服務器
import socket
from multiprocessing import Process
import os
def run(conn):
"""開始等待,接收數據"""
while True:
data = conn.recv(1024)
if not data: break
res = ‘來自服務端<%s>的反饋:%s‘ %(os.getpid(), data)
conn.send(res.encode(‘utf-8‘))
def server(ip, port):
"""啟動服務器模版,並生成服務器"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((ip, port))
server.listen(5)
while True:
conn, addr = server.accept()
# 開啟子進程服務,可以同時服務多個客戶端
p = Process(target=run, args=(conn,))
p.start()
server(‘127.0.0.1‘, 8800)
# 客戶端
import socket
import time
import time
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((‘127.0.0.1‘, 8800))
msg = ‘hello from fqh‘
client.send(msg.encode((‘utf8‘)))
data = client.recv(1024)
print(data.decode(‘utf-8‘))
time.sleep(5)
守護進程
- 守護進程在主進程代碼執行完之後會立即斷開
- 設置守護進程方法:
p.daemon=True
, 且必須在p.start()
之前設置
from multiprocessing import Process
import time
def task(name):
print(‘%s starts‘ % name)
time.sleep(2)
print(‘%s ends‘ % name)
if __name__ == ‘__main__‘:
p=Process(target=task,args=(‘egon‘,))
# 一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程代碼執行結束,p即終止運行
p.daemon=True
p.start()
time.sleep(1)
# 只要終端打印出這一行內容,那麽守護進程p也就跟著結束掉了
print(‘主‘)
互斥鎖
只需要將對共享數據修改的那一段代碼加上鎖就可以,其余的部分還是可以並行!
模擬購票系統, 運用互鎖功能實現搶票,代碼如下:
from multiprocessing import Process, Lock
import time,json
def search(name):
"""查詢剩余的票數"""
dic=json.load(open(‘db.txt‘))
time.sleep(1)
print(‘\033[43m%s 查到剩余票數%s\033[0m‘ %(name,dic[‘count‘]))
def get(name):
"""購票"""
dic=json.load(open(‘db.txt‘))
time.sleep(1) #模擬讀數據的網絡延遲
if dic[‘count‘] >0:
dic[‘count‘]-=1
time.sleep(1) #模擬寫數據的網絡延遲
json.dump(dic,open(‘db.txt‘,‘w‘))
print(‘\033[46m%s 購票成功!\033[0m‘ %name)
else:
print(‘\033[46m%s 對不起,搶票失敗!\033[0m‘ %name)
def task(name, mutex):
"""購票流程"""
search(name)
mutex.acquire()
get(name)
mutex.release()
if __name__ == ‘__main__‘:
# 主進程,模擬並發10個客戶端搶票
mutex = Lock()
for i in range(10):
name=‘<路人%s>‘ % i
p=Process(target=task,args=(name, mutex))
p.start()
隊列Queue
Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
- maxsize是隊列中允許最大項數,省略則無大小限制。但需要明確:
- 存放的是消息而非大數據
- 用的是內存空間,因而maxsize即便是無大小限制也受限於內存大小
q.put()
方法用於插入數據,q.get()
從隊列讀取並刪除一個元素在存儲數據時候, 若隊列已經滿了再存入數據就會出現進程阻塞狀態, 可以用
q.full()
判斷隊列是否為存滿數據同樣,在取出數據時候,若隊列為空時,取數據也會出現進程阻塞狀態, 可以用
q.empty()
判斷隊列是否為空狀態
生產消費者模式
- 簡單版(單生產者及消費者模式)
from multiprocessing import Process,Queue
import time,random,os
def producer(q, name, food):
"""生產者"""
for i in range(3):
time.sleep(1)
res = ‘%s%s‘ %(food, i)
q.put(res)
print(‘\033[45m%s 生產了 %s\033[0m‘ %(name,res))
def consumer(q, name):
"""消費者"""
while True:
res = q.get()
if res == None: # 判斷是否生產完畢
print(‘序列已經清空!‘)
break
time.sleep(2)
print(‘\033[43m%s 吃 %s\033[0m‘ % (name, res))
if __name__ == ‘__main__‘:
q = Queue() # 生成容器對象
# 生產者們
p1 = Process(target=producer,args=(q, ‘egon‘, ‘包子‘))
# 消費者們
c1 = Process(target=consumer,args=(q, ‘alex‘))
# 開始
p1.start()
c1.start()
p1.join()
q.put(None) # 當生產進程執行完畢後再放進None表示結束
print(‘主進程執行結束====‘)
- 升級版(多消費者多生產者模式)
from multiprocessing import Process,Queue
import time,random,os
def producer(q, name, food):
"""生產者"""
for i in range(3):
time.sleep(1)
res = ‘%s%s‘ %(food, i)
q.put(res)
print(‘\033[45m%s 生產了 %s\033[0m‘ %(name,res))
def consumer(q, name):
"""消費者"""
while True:
res = q.get()
if res == "_a_": # 判斷是否生產完畢
print(‘%s結束消費‘%name)
break
time.sleep(2)
print(‘\033[43m%s 吃 %s\033[0m‘ % (name, res))
if __name__ == ‘__main__‘:
q = Queue() # 生成容器對象
# 生產者們
p1 = Process(target=producer, args=(q, ‘egon‘, ‘包子‘))
p2 = Process(target=producer, args=(q, ‘kate‘, ‘襪子‘))
p3 = Process(target=producer, args=(q, ‘steven‘, ‘iphone‘))
# 消費者們
c1 = Process(target=consumer, args=(q, ‘alex‘))
c2 = Process(target=consumer, args=(q, ‘frank‘))
# 生產者開始生產
p1.start()
p2.start()
p3.start()
# 消費者開始消費
c1.start()
c2.start()
# 等待生產完畢後添加結束生產標識符, 否則消費者進程會阻塞主進程,
# 註意有幾個消費者就要添加幾個結束標識符,必須結束所有的消費進程,否則依然阻塞
p1.join()
p3.join()
p2.join()
q.put(‘_a_‘)
q.put(‘_a_‘)
print(‘主進程‘)
- 最高級版本, 利用joinable_queue和守護進程
from multiprocessing import Process, JoinableQueue
import time
def producer(q, name, food):
"""生產者"""
for i in range(3):
time.sleep(1)
res = ‘%s%s‘ %(food, i)
q.put(res)
print(‘\033[45m%s 生產了 %s\033[0m‘ %(name,res))
q.join() # 當q為空的時候才會繼續主進程
def consumer(q, name):
"""消費者"""
while True:
res = q.get()
if res == "_a_": # 判斷是否生產完畢
print(‘%s結束消費‘%name)
break
time.sleep(2)
print(‘\033[43m%s 吃 %s\033[0m‘ % (name, res))
q.task_done() # 每取走一個數據都會向生產者反饋信息
if __name__ == ‘__main__‘:
q = JoinableQueue() # 生成容器對象
# 生產者們
p1 = Process(target=producer, args=(q, ‘egon‘, ‘包子‘))
p2 = Process(target=producer, args=(q, ‘kate‘, ‘襪子‘))
p3 = Process(target=producer, args=(q, ‘steven‘, ‘iphone‘))
# 消費者們
c1 = Process(target=consumer, args=(q, ‘alex‘))
c2 = Process(target=consumer, args=(q, ‘frank‘))
# 設置消費者進程為守護進程,當主程序執行完畢會自動回收
c1.daemon = True
c2.daemon = True
# 生產者開始生產
p1.start()
p2.start()
p3.start()
# 消費者開始消費
c1.start()
c2.start()
# 等待生產者進程結束
p1.join()
p3.join()
p2.join()
# 此時隊列一定為空
print(‘主進程‘)
python並發編程