1. 程式人生 > >Python入門學習-DAY37-進程池與線程池、協程、gevent模塊

Python入門學習-DAY37-進程池與線程池、協程、gevent模塊

在線 ces pro alt 18C name bcb 所有 __name__

一、進程池與線程池

基本使用:

  進程池和線程池操作一樣

提交任務的兩種方式:

同步調用:提交完一個任務之後,就在原地等待,等待任務完完整整地運行完畢拿到結果後,再執行下一行代碼,會導致任務是串行執行的

異步調用:提交完一個任務之後,不在原地等待,結果???,而是直接執行下一行代碼,會導致任務是並發執行的

同步調用

技術分享圖片
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import  time,random,os

def task():
    print(%s is running%os.getpid())
    i
=random.randint(1,3) time.sleep(i) return i if __name__ == __main__: p=ProcessPoolExecutor(4) l=[] for i in range(10): res = p.submit(task).result()#等待任務執行完畢,返回結果 print(res) print()
View Code

異步調用

技術分享圖片
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time,random,os def task(): print(%s is running%os.getpid()) i=random.randint(1,3) time.sleep(i) return i if __name__ == __main__: p=ProcessPoolExecutor(4) l=[] for i in range(10): future=p.submit(task)#只替提交任務 l.append(future) p.shutdown(wait
=True)#關閉進程池入口,並在原地等待所有進程任務執行完畢 for i in l: print(i.result()) print()
View Code

異步 + 回調函數

技術分享圖片
from concurrent.futures import ProcessPoolExecutor
import time,os
import requests


def get(url):
    print(%s GET %s %(os.getpid(),url))
    time.sleep(3)
    response=requests.get(url)
    if response.status_code == 200:
        res=response.text
    else:
        res=下載失敗
    return res

def parse(future):
    time.sleep(1)
    res=future.result()
    print(%s 解析結果為%s %(os.getpid(),len(res)))

if __name__ == __main__:
    urls=[
        https://www.baidu.com,
        https://www.sina.com.cn,
        https://www.tmall.com,
        https://www.jd.com,
        https://www.python.org,
        https://www.openstack.org,
        https://www.baidu.com,
        https://www.baidu.com,
        https://www.baidu.com,

    ]

    p=ProcessPoolExecutor(9)

    start=time.time()
    for url in urls:
        future=p.submit(get,url)
        future.add_done_callback(parse)
        #parse會在任務運行完畢後自動觸發,然後接收一個參數future對象,回調函數的執行是在主進程裏,而線程中的回調函數是由空閑的線程來執行

    p.shutdown(wait=True)

    print(,time.time()-start)
    print(,os.getpid())
View Code

基於線程池的套接字通訊

服務端

技術分享圖片
from concurrent.futures import ThreadPoolExecutor
import socket
from threading import current_thread
IP=127.0.0.1
PORT=8085
ADDRESS=(IP,PORT)
BUFFSIZE=1024
t = ThreadPoolExecutor(4)


def communicate(conn,addr):
    while True:
        try:
            data=conn.recv(BUFFSIZE)
            if not data:
                print(%s客戶端斷開....%addr)
                break
            print(>>>>%s  端口:%s 線程:%s%(data.decode(utf-8),addr[1],current_thread().name))
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()

if __name__ == __main__:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(ADDRESS)
    server.listen(2)
    print(current_thread().name)
    while True:
        conn,addr=server.accept()
        t.submit(communicate, conn,addr)
View Code

客戶端

技術分享圖片
import socket
IP=127.0.0.1
PORT=8085
ADDRESS=(IP,PORT)
BUFFSIZE=1024

client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client.connect(ADDRESS)

while True:
    msg=input(>>>>).strip()
    if len(msg)==0:continue
    if msg==q:break
    client.send(msg.encode(utf-8))
    data = client.recv(BUFFSIZE)
    print(data.decode(utf-8))

client.close()
View Code

二、協程

1. 目標:

在線程下實現並發
並發(多個任務看起來是同時執行就是並發):切換+保存狀態

2. 協程:

協程是單線程實現並發
註意:協程是程序員意淫出來的東西,操作系統裏只有進程和線程的概念(操作系統調度的是線程)

在單線程下實現多個任務間遇到IO就切換就可以降低單線程的IO時間,從而最大限度地提升單線程的效率

串行執行

技術分享圖片
import time

def func1():
    for i in range(10000000):
        i+1

def func2():
    for i in range(10000000):
        i+1

start = time.time()
func1()
func2()
stop = time.time()
print(stop - start)#1.9774692058563232s
View Code

基於yield並發執行

技術分享圖片
import time
def func1():
    while True:
        print(func1)
        yield

def func2():
    g=func1()
    for i in range(1000):
        print(func2)
        i+1
        next(g)

start=time.time()
func2()
stop=time.time()
print(stop-start)#0.014994382858276367s
View Code

三、gevent模塊

1.使用

from gevent import monkey;monkey.patch_all()#用來識別IO阻塞,必須放到文件頭
from gevent import spawn,joinall
import time


def foo1(name):
    print(%s  play1%name)
    time.sleep(2)#模擬IO操作,遇到IO切換任務
    print(%s  play2%name)


def foo2(name):
    print(%s  eat1%name)
    time.sleep(3)#模擬IO操作,遇到IO切換任務
    print(%s  eat2%name)

f1=spawn(foo1,egon)#提交任務
f2=spawn(foo2,egon)#提交任務

joinall([f1,f2])#主線程等待任務完成
print()

#結果:
  #egon play1
  #egon eat1
  #egon play2
  #egon eat2
  #

2.基於gevent的套接字通信

服務端

技術分享圖片
from gevent import monkey;monkey.patch_all()
from gevent import spawn
import socket
from threading import current_thread
IP=127.0.0.1
PORT=8086
ADDRESS=(IP,PORT)
BUFFSIZE=1024

def communicate(conn,addr):
    while True:
        try:
            data=conn.recv(BUFFSIZE)
            if not data:
                print(%s客戶端斷開....%addr)
                break
            conn.send(data.upper())
        except ConnectionResetError:
            break


def server():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(ADDRESS)
    server.listen(2)
    print(current_thread().name)
    while True:
        conn,addr=server.accept()
        spawn(communicate,conn,addr)

if __name__ == __main__:
    s1=spawn(server)
    s1.join()
View Code

多個客戶端並發

技術分享圖片
import socket
from threading import Thread,current_thread

IP = 127.0.0.1
PORT = 8086
ADDRESS = (IP, PORT)
BUFFSIZE = 1024
def client():


    client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    client.connect(ADDRESS)
    n=0
    while True:
        msg=%s say hello %s %(current_thread().name,n)
        n+=1
        client.send(msg.encode(utf-8))
        data=client.recv(BUFFSIZE)
        print(data.decode(utf-8))

if __name__ == __main__:
    for i in range(500):
        t=Thread(target=client)
        t.start()
View Code

Python入門學習-DAY37-進程池與線程池、協程、gevent模塊