1. 程式人生 > >python Gevent – 高效能的Python併發框架

python Gevent – 高效能的Python併發框架

        在python裡,按照官方解釋greenlet是輕量級的並行程式設計,而gevent呢,就是利用greenlet實現的基於協程的python的網路library,好了,關係理清了。。。
        協程,gevent,greenlet,eventlet 不瞭解的可以上網查詢資料瞭解下。至於 協程,程序和執行緒大家平時瞭解的都比較多,而協程算是一種輕量級程序,但又不能叫程序,因為作業系統並不知道它的存在。什麼意思呢,就是說,協程像是一種在程式級別來模擬系統級別的程序,由於是單程序,並且少了上下文切換,於是相對來說系統消耗很少,而且網上的各種測試也表明,協程確實擁有驚人的速度。並且在實現過程中,協程可以用以前同步思路的寫法,而執行起來確是非同步的,也確實很有意思。話說有一種說法就是說進化歷程是:多程序->多執行緒->非同步->協程,暫且不論說的對不對,單從諸多讚譽來看,協程還是有必要理解一下的。
比較慚愧,greenlet沒怎麼看就直接看gevent,官方文件還是可以看看的,尤其是原始碼裡的examples都相當不錯,有助於理解gevent的使用。

gevent 封裝了很多很方便的介面,其中一個就是monkey

from gevent import monkey
monkey.patch_all()

這樣兩行,就可以使用python以前的socket之類的,因為gevent已經給你自動轉化了,真是超級方便阿。
而且安裝gevent也是很方便,首先安裝依賴libevent和greenlet,再利用pypi安裝即可

sudo apt-get install libevent-dev
sudo apt-get install python-dev
sudo easy-install gevent

然後,gevent中的event,有wait,set等api,方便你可以讓某些協程在某些地方等待條件,然後用另一個去喚醒他們。
再就是gevent實現了wsgi可以很方便的當作python的web server伺服器使。
最後放送一個我利用gevent實現的一個帶有快取的dns server

# -*- coding: UTF-8 -*-
 
import gevent
import dnslib
from gevent import socket
from gevent import event
 
rev=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
rev.bind(('',53))
ip=[]
cur=0
 
def preload():
    for i in open('ip'):
        ip.append(i)
    print "load "+str(len(ip))+"
ip"
 
def send_request(data):
    global cur
    ret=rev.sendto(data,(ip[cur],53))
    cur=(cur+1)%len(ip)
 
class Cache:
    def __init__(self):
        self.c={}
    def get(self,key):
        return self.c.get(key,None)
    def set(self,key,value):
        self.c[key]=value
    def remove(self,key):
        self.c.pop(key,None)
 
cache=Cache()
 
def handle_request(s,data,addr):
    req=dnslib.DNSRecord.parse(data)
    qname=str(req.q.qname)
    qid=req.header.id
    ret=cache.get(qname)
    if ret:
        ret=dnslib.DNSRecord.parse(ret)
        ret.header.id=qid;
        s.sendto(ret.pack(),addr)
    else:
        e=event.Event()
        cache.set(qname+"e",e)
        send_request(data)
        e.wait(60)
        tmp=cache.get(qname)
        if tmp:
            tmp=dnslib.DNSRecord.parse(tmp)
            tmp.header.id=qid;
            s.sendto(tmp.pack(),addr)
 
def handle_response(data):
    req=dnslib.DNSRecord.parse(data)
    qname=str(req.q.qname)
    print qname
    cache.set(qname,data)
    e=cache.get(qname+"e")
    cache.remove(qname+"e")
    if e:
        e.set()
        e.clear()
 
def handler(s,data,addr):
    req=dnslib.DNSRecord.parse(data)
    if req.header.qr:
        handle_response(data)
    else:handle_request(s,data,addr)
 
def main():
    preload()
    while True:
        data,addr=rev.recvfrom(8192)
        gevent.spawn(handler,rev,data,addr)
 
if __name__ == '__main__':
    main()

這個是直接利用了dict來作為快取查詢了,在 這裡:https://github.com/isnowfy/dns 還有我將dict換成redis持久化實現的另一個版本(話說redis的python api也可以利用pypi安裝,pypi(PyPI - the Python Package Index : Python Package Index :https://pypi.python.org/pypi )

##################################

# -*- coding: cp936 -*-

import gevent 

import time
from gevent import event #呼叫gevent的event子模組
#三個程序需要定義三個事件event1,event2,event3,來進行12,23,31迴圈機制,即程序一,程序二,程序三順序執行

def fun1(num,event1,event2):#固定格式

    i=0

    while i<10: #設定迴圈10次

        i+=1

        time.sleep(1) #睡眠1秒

        print'程序一:111111111'

        event2.set() #將event2值設為True

        event1.clear()#將event1值設為False

        event1.wait()#event1等待,其值為True時才執行

        

def fun2(num,event2,event3):

    i=0

    while i<10:

        i+=1

        time.sleep(1)

        print'程序二:222222222'

        event3.set()#將event3值設為True

        event2.clear()#將event2值設為False

        event2.wait()#event2等待,其值為True時才執行
def fun3(num,event3,event1):

    i=0

    while i<10:

        i+=1

        time.sleep(1)

        print'程序三:333333333'

        event1.set()

        event3.clear()

        event3.wait()
if __name__=="__main__": #執行呼叫格式

    act1=gevent.event.Event() #呼叫event中的Event類,用act1表示

    act2=gevent.event.Event() 

    act3=gevent.event.Event()

    #三個程序,act1,act2,act3

    Gevents=[] #建立一個數列,用來存和管理程序

    g=gevent.Greenlet(fun1,1,act1,act2) #呼叫gevent中的Greenlet子模組,用Greenlet建立程序一

    g.start() 

    print'程序一啟動:'

    Gevents.append(g) #將程序一加入到Gevents數列
    g=gevent.Greenlet(fun2,2,act2,act3)

    g.start()

    print'程序二啟動:'

    Gevents.append(g)
    g=gevent.Greenlet(fun3,3,act3,act1)

    g.start()

    print'程序三啟動:'

    print'所有程序都已啟動!'

    Gevents.append(g)
    gevent.joinall(Gevents) #呼叫Greenlet中的joinall函式,將Gevents的程序收集排列
from gevent import monkey
monkey.patch_all()
import gevent
from gevent import Greenlet

class Task(Greenlet):
    def __init__(self, name):
        Greenlet.__init__(self)
        self.name = name    
    def _run(self):
        print "Task %s: some task..." % self.name

t1 = Task("task1")
t2 = Task("task2")
t1.start()
t2.start()
# here we are waiting all tasks
gevent.joinall([t1,t2])

##################################

首先,gevent是一個網路庫:libevent是一個事件分發引擎,greenlet提供了輕量級執行緒的支援。所以它不適合處理有長時間阻塞IO的情況。
gevent就是基於這兩個東西的一個專門處理網路邏輯的並行庫。

  1. gevent.spawn啟動的所有協程,都是執行在同一個執行緒之中,所以協程不能跨執行緒同步資料。
  2. gevent.queue.Queue 是協程安全的。
  3. gevent啟動的併發協程,具體到task function,不能有長時間阻塞的IO操作。因為gevent的協程的特點是,當前協程阻塞了才會切換到別的協程。
    如果當前協程長時間阻塞,則不能顯示(gevent.sleep(0),或隱式,由gevent來做)切換到別的協程。導致程式出問題。
  4. 如果有長時間阻塞的IO操作,還是用傳統的執行緒模型比較好。
  5. 因為gevent的特點總結是:事件驅動+協程+非阻塞IO,事件驅動值得是libvent對epool的封裝,來基於事件的方式處理IO。
    協程指的是greenlet,非阻塞IO指的是gevent已經patch過的各種庫,例如socket和select等等。
  6. 使用gevent的協程,最好要用gevent自身的非阻塞的庫。如httplib, socket, select等等。
  7. gevent適合處理大量無阻塞的任務,如果有實在不能把阻塞的部分變為非阻塞再交給gevent處理,就把阻塞的部分改為非同步吧。

##################################

  1. gevent.server.StreamServer 會針對每個客戶端連線啟動一個greenlet處理,要注意的是,如果不迴圈監聽( 阻塞在read ),
    每個greenlet會在完成後立即退出,從而導致客戶端退出( 傳送FIN_ACK給客戶端 )。這個問題折騰了一晚上,終於弄明白了。坑爹啊。。
  2. 要非常仔細的檢查,greenlet處理的程式碼,發現有可能阻塞IO的地方,儘量用gevent提供的庫。
  3. 一些第三方庫隱藏了自己的實現( 通常是直接封裝C庫),要使得gevent相容它們,可以用monkey_patch,但不保證全部管用。
  4. 最後最後的一點,gevent的greenlet效能非常高,所以如果是用它作為併發的client端,那麼一定要注意,你的server端處理速度一定要足夠快!否則你的客戶端程式碼會因為服務端的慢速,而失去了greenlet的優勢。。。

####################################

安裝 libevent:apt-get install libevent-dev
安裝python-dev:apt-get install python-dev
安裝greenlet:easy_install greenlet
安裝gevent:easy_install gevent

一個小測試,測試gevent 的任務池

from gevent import pool
g = pool.Pool()
def a():
    for i in xrange(100):
        g.spawn(b)
def b():
    print 'b'
g.spawn(a)
g.join()

gevent程式設計師指南

gevent是一個基於libev的併發庫。它為各種併發和網路相關的任務提供了整潔的API。

介紹

本指南假定讀者有中級Python水平,但不要求有其它更多的知識,不期待讀者有 併發方面的知識。本指南的目標在於給予你需要的工具來開始使用gevent,幫助你 馴服現有的併發問題,並從今開始編寫非同步應用程式。

貢獻者

同時感謝Denis Bilenko寫了gevent和相應的指導以形成本指南。

這是一個以MIT許可證釋出的協作文件。你想新增一些內容?或看見一個排版錯誤? Fork一個分支釋出一個request到 Github. 我們歡迎任何貢獻。

本頁也有日文版本

核心部分

Greenlets

在gevent中用到的主要模式是Greenlet, 它是以C擴充套件模組形式接入Python的輕量級協程。 Greenlet全部執行在主程式作業系統程序的內部,但它們被協作式地排程。

在任何時刻,只有一個協程在執行。

這與multiprocessingthreading等提供真正並行構造的庫是不同的。 這些庫輪轉使用作業系統排程的程序和執行緒,是真正的並行。

同步和非同步執行

併發的核心思想在於,大的任務可以分解成一系列的子任務,後者可以被排程成 同時執行或非同步執行,而不是一次一個地或者同步地執行。兩個子任務之間的 切換也就是上下文切換

在gevent裡面,上下文切換是通過yielding來完成的. 在下面的例子裡, 我們有兩個上下文,通過呼叫gevent.sleep(0),它們各自yield向對方。


import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

下圖將控制流形象化,就像在偵錯程式中單步執行整個程式,以說明上下文切換如何發生。


當我們在受限於網路或IO的函式中使用gevent,這些函式會被協作式的排程, gevent的真正能力會得到發揮。Gevent處理了所有的細節, 來保證你的網路庫會在可能的時候,隱式交出greenlet上下文的執行權。 這樣的一種用法是如何強大,怎麼強調都不為過。或者我們舉些例子來詳述。

下面例子中的select()函式通常是一個在各種檔案描述符上輪詢的阻塞呼叫。


import time
import gevent
from gevent import select

start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)

def gr1():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)
    print('Ended Polling: %s' % tic())

def gr2():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)
    print('Ended Polling: %s' % tic())

def gr3():
    print("Hey lets do some stuff while the greenlets poll, %s" % tic())
    gevent.sleep(1)

gevent.joinall([
    gevent.spawn(gr1),
    gevent.spawn(gr2),
    gevent.spawn(gr3),
])

Started Polling: at 0.0 seconds
Started Polling: at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at 0.0 seconds
Ended Polling: at 2.0 seconds
Ended Polling: at 2.0 seconds

下面是另外一個多少有點人造色彩的例子,定義一個非確定性的(non-deterministic) 的task函式(給定相同輸入的情況下,它的輸出不保證相同)。 此例中執行這個函式的副作用就是,每次task在它的執行過程中都會隨機地停某些秒。


import gevent
import random

def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(random.randint(0,2)*0.001)
    print('Task %s done' % pid)

def synchronous():
    for i in range(1,10):
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in xrange(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()

Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:
Task 3 done
Task 7 done
Task 9 done
Task 2 done
Task 4 done
Task 1 done
Task 8 done
Task 6 done
Task 0 done
Task 5 done

上例中,在同步的部分,所有的task都同步的執行, 結果當每個task在執行時主流程被阻塞(主流程的執行暫時停住)。

程式的重要部分是將task函式封裝到Greenlet內部執行緒的gevent.spawn。 初始化的greenlet列表存放在陣列threads中,此陣列被傳給gevent.joinall 函式,後者阻塞當前流程,並執行所有給定的greenlet。執行流程只會在 所有greenlet執行完後才會繼續向下走。

要重點留意的是,非同步的部分本質上是隨機的,而且非同步部分的整體執行時間比同步 要大大減少。事實上,同步部分的最大執行時間,即是每個task停0.002秒,結果整個 佇列要停0.02秒。而非同步部分的最大執行時間大致為0.002秒,因為沒有任何一個task會 阻塞其它task的執行。

一個更常見的應用場景,如非同步地向伺服器取資料,取資料操作的執行時間 依賴於發起取資料請求時遠端伺服器的負載,各個請求的執行時間會有差別。

import gevent.monkey
gevent.monkey.patch_socket()

import gevent
import urllib2
import simplejson as json

def fetch(pid):
    response = urllib2.urlopen('http://json-time.appspot.com/time.json')
    result = response.read()
    json_result = json.loads(result)
    datetime = json_result['datetime']

    print('Process %s: %s' % (pid, datetime))
    return json_result['datetime']

def synchronous():
    for i in range(1,10):
        fetch(i)

def asynchronous():
    threads = []
    for i in range(1,10):
        threads.append(gevent.spawn(fetch, i))
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()

確定性

就像之前所提到的,greenlet具有確定性。在相同配置相同輸入的情況下,它們總是 會產生相同的輸出。下面就有例子,我們在multiprocessing的pool之間執行一系列的 任務,與在gevent的pool之間執行作比較。


import time


            
           

相關推薦

python Gevent高效能Python併發框架

        在python裡,按照官方解釋greenlet是輕量級的並行程式設計,而gevent呢,就是利用greenlet實現的基於協程的python的網路library,好了,關係理清了。。。         協程,gevent,gr

模擬高併發請求服務端(python gevent

專案背景:對web後端進行高併發的請求,簡單測試服務框架的效能 解決思路:利用python的多執行緒,但python的多執行緒有點“雞肋”, 個人選擇使用簡潔輕便gevent。 解決方案:採用gevent非同步 + requests 進行高併發請求 import time import

python 高性能web框架 gunicorn+gevent

之一 啟動 發布 egg nic 一個 運行 -- www. 參考鏈接: http://rfyiamcool.blog.51cto.com/1030776/1276364/ http://www.cnblogs.com/nanrou/

百度開源高效能 Python 分散式計算框架 Bigflow

小編近日看到一個百度開源的python框架-Bigflow , 致力於提供一套簡單易用的介面來描述使用者的計算任務,並使同一套程式碼可以執行在不同的執行引擎之上。 Bigflow 的設計中有許多思想借鑑自 Google FlumeJava以及 Google Cloud Dataflow,另有部分

Python非同步併發框架

呵呵,這個標題有點大,其實只是想從零開始介紹一下非同步的基礎,以及 Python 開源非同步併發框架的發展和互操作性。 另外,這是我在 OSTC 2014 做的一個同題演講,幻燈片在這裡,歡迎拍磚。 開源 Python 是開源的,介紹的這幾個框架 Twisted、Tornado、Gevent 和 tu

Python實戰之協程(greenlet模組,gevent模組,socket+ gevent實現高併發處理)

協程 協程,又稱微執行緒,纖程。英文名Coroutine。一句話說明什麼是執行緒:協程是一種使用者態的輕量級執行緒。(cpu不知道,是使用者自己控制的) 協程擁有自己的暫存器上下文和棧。協程排程切換時,將暫存器上下文和棧儲存到其他地方,在切回來的時候,恢復先前儲存的暫存器上下文和棧(執行緒的

python爬蟲—使用scrapy爬蟲框架

pywin32 rip for 鏈接 是把 ror sdn 成功 repl 問題1.使用scrapy框架,使用命令提示符pip命令下載scrapy後,卻無法使用scrapy命令,出現scrapy不是內部或外部命令。也不是可運行的程序 解決:一開始,我是把python安裝在

Selenium2+python自動化30-引入unittest框架【轉載】

進行 test 比較 試用 獲得 webdriver class bsp com 本篇轉自博客:上海-悠悠 原文地址:http://www.cnblogs.com/yoyoketang/tag/unittest/ from selenium import webdriver

python web開發之flask框架學習(2) 加載模版

模版文件 簡書 nbsp 什麽 blog python 目錄 pan col 上次學習了flask的helloword項目的創建,這次來學習flask項目的模版加載: 第一步:創建一個flask項目 第二步:在項目目錄的templates文件夾下創建一個html文件

Selenium2+python自動化20-引入unittest框架

模塊 試用 and 瀏覽器 bsp false get urn 完成 Selenium2+python自動化20-引入unittest框架 from selenium import webdriverfrom selenium.webdriver.common.b

python-gevent模塊(自動切換io的協程)

UNC TE fun AS imp In 程序 自動 AR import gevent def foo(): print("Running in foo") gevent.sleep(2) print("Explicit context sw

python--gevent

print for rom 3.0 int all imp pre from import gevent from gevent import monkey monkey.patch_all() import time def func(n): time.s

python高性能web框架——Japronto

url hub class ews scrip pat 0ms cep dem   近期做了一個簡單的demo需求,搭建一個http server,支持簡單的qa查詢。庫中有10000個qa對,需要支持每秒10000次以上的查詢請求。   需求比較簡單,主要難點就是1000

Python - - 項目實戰 -- 遊戲框架搭建

部分 sta align tab 判斷 數字 game 向上 通過 目標 -- 使用 面向對象 設計 飛機大戰遊戲類 目標 明確主程序職責 實現主程序類 準備遊戲精靈組 01,明確主程序職責 回顧 快速入門案例 ,一個遊戲主程序的 職責 可以分為兩個部分 遊戲初始化

Python學習多程序併發寫入同一檔案

最近學習了Python的多程序,想到我的高德API爬蟲那個爬取讀寫速度我就心累,實在是慢,看到多程序可以充分利用CPU核數我就開始完善我的程式碼,不過過程是艱辛的,在此之中出現了很多問題,其中最大的問題是爬取的資料是正確的,但是讀寫到Excel中卻開啟是空,想了半天也沒解決,腦子笨沒辦法,不過我

Java併發程式設計高階技術-高效能併發框架原始碼解析與實戰(資源同步)

第1章 課程介紹(Java併發程式設計進階課程) 什麼是Disruptor?它一個高效能的非同步處理框架,號稱“單執行緒每秒可處理600W個訂單”的神器,本課程目標:徹底精通一個如此優秀的開源框架,面試秒殺面試官。本章會帶領小夥伴們先了解課程大綱與重點,然後模擬千萬,億級資料進行壓力測試。讓大

python + selenium + unittest 自動化測試框架 -- 入門篇

、 預置條件: 1. python已安裝 2. pycharm已安裝 3. selenium已安裝 4. chrome.driver 驅動已下載     二、工程建立 1. New Project:建立自己的工程 2. New Package:建立各個配置包 3. Ne

Java併發程式設計高階技術-高效能併發框架原始碼解析與實戰(無密連結)

第1章 課程介紹(Java併發程式設計進階課程) 什麼是Disruptor?它一個高效能的非同步處理框架,號稱“單執行緒每秒可處理600W個訂單”的神器,本課程目標:徹底精通一個如此優秀的開源框架,面試秒殺面試官。本章會帶領小夥伴們先了解課程大綱與重點,然後模擬千萬,億級資料進行壓力測試。讓大

2017年最棒的七個Python圖形應用GUI開發框架

作為Pyhon開發者,你遲早都會碰到圖形使用者介面(GUI)應用開發任務,目前市場上有大量Python GUI開發框架可供選擇,Python wiki GUI programming給出了超過30個跨平臺框架方案,包括Pyjamas這樣的跨瀏覽器web開發框架。 如何從眾多的Python GUI

Java併發程式設計高階技術-高效能併發框架原始碼解析與實戰

Java併發程式設計高階技術-高效能併發框架原始碼解析與實戰 第1章 課程介紹 什麼是Disruptor?它一個高效能的非同步處理框架,號稱“單執行緒每秒可處理600W個訂單”的神器,本課程目標:徹底精通一個如此優秀的開源框架,面試秒殺面試官。本章會帶領小夥伴們先了解課程大綱與重點,然後