1. 程式人生 > >python修飾器的作用

python修飾器的作用

最近在看<python cookbook>中的修飾符的作用,以前一直不是很理解修飾符的作用,今天 看了以後感覺挺好的,終於懂@的含義了,有一些初級用法,也有一些高階用法,轉了一個我認為解釋的很清楚的,供大家欣賞。

Python 的 Decorator在使用上和Java/C#的Annotation很相似,就是在方法名前面加一個@XXX註解來為這個方法裝飾一些東西。但是,Java/C#的Annotation也很讓人望而卻步,太TMD的複雜了,你要玩它,你需要了解一堆Annotation的類庫文件,讓人感覺就是在學另外一門語言。

而Python使用了一種相對於Decorator Pattern和Annotation來說非常優雅的方法,這種方法不需要你去掌握什麼複雜的OO模型或是Annotation的各種類庫規定,完全就是語言層面的玩法:一種函數語言程式設計的技巧。如果你看過本站的《函數語言程式設計》,你一定會為函數語言程式設計的那種“描述你想幹什麼,而不是描述你要怎麼去實現”的程式設計方式感到暢快。(如果你不瞭解函數語言程式設計,那在讀本文之前,還請你移步去看看《函數語言程式設計》) 好了,我們先來點感性認識,看一個Python修飾器的Hello World的程式碼。

Hello World



下面是程式碼:
檔名:hello.py
    
def hello(fn):
    def wrapper():
        print "hello, %s" % fn.__name__
        fn()
        print "goodby, %s" % fn.__name__
    return wrapper
 
@hello
def foo():
    print "i am foo"
 
foo()

當你執行程式碼,你會看到如下輸出:
    
[[email protected]]$ python hello.py
hello, foo
i am foo
goodby, foo

你可以看到如下的東西:

1)函式foo前面有個@hello的“註解”,hello就是我們前面定義的函式hello

2)在hello函式中,其需要一個fn的引數(這就用來做回撥的函式)

3)hello函式中返回了一個inner函式wrapper,這個wrapper函式回調了傳進來的fn,並在回撥前後加了兩條語句。

Decorator 的本質

對於Python的這個@註解語法糖- Syntactic Sugar 來說,當你在用某個@decorator來修飾某個函式func時,如下所示:
    
@decorator
def func():
    pass

其直譯器會解釋成下面這樣的語句:
    
func = decorator(func)

尼瑪,這不就是把一個函式當引數傳到另一個函式中,然後再回調嗎?是的,但是,我們需要注意,那裡還有一個賦值語句,把decorator這個函式的返回值賦值回了原來的func。 根據《函數語言程式設計》中的first class functions中的定義的,你可以把函式當成變數來使用,所以,decorator必需得返回了一個函數出來給func,這就是所謂的higher order function 高階函式,不然,後面當func()呼叫的時候就會出錯。 就我們上面那個hello.py裡的例子來說,
    
@hello
def foo():
    print "i am foo"

被解釋成了:

foo = hello(foo)

是的,這是一條語句,而且還被執行了。你如果不信的話,你可以寫這樣的程式來試試看:
    
def fuck(fn):
    print "fuck %s!" % fn.__name__[::-1].upper()
 
@fuck
def wfg():
    pass

沒了,就上面這段程式碼,沒有呼叫wfg()的語句,你會發現, fuck函式被呼叫了,而且還很NB地輸出了我們每個人的心聲!

再回到我們hello.py的那個例子,我們可以看到,hello(foo)返回了wrapper()函式,所以,foo其實變成了wrapper的一個變數,而後面的foo()執行其實變成了wrapper()。

知道這點本質,當你看到有多個decorator或是帶引數的decorator,你也就不會害怕了。

比如:多個decorator

@decorator_one
@decorator_two
def func():
    pass

相當於:
    
func = decorator_one(decorator_two(func))

比如:帶引數的decorator:

@decorator(arg1, arg2)
def func():
    pass

相當於:

func = decorator(arg1,arg2)(func)

這意味著decorator(arg1, arg2)這個函式需要返回一個“真正的decorator”。

帶引數及多個Decrorator

我們來看一個有點意義的例子:
html.py

def makeHtmlTag(tag, *args, **kwds):
    def real_decorator(fn):
        css_class = " class='{0}'".format(kwds["css_class"]) 
                                     if "css_class" in kwds else ""
        def wrapped(*args, **kwds):
            return "<"+tag+css_class+">" + fn(*args, **kwds) + "</"+tag+">"
        return wrapped
    return real_decorator
 
@makeHtmlTag(tag="b", css_class="bold_css")
@makeHtmlTag(tag="i", css_class="italic_css")
def hello():
    return "hello world"
 
print hello()
 
# 輸出:
# <b class='bold_css'><i class='italic_css'>hello world</i></b>

在上面這個例子中,我們可以看到:makeHtmlTag有兩個引數。所以,為了讓 hello = makeHtmlTag(arg1, arg2)(hello) 成功,makeHtmlTag 必需返回一個decorator(這就是為什麼我們在makeHtmlTag中加入了real_decorator()的原因),這樣一來,我們就可以進入到 decorator 的邏輯中去了—— decorator得返回一個wrapper,wrapper裡回撥hello。看似那個makeHtmlTag() 寫得層層疊疊,但是,已經瞭解了本質的我們覺得寫得很自然。

你看,Python的Decorator就是這麼簡單,沒有什麼複雜的東西,你也不需要了解過多的東西,使用起來就是那麼自然、體貼、乾爽、透氣,獨有的速效凹道和完美的吸收軌跡,讓你再也不用為每個月的那幾天感到焦慮和不安,再加上貼心的護翼設計,量多也不用當心。對不起,我調皮了。

什麼,你覺得上面那個帶引數的Decorator的函式巢狀太多了,你受不了。好吧,沒事,我們看看下面的方法。

class式的 Decorator

首先,先得說一下,decorator的class方式,還是看個示例:

class myDecorator(object):
 
    def __init__(self, fn):
        print "inside myDecorator.__init__()"
        self.fn = fn
 
    def __call__(self):
        self.fn()
        print "inside myDecorator.__call__()"
 
@myDecorator
def aFunction():
    print "inside aFunction()"
 
print "Finished decorating aFunction()"
 
aFunction()
 
# 輸出:
# inside myDecorator.__init__()
# Finished decorating aFunction()
# inside aFunction()
# inside myDecorator.__call__()

上面這個示例展示了,用類的方式宣告一個decorator。我們可以看到這個類中有兩個成員:
1)一個是__init__(),這個方法是在我們給某個函式decorator時被呼叫,所以,需要有一個fn的引數,也就是被decorator的函式。
2)一個是__call__(),這個方法是在我們呼叫被decorator函式時被呼叫的。
上面輸出可以看到整個程式的執行順序。

這看上去要比“函式式”的方式更易讀一些。

下面,我們來看看用類的方式來重寫上面的html.py的程式碼:
html.py

class makeHtmlTagClass(object):
 
    def __init__(self, tag, css_class=""):
        self._tag = tag
        self._css_class = " class='{0}'".format(css_class) 
                                       if css_class !="" else ""
 
    def __call__(self, fn):
        def wrapped(*args, **kwargs):
            return "<" + self._tag + self._css_class+">"  
                       + fn(*args, **kwargs) + "</" + self._tag + ">"
        return wrapped
 
@makeHtmlTagClass(tag="b", css_class="bold_css")
@makeHtmlTagClass(tag="i", css_class="italic_css")
def hello(name):
    return "Hello, {}".format(name)
 
print hello("Hao Chen")

上面這段程式碼中,我們需要注意這幾點:
1)如果decorator有引數的話,__init__() 成員就不能傳入fn了,而fn是在__call__的時候傳入的。
2)這段程式碼還展示了 wrapped(*args, **kwargs) 這種方式來傳遞被decorator函式的引數。(其中:args是一個引數列表,kwargs是引數dict,具體的細節,請參考Python的文件或是StackOverflow的這個問題,這裡就不展開了)

用Decorator設定函式的呼叫引數

你有三種方法可以幹這個事:

第一種,通過 **kwargs,這種方法decorator會在kwargs中注入引數。

def decorate_A(function):
    def wrap_function(*args, **kwargs):
        kwargs['str'] = 'Hello!'
        return function(*args, **kwargs)
    return wrap_function
 
@decorate_A
def print_message_A(*args, **kwargs):
    print(kwargs['str'])
 
print_message_A()

第二種,約定好引數,直接修改引數

def decorate_B(function):
    def wrap_function(*args, **kwargs):
        str = 'Hello!'
        return function(str, *args, **kwargs)
    return wrap_function
 
@decorate_B
def print_message_B(str, *args, **kwargs):
    print(str)
 
print_message_B()

第三種,通過 *args 注入

def decorate_C(function):
    def wrap_function(*args, **kwargs):
        str = 'Hello!'
        #args.insert(1, str)
        args = args +(str,)
        return function(*args, **kwargs)
    return wrap_function
 
class Printer:
    @decorate_C
    def print_message(self, str, *args, **kwargs):
        print(str)
 
p = Printer()
p.print_message()

Decorator的副作用

到這裡,我相信你應該瞭解了整個Python的decorator的原理了。

相信你也會發現,被decorator的函式其實已經是另外一個函數了,對於最前面那個hello.py的例子來說,如果你查詢一下foo.__name__的話,你會發現其輸出的是“wrapper”,而不是我們期望的“foo”,這會給我們的程式埋一些坑。所以,Python的functool包中提供了一個叫wrap的decorator來消除這樣的副作用。下面是我們新版本的hello.py。
檔名:hello.py

from functools import wraps
def hello(fn):
    @wraps(fn)
    def wrapper():
        print "hello, %s" % fn.__name__
        fn()
        print "goodby, %s" % fn.__name__
    return wrapper
 
@hello
def foo():
    '''foo help doc'''
    print "i am foo"
    pass
 
foo()
print foo.__name__ #輸出 foo
print foo.__doc__  #輸出 foo help doc

當然,即使是你用了functools的wraps,也不能完全消除這樣的副作用。

來看下面這個示例:

from inspect import getmembers, getargspec
from functools import wraps
 
def wraps_decorator(f):
    @wraps(f)
    def wraps_wrapper(*args, **kwargs):
        return f(*args, **kwargs)
    return wraps_wrapper
 
class SomeClass(object):
    @wraps_decorator
    def method(self, x, y):
        pass
 
obj = SomeClass()
for name, func in getmembers(obj, predicate=inspect.ismethod):
    print "Member Name: %s" % name
    print "Func Name: %s" % func.func_name
    print "Args: %s" % getargspec(func)[0]
 
# 輸出:
# Member Name: method
# Func Name: method
# Args: []

你會發現,即使是你你用了functools的wraps,你在用getargspec時,引數也不見了。

要修正這一問,我們還得用Python的反射來解決,下面是相關的程式碼:

def get_true_argspec(method):
    argspec = inspect.getargspec(method)
    args = argspec[0]
    if args and args[0] == 'self':
        return argspec
    if hasattr(method, '__func__'):
        method = method.__func__
    if not hasattr(method, 'func_closure') or method.func_closure is None:
        raise Exception("No closure for method.")
 
    method = method.func_closure[0].cell_contents
    return get_true_argspec(method)

當然,我相信大多數人的程式都不會去getargspec。所以,用functools的wraps應該夠用了。

一些decorator的示例

好了,現在我們來看一下各種decorator的例子:

給函式呼叫做快取

這個例實在是太經典了,整個網上都用這個例子做decorator的經典範例,因為太經典了,所以,我這篇文章也不能免俗。

from functools import wraps
def memo(fn):
    cache = {}
    miss = object()
 
    @wraps(fn)
    def wrapper(*args):
        result = cache.get(args, miss)
        if result is miss:
            result = fn(*args)
            cache[args] = result
        return result
 
    return wrapper
 
@memo
def fib(n):
    if n < 2:
        return n
    return fib(n - 1) + fib(n - 2)

上面這個例子中,是一個斐波拉契數例的遞迴演算法。我們知道,這個遞迴是相當沒有效率的,因為會重複呼叫。比如:我們要計算fib(5),於是其分解成fib(4) + fib(3),而fib(4)分解成fib(3)+fib(2),fib(3)又分解成fib(2)+fib(1)…… 你可看到,基本上來說,fib(3), fib(2), fib(1)在整個遞迴過程中被呼叫了兩次。

而我們用decorator,在呼叫函式前查詢一下快取,如果沒有才呼叫了,有了就從快取中返回值。一下子,這個遞迴從二叉樹式的遞迴成了線性的遞迴。

Profiler的例子

這個例子沒什麼高深的,就是實用一些。

import cProfile, pstats, StringIO
 
def profiler(func):
    def wrapper(*args, **kwargs):
        datafn = func.__name__ + ".profile" # Name the data file
        prof = cProfile.Profile()
        retval = prof.runcall(func, *args, **kwargs)
        #prof.dump_stats(datafn)
        s = StringIO.StringIO()
        sortby = 'cumulative'
        ps = pstats.Stats(prof, stream=s).sort_stats(sortby)
        ps.print_stats()
        print s.getvalue()
        return retval
 
    return wrapper

註冊回撥函式

下面這個示例展示了通過URL的路由來呼叫相關注冊的函式示例:
    
class MyApp():
    def __init__(self):
        self.func_map = {}
 
    def register(self, name):
        def func_wrapper(func):
            self.func_map[name] = func
            return func
        return func_wrapper
 
    def call_method(self, name=None):
        func = self.func_map.get(name, None)
        if func is None:
            raise Exception("No function registered against - " + str(name))
        return func()
 
app = MyApp()
 
@app.register('/')
def main_page_func():
    return "This is the main page."
 
@app.register('/next_page')
def next_page_func():
    return "This is the next page."
 
print app.call_method('/')
print app.call_method('/next_page')

注意:
1)上面這個示例中,用類的例項來做decorator。
2)decorator類中沒有__call__(),但是wrapper返回了原函式。所以,原函式沒有發生任何變化。

給函式打日誌

下面這個示例演示了一個logger的decorator,這個decorator輸出了函式名,引數,返回值,和執行時間。

from functools import wraps
def logger(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        ts = time.time()
        result = fn(*args, **kwargs)
        te = time.time()
        print "function      = {0}".format(fn.__name__)
        print "    arguments = {0} {1}".format(args, kwargs)
        print "    return    = {0}".format(result)
        print "    time      = %.6f sec" % (te-ts)
        return result
    return wrapper
 
@logger
def multipy(x, y):
    return x * y
 
@logger
def sum_num(n):
    s = 0
    for i in xrange(n+1):
        s += i
    return s
 
print multipy(2, 10)
print sum_num(100)
print sum_num(10000000)

上面那個打日誌還是有點粗糙,讓我們看一個更好一點的(帶log level引數的):

import inspect
def get_line_number():
    return inspect.currentframe().f_back.f_back.f_lineno
 
def logger(loglevel):
    def log_decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            ts = time.time()
            result = fn(*args, **kwargs)
            te = time.time()
            print "function   = " + fn.__name__,
            print "    arguments = {0} {1}".format(args, kwargs)
            print "    return    = {0}".format(result)
            print "    time      = %.6f sec" % (te-ts)
            if (loglevel == 'debug'):
                print "    called_from_line : " + str(get_line_number())
            return result
        return wrapper
    return log_decorator

但是,上面這個帶log level引數的有兩具不好的地方,
1) loglevel不是debug的時候,還是要計算函式呼叫的時間。
2) 不同level的要寫在一起,不易讀。

我們再接著改進:

import inspect
 
def advance_logger(loglevel):
 
    def get_line_number():
        return inspect.currentframe().f_back.f_back.f_lineno
 
    def _basic_log(fn, result, *args, **kwargs):
        print "function   = " + fn.__name__,
        print "    arguments = {0} {1}".format(args, kwargs)
        print "    return    = {0}".format(result)
 
    def info_log_decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            result = fn(*args, **kwargs)
            _basic_log(fn, result, args, kwargs)
        return wrapper
 
    def debug_log_decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            ts = time.time()
            result = fn(*args, **kwargs)
            te = time.time()
            _basic_log(fn, result, args, kwargs)
            print "    time      = %.6f sec" % (te-ts)
            print "    called_from_line : " + str(get_line_number())
        return wrapper
 
    if loglevel is "debug":
        return debug_log_decorator
    else:
        return info_log_decorator

你可以看到兩點,
1)我們分了兩個log level,一個是info的,一個是debug的,然後我們在外尾根據不同的引數返回不同的decorator。
2)我們把info和debug中的相同的程式碼抽到了一個叫_basic_log的函式裡,DRY原則。

一個MySQL的Decorator

下面這個decorator是我在工作中用到的程式碼,我簡化了一下,把DB連線池的程式碼去掉了,這樣能簡單點,方便閱讀。

import umysql
from functools import wraps
 
class Configuraion:
    def __init__(self, env):
        if env == "Prod":
            self.host    = "coolshell.cn"
            self.port    = 3306
            self.db      = "coolshell"
            self.user    = "coolshell"
            self.passwd  = "fuckgfw"
        elif env == "Test":
            self.host   = 'localhost'
            self.port   = 3300
            self.user   = 'coolshell'
            self.db     = 'coolshell'
            self.passwd = 'fuckgfw'
 
def mysql(sql):
 
    _conf = Configuraion(env="Prod")
 
    def on_sql_error(err):
        print err
        sys.exit(-1)
 
    def handle_sql_result(rs):
        if rs.rows > 0:
            fieldnames = [f[0] for f in rs.fields]
            return [dict(zip(fieldnames, r)) for r in rs.rows]
        else:
            return []
 
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            mysqlconn = umysql.Connection()
            mysqlconn.settimeout(5)
            mysqlconn.connect(_conf.host, _conf.port, _conf.user, 
                              _conf.passwd, _conf.db, True, 'utf8')
            try:
                rs = mysqlconn.query(sql, {})
            except umysql.Error as e:
                on_sql_error(e)
 
            data = handle_sql_result(rs)
            kwargs["data"] = data
            result = fn(*args, **kwargs)
            mysqlconn.close()
            return result
        return wrapper
 
    return decorator
 
@mysql(sql = "select * from coolshell" )
def get_coolshell(data):
    ... ...
    ... ..

執行緒非同步

下面量個非常簡單的非同步執行的decorator,注意,非同步處理並不簡單,下面只是一個示例。

from threading import Thread
from functools import wraps
 
def async(func):
    @wraps(func)
    def async_func(*args, **kwargs):
        func_hl = Thread(target = func, args = args, kwargs = kwargs)
        func_hl.start()
        return func_hl
 
    return async_func
 
if __name__ == '__main__':
    from time import sleep
 
    @async
    def print_somedata():
        print 'starting print_somedata'
        sleep(2)
        print 'print_somedata: 2 sec passed'
        sleep(2)
        print 'print_somedata: 2 sec passed'
        sleep(2)
        print 'finished print_somedata'
 
    def main():
        print_somedata()
        print 'back in main'
        print_somedata()
        print 'back in main'
 
    main()

其它

關於更多的示例,你可以參看: Python Decorator Library