1. 程式人生 > 程式設計 >Python實現非同步IO的示例

Python實現非同步IO的示例

前言

  用阻塞 API 寫同步程式碼最簡單,但一個執行緒同一時間只能處理一個請求,有限的執行緒數導致無法實現萬級別的併發連線,過多的執行緒切換也搶走了 CPU 的時間,從而降低了每秒能夠處理的請求數量。為了達到高併發,你可能會選擇一個非同步框架,用非阻塞 API 把業務邏輯打亂到多個回撥函式,通過多路複用與事件迴圈的方式實現高併發。

磁碟 IO 為例,描述了多執行緒中使用阻塞方法讀磁碟,2 個執行緒間的切換方式。那麼,怎麼才能實現高併發呢?

Python實現非同步IO的示例

把上圖中本來由核心實現的請求切換工作,交由使用者態的程式碼來完成就可以了,非同步化程式設計通過應用層程式碼實現了請求切換,降低了切換成本和記憶體佔用空間。非同步化依賴於 IO 多路複用機制,比如 Linux 的 epoll 或者 Windows 上的 iocp,同時,必須把阻塞方法更改為非阻塞方法,才能避免核心切換帶來的巨大消耗。Nginx、Redis 等高效能服務都依賴非同步化實現了百萬量級的併發。

下圖描述了非同步 IO 的非阻塞讀和非同步框架結合後,是如何切換請求的。

Python實現非同步IO的示例

然而,寫非同步化程式碼很容易出錯。因為所有阻塞函式,都需要通過非阻塞的系統呼叫拆分成兩個函式。雖然這兩個函式共同完成一個功能,但呼叫方式卻不同。第一個函式由你顯式呼叫,第二個函式則由多路複用機制呼叫。

這種方式違反了軟體工程的內聚性原則,函式間同步資料也更復雜。特別是條件分支眾多、涉及大量系統呼叫時,非同步化的改造工作會非常困難。

Python如何實現非同步呼叫

from flask import Flask
import time
app = Flask(__name__)


@app.route('/bar')
def bar():
  time.sleep(1)
  return '<h1>bar!</h1>'

@app.route('/foo')
def foo():
  time.sleep(1)
  return '<h1>foo!</h1>'
if __name__ == '__main__':
  app.run(host='127.0.0.1',port=5555,debug=True)

採用同步的方式呼叫

import requests
import time

starttime = time.time()
print(requests.get('http://127.0.0.1:5555/bar').content)
print(requests.get('http://127.0.0.1:5555/foo').content)
print("消耗時間: ",time.time() -starttime)

b'<h1>bar!</h1>'
b'<h1>foo!</h1>'
消耗時間: 2.015509605407715

取樣非同步的方式呼叫:

重點:

1.將阻塞io改為非阻塞io;

2.多路複用io監聽核心事件,事件觸發通過回撥函式;

3.使用者態程式碼採取事件迴圈的方式獲取事件,執行事件的回撥函式;

import selectors
import socket
import time
# from asynrequest import ParserHttp
class asynhttp:
  def __init__(self):
    self.selecter = selectors.DefaultSelector()

  def get(self,url,optiondict = None):
    global reqcount
    reqcount += 1
    s = socket.socket()
    s.setblocking(False)
    try:
      s.connect(('127.0.0.1',5555))
    except BlockingIOError:
      pass
    requset = 'GET %s HTTP/1.0\r\n\r\n' % url
    callback = lambda : self.send(s,requset)
    self.selecter.register(s.fileno(),selectors.EVENT_WRITE,callback)

  def send(self,s,requset):
    self.selecter.unregister(s.fileno())
    s.send(requset.encode())
    chunks = []
    callback = lambda: self.recv(s,chunks)
    self.selecter.register(s.fileno(),selectors.EVENT_READ,callback)

  def recv(self,chunks):
    self.selecter.unregister(s.fileno())
    chunk = s.recv(1024)
    if chunk:
      chunks.append(chunk)
      callback = lambda: self.recv(s,chunks)
      self.selecter.register(s.fileno(),callback)
    else:
      global reqcount
      reqcount -= 1
      request_first,request_headers,request_content,_ = ParserHttp.parser(b''.join(chunks))
      print("解析資料:",request_first,request_content)
      print((b''.join(chunks)).decode())
      return (b''.join(chunks)).decode()

starttime = time.time()
reqcount = 0
asynhttper = asynhttp()
asynhttper.get('/bar')
asynhttper.get('/foo')
while reqcount:
  events = asynhttper.selecter.select()
  for event,mask in events:
    func = event.data
    func()
print("消耗時間:",time.time() - starttime)

HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 13
Server: Werkzeug/1.0.1 Python/3.7.7
Date: Thu,15 Oct 2020 03:28:16 GMT

<h1>bar!</h1>
HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 13
Server: Werkzeug/1.0.1 Python/3.7.7
Date: Thu,15 Oct 2020 03:28:16 GMT

<h1>foo!</h1>
消耗時間: 1.0127637386322021

以上就是Python實現非同步IO的示例的詳細內容,更多關於python 非同步IO的資料請關注我們其它相關文章!