1. 程式人生 > >day43 Pyhton 併發程式設計06

day43 Pyhton 併發程式設計06

一.內容回顧

  執行緒

  鎖

    為什麼有了GIL之後還需要鎖

      多個執行緒同時操作全域性變數還需要鎖

      當出現'非原子性操作',例如+= -= *= /=

    l.append(l) 原子性操作

    a += 1  a= a+1

      tmp = a +1

      a = tmp

  死鎖現象

    什麼是死鎖現象

      兩個以上的執行緒爭搶同一把鎖

      其中一個執行緒獲取到鎖之後不釋放

      另外的其他執行緒就都被鎖住了

      比較容易出現問題的情況: 兩把鎖套在一起用了

      死鎖現象的本質 :程式碼邏輯問題

  遞迴鎖

    一把鎖在同一個執行緒中acquire多次而不被阻塞

    如果另外的執行緒想要使用,必須release相同的次數

    才能釋放鎖給其他執行緒

  訊號量

    控制幾個執行緒同一時刻只能有n個執行緒執行某一段程式碼

    鎖 + 計數器

  事件

    兩件事情

    一件事情要想執行依賴於另一個任務的結果

  條件

    n個執行緒在某處阻塞

    由另一個執行緒控制這n個執行緒中有多少個執行緒能繼續執行

  定時器

    規定某一個執行緒在開啟之後的n秒之後執行

  佇列\棧\優先順序佇列

    import queue

    執行緒之間資料安全

    多個執行緒get不可能同時取走一個數據,導致資料的重複獲取

    多個執行緒put也不可能同時存入一個數據,導致資料的丟失

    佇列 先進先出

    棧    先進後出

    優先順序  優先順序高的先出

  執行緒池

    concurrent.futrues

    ThreadPoolExcuter

    ProcessPoolExcuter

  submit 非同步提交任務

  shutdown 等待池內任務完成

  result 獲取程序函式的返回值

  map  非同步提交任務的簡便用法

  add_done_callback  回撥函式

    程序 主程序執行

    執行緒

今日內容

  協程

  基礎概念

  實現方式

 

  擴充套件模組

  第三方模組的安裝

  

  IO模型

  IO多路複用

協程

  程序  計算機中最小的資源分配單位

  執行緒  計算機中能被CPU排程的最小單位

  執行緒是由作業系統建立的,開啟和銷燬仍然佔用一些時間

  排程

    1.一條執行緒陷入阻塞之後,這一整條執行緒就不能再做其他事情了

    2.開啟和銷燬多條執行緒以及cpu在多條執行緒之間切換仍然依賴作業系統

  你瞭解協程

    瞭解

    協程(纖程,輕型執行緒)

    對於作業系統來說協程是不可見的,不需要作業系統排程

    協程是程式級別的操作單位

  協程效率高不高

    和作業系統本身沒有關係,和執行緒也沒有關係

    而是看程式的排程是否合理

  協程指的只是在同一條執行緒上能夠互相切換的多個任務

  遇到io就切換實際上是我們利用協程提高執行緒工作效率的一種方式

 

  切換 + 狀態儲存  yield

import time
def consumer(res):
    '''任務1:接收資料,處理資料'''
    pass

def producer():
    '''任務2:生產資料'''
    res=[]
    for i in range(10000000):
        res.append(i)
    return res

start=time.time()
# res=producer()
consumer(res) #寫成consumer(producer())會降低執行效率
stop=time.time()
print(stop-start)
import time
def consumer():
    while True:
        res = yield

def producer():
    g = consumer()
    next(g)
    for i in range(10000000):
        g.send(i)
start =time.time()
producer()
print(time.time() - start)

# yield這種切換 就已經在一個執行緒中出現了多個任務,這多個任務之前的切換 本質上就是協程,consumer是一個協程,producer也是一個協程
# 單純的切換還會消耗時間
# 但是如果能夠在阻塞的時候切換,並且多個程式的阻塞時間共享,協程能夠非常大限度的提高效率
# greenlet  協程模組 在多個任務之間來回切換
import time
from greenlet import greenlet

def play():
    print('start play')
    g2.switch()  # 開關
    time.sleep(1)
    print('end play')
def sleep():
    print('start sleep')
    time.sleep(1)
    print('end sleep')
    g1.switch()
g1 = greenlet(play)
g2 = greenlet(sleep)
g1.switch()  # 開關
start play
start sleep
end sleep
end play

gevent

gevent.spawn()”方法會建立一個新的greenlet協程物件,並執行它。”
gevent.joinall()”方法會等待所有傳入的greenlet協程執行結束後再退出,
這個方法可以接受一個”timeout”引數來設定超時時間,單位是秒。
# gevent 基於greenlet實現的,多個任務交給gevent管理,遇到IO就使用greenlet進行切換
import time
import gevent def play(): # 協程1 print(time.time()) print('start play') gevent.sleep(1) print('end play') def sleep(): # 協程2 print('start sleep') print('end sleep') print(time.time()) g1 = gevent.spawn(play) g2 = gevent.spawn(sleep) # g1.join() # g2.join() # 精準的控制協程任務,一定是執行完畢之後join立即結束阻塞 gevent.joinall([g1,g2])

 

from gevent import monkey;monkey.patch_all()
   # 把下面所有的模組中的阻塞都打成一個包,然後gevent就可以識別這些阻塞事件了
import time
import gevent
def play():   # 協程1
    print(time.time())
    print('start play')
    time.sleep(1)
    print('end play')
def sleep():  # 協程2
    print('start sleep')
    time.sleep(1)
    print('end sleep')
    print(time.time())

g1 = gevent.spawn(play)
g2 = gevent.spawn(sleep)
gevent.joinall([g1,g2])
import time
from gevent import monkey;monkey.patch_all()
from urllib.request import urlopen
import gevent
url_lst = ['https://www.python.org/','https://www.yahoo.com/','https://github.com/']

def get_page(url):
    ret = urlopen(url).read().decode('utf-8')
    return ret
start = time.time()
g_l = []
for url in url_lst:
    g = gevent.spawn(get_page,url)
    g_l.append(g)

gevent.joinall(g_l)
print(time.time()-start)#2.8369998931884766

socket_server

from gevent import monkey;monkey.patch_all()
import socket
import gevent

def talk(conn):
    while True:
        msg = conn.recv(1024).decode()
        conn.send(msg.upper().encode())

sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.listen()

while True:
    conn,addr = sk.accept()
    gevent.spawn(talk,conn)

socket_server

import socket
import threading
def task():
    sk = socket.socket()
    sk.connect(('127.0.0.1',9000))
    while True:
        sk.send(b'hello')
        print(sk.recv(1024))

for i in range(500):
    threading.Thread(target=task).start()

總結

# 協程
# 一條執行緒在多個任務之間相互切換
# 資料安全的
# 不能利用多核
# 能夠規避一個執行緒上的IO阻塞

# 一條執行緒能夠起500個協程
# 4c的機器
# 5個程序
# 每一個程序20個執行緒
# 每一個執行緒500個協程
# 5*20*500 = 50000

IO模型
網路IO模型

阻塞IO 之前寫的所有的socket recv阻塞  recvfrom accept

非租塞IO 能實現併發,浪費資源,並且麻煩

from socket import *
s=socket(AF_INET,SOCK_STREAM)#socket()是一個函式,建立一個套接字,AF_INET 表示用IPV4地址族,SOCK_STREAM 是說是要是用流式套接字
s.bind(('127.0.0.1',8080))
s.listen(5)
s.setblocking(False) #設定socket的介面為非阻塞
conn_l=[]   # 所有通話連線的列表
del_l=[]    #
# 1s 10000行程式碼 9998行都是異常處理 白白消耗了系統資源
while True:
    try:
        conn,addr=s.accept()  # 如果有人鏈接我 我就直接通過連線 如果沒人鏈接我 我就報錯
        conn_l.append(conn)   # 如果有人連我,就獲取到一個conn,並且把這個conn新增到conn_l
    except BlockingIOError:
        for conn in conn_l:  # [conn1,conn2]
            try:
                data=conn.recv(1024)     #嘗試接收每一個和我連線的人的訊息
                if not data:             # 如果對方已經斷開連線了
                    del_l.append(conn)   # 就把這個連線新增到del_l中
                    continue
                conn.send(data.upper())  # 對每一個傳送給我的資訊進行回覆
            except BlockingIOError:      # 如果某個conn並沒有訊息來就會報錯,被這個異常處理
                pass
            except ConnectionResetError: # 如果對方已經斷開連線了
                del_l.append(conn)

        for conn in del_l:       # 對於已經斷開連線的這些conn  [conn1]
            conn_l.remove(conn)  # 從conn_l中移除這個連線
            conn.close()         # 關閉這個連線
        del_l=[]                 # 將del_l置空
# IO多路複用
# 作業系統提供給你的
# 對於你的程式來說 : 是一個代理
# 幫助你監聽所有的通訊物件,是否有資料來到作業系統中
# 一旦有 就通知你
# 你再根據通知來接收相應的資料
# 你不需要一直迴圈著問每一個物件是否有資訊來,而是阻塞等待,任意一個物件有資訊來,我就接收
import select
# io多路複用有好幾種機制 : select poll epoll
# 多個io物件,多個conn,sk
# 一個conn佔著一條網路連線的路
# 多個conn佔著多條路
# 多個conn複用同一個執行緒的操作
# from socket import *
# import select
#
# s=socket(AF_INET,SOCK_STREAM)
# s.bind(('127.0.0.1',8081))
# s.listen(5)
# s.setblocking(False) #設定socket的介面為非阻塞
# read_l=[s,]
# while True:
#     r_l,w_l,x_l=select.select(read_l,[],[])
#     print(r_l,read_l)
#     for ready_obj in r_l:
#         if ready_obj == s:
#             conn,addr=ready_obj.accept() #此時的ready_obj等於s
#             read_l.append(conn)
#         else:
#             try:
#                 data=ready_obj.recv(1024) #此時的ready_obj等於conn
#                 if not data:
#                     ready_obj.close()
#                     read_l.remove(ready_obj)
#                     continue
#                 ready_obj.send(data.upper())
#             except ConnectionResetError:
#                 ready_obj.close()
#                 read_l.remove(ready_obj)


# 非同步IO


# socketserver
# selectors模組 + threading模組實現的

非阻塞io的client

#客戶端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8081))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))