flask如何開啟多執行緒詳解
flask開啟多執行緒
在我之前寫的'flask中current_app、g、request、session原始碼的深究和理解'一文中解釋了flask如何支援多執行緒
主要通過兩個類來實現,LocalStack和Local,在Local中有兩個屬性,__storage__和__ident_func__,後者用來獲取執行緒id,從而區分不同執行緒發來的請求
這次要說的是flask如何開啟多執行緒
先從app.run()這個方法看起
def run(self, host=None, port=None, debug=None, **options): from werkzeug.serving import run_simple if host is None: host = '127.0.0.1' if port is None: server_name = self.config['SERVER_NAME'] if server_name and ':' in server_name: port = int(server_name.rsplit(':', 1)[1]) else: port = 5000 if debug is not None: self.debug = bool(debug) options.setdefault('use_reloader', self.debug) options.setdefault('use_debugger', self.debug) try: run_simple(host, port, self, **options) #會進入這個函式 finally: # reset the first request information if the development server # reset normally. This makes it possible to restart the server # without reloader and that stuff from an interactive shell. self._got_first_request = False
經過判斷和設定後進入run_simple()這個函式,看下原始碼
def run_simple(hostname, port, application, use_reloader=False, use_debugger=False, use_evalex=True, extra_files=None, reloader_interval=1, reloader_type='auto', threaded=False, processes=1, request_handler=None, static_files=None, passthrough_errors=False, ssl_context=None): """Start a WSGI application. Optional features include a reloader, multithreading and fork support. This function has a command-line interface too:: python -m werkzeug.serving --help .. versionadded:: 0.5 `static_files` was added to simplify serving of static files as well as `passthrough_errors`. .. versionadded:: 0.6 support for SSL was added. .. versionadded:: 0.8 Added support for automatically loading a SSL context from certificate file and private key. .. versionadded:: 0.9 Added command-line interface. .. versionadded:: 0.10 Improved the reloader and added support for changing the backend through the `reloader_type` parameter. See :ref:`reloader` for more information. :param hostname: The host for the application. eg: ``'localhost'`` :param port: The port for the server. eg: ``8080`` :param application: the WSGI application to execute :param use_reloader: should the server automatically restart the python process if modules were changed? :param use_debugger: should the werkzeug debugging system be used? :param use_evalex: should the exception evaluation feature be enabled? :param extra_files: a list of files the reloader should watch additionally to the modules. For example configuration files. :param reloader_interval: the interval for the reloader in seconds. :param reloader_type: the type of reloader to use. The default is auto detection. Valid values are ``'stat'`` and ``'watchdog'``. See :ref:`reloader` for more information. :param threaded: should the process handle each request in a separate thread? :param processes: if greater than 1 then handle each request in a new process up to this maximum number of concurrent processes. :param request_handler: optional parameter that can be used to replace the default one. You can use this to replace it with a different :class:`~BaseHTTPServer.BaseHTTPRequestHandler` subclass. :param static_files: a list or dict of paths for static files. This works exactly like :class:`SharedDataMiddleware`, it's actually just wrapping the application in that middleware before serving. :param passthrough_errors: set this to `True` to disable the error catching. This means that the server will die on errors but it can be useful to hook debuggers in (pdb etc.) :param ssl_context: an SSL context for the connection. Either an :class:`ssl.SSLContext`, a tuple in the form ``(cert_file, pkey_file)``, the string ``'adhoc'`` if the server should automatically create one, or ``None`` to disable SSL (which is the default). """ if not isinstance(port, int): raise TypeError('port must be an integer') if use_debugger: from werkzeug.debug import DebuggedApplication application = DebuggedApplication(application, use_evalex) if static_files: from werkzeug.wsgi import SharedDataMiddleware application = SharedDataMiddleware(application, static_files) def log_startup(sock): display_hostname = hostname not in ('', '*') and hostname or 'localhost' if ':' in display_hostname: display_hostname = '[%s]' % display_hostname quit_msg = '(Press CTRL+C to quit)' port = sock.getsockname()[1] _log('info', ' * Running on %s://%s:%d/ %s', ssl_context is None and 'http' or 'https', display_hostname, port, quit_msg) def inner(): try: fd = int(os.environ['WERKZEUG_SERVER_FD']) except (LookupError, ValueError): fd = None srv = make_server(hostname, port, application, threaded, processes, request_handler, passthrough_errors, ssl_context, fd=fd) if fd is None: log_startup(srv.socket) srv.serve_forever() if use_reloader: # If we're not running already in the subprocess that is the # reloader we want to open up a socket early to make sure the # port is actually available. if os.environ.get('WERKZEUG_RUN_MAIN') != 'true': if port == 0 and not can_open_by_fd: raise ValueError('Cannot bind to a random port with enabled ' 'reloader if the Python interpreter does ' 'not support socket opening by fd.') # Create and destroy a socket so that any exceptions are # raised before we spawn a separate Python interpreter and # lose this ability. address_family = select_ip_version(hostname, port) s = socket.socket(address_family, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(get_sockaddr(hostname, port, address_family)) if hasattr(s, 'set_inheritable'): s.set_inheritable(True) # If we can open the socket by file descriptor, then we can just # reuse this one and our socket will survive the restarts. if can_open_by_fd: os.environ['WERKZEUG_SERVER_FD'] = str(s.fileno()) s.listen(LISTEN_QUEUE) log_startup(s) else: s.close() # Do not use relative imports, otherwise "python -m werkzeug.serving" # breaks. from werkzeug._reloader import run_with_reloader run_with_reloader(inner, extra_files, reloader_interval, reloader_type) else: inner() #預設會執行
還是經過一系列判斷後預設會進入inner()函式,這個函式定義在run_simple()內,屬於閉包,inner()中會執行make_server()這個函式,看下原始碼:
def make_server(host=None, port=None, app=None, threaded=False, processes=1, request_handler=None, passthrough_errors=False, ssl_context=None, fd=None): """Create a new server instance that is either threaded, or forks or just processes one request after another. """ if threaded and processes > 1: raise ValueError("cannot have a multithreaded and " "multi process server.") elif threaded: return ThreadedWSGIServer(host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd) elif processes > 1: return ForkingWSGIServer(host, port, app, processes, request_handler, passthrough_errors, ssl_context, fd=fd) else: return BaseWSGIServer(host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd)
看到這也很明白了,想要配置多執行緒或者多程序,則需要設定threaded或processes這兩個引數,而這兩個引數是從app.run()中傳遞過來的:
app.run(**options) ---> run_simple(threaded,processes) ---> make_server(threaded,processes)
預設情況下flask是單執行緒,單程序的,想要開啟只需要在run中傳入對應的引數:app.run(threaded=True)即可.
從make_server中可知,flask提供了三種server:ThreadedWSGIServer,ForkingWSGIServer,BaseWSGIServer,預設情況下是BaseWSGIServer
以執行緒為例,看下ThreadedWSGIServer這個類:
class ThreadedWSGIServer(ThreadingMixIn, BaseWSGIServer): #繼承自ThreadingMixIn, BaseWSGIServer
"""A WSGI server that does threading."""
multithread = True
daemon_threads = True
ThreadingMixIn = socketserver.ThreadingMixIn
class ThreadingMixIn:
"""Mix-in class to handle each request in a new thread."""
# Decides how threads will act upon termination of the
# main process
daemon_threads = False
def process_request_thread(self, request, client_address):
"""Same as in BaseServer but as a thread.
In addition, exception handling is done here.
"""
try:
self.finish_request(request, client_address)
self.shutdown_request(request)
except:
self.handle_error(request, client_address)
self.shutdown_request(request)
def process_request(self, request, client_address):
"""Start a new thread to process the request."""
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))
t.daemon = self.daemon_threads
t.start()
process_request就是對每個請求產生一個新的執行緒來處理
最後寫一個非常簡單的應用來驗證以上說法:
from flask import Flask
from flask import _request_ctx_stack
app = Flask(__name__)
@app.route('/')
def index():
print(_request_ctx_stack._local.__ident_func__())
while True:
pass
return '<h1>hello</h1>'
app.run() #如果需要開啟多執行緒則app.run(threaded=True)
_request_ctx_stack._local.__ident_func__()對應這get_ident()這個函式,返回當前執行緒id,為什麼要在後面加上while True這句呢,我們看下get_ident()這個函式的說明:
Return a non-zero integer that uniquely identifies the current thread amongst other threads that exist simultaneously. This may be used to identify per-thread resources. Even though on some platforms threads identities may appear to be allocated consecutive numbers starting at 1, this behavior should not be relied upon, and the number should be seen purely as a magic cookie. A thread's identity may be reused for another thread after it exits.
關鍵字我已經加粗了,執行緒id會線上程結束後重複利用,所以我在路由函式中加了這個死迴圈來阻塞請求以便於觀察到不同的id,這就會產生兩種情況:
1.沒開啟多執行緒的情況下,一次請求過來,伺服器直接阻塞,並且之後的其他請求也都阻塞
2.開啟多執行緒情況下,每次都會打印出不同的執行緒id
結果:
---------------------
第一種情況
>>> * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
>>>139623180527360
---------------------
第二種情況
>>> * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
>>>140315469436672
>>>140315477829376
>>>140315486222080
>>>140315316901632
>>>140315105163008
>>>140315096770304
>>>140315088377600
結果顯而易見
綜上所述:flask支援多執行緒,但預設沒開啟,其次app.run()只適用於開發環境,生產環境下可以使用uWSGI,Gunicorn等web伺服器