Write a high-performance HTTP server + web framework in 400 lines of Python, with performance that beats tornado, django, webpy, and uwsgi
阿新 • Published: 2018-12-30
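tornado: 4k QPS (10k QPS with multiple processes)
nginx+tornado: 9k QPS
nginx+uwsgi: 8k QPS (note: I am not saying this server is faster than nginx, only that these web frameworks are slow)
This server: 32k QPS
No Python acceleration of any kind is used.
If you don't believe it, run a load test yourself.
It is already in real use in several of my own projects, with clear results.
If you see something that could be optimized, or have suggestions, contact me on QQ: 512284622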
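A brief word on how it works: multiple processes + multiple threads, in a half-sync/half-async pattern.
Many readers who have looked at memcached or libevent will probably ask:
why not have one thread do the listening, other threads do the reads and writes, and a pipe in between for communication?
The reasons are:
1. All epoll operations, including add, modify, and wait, are thread-safe, so several threads can operate on the same epoll instance at once.
A business thread can therefore modify the epoll event itself as soon as it finishes processing, instead of going through a queue or pipe so that a single thread performs every epoll operation.
2. Linux epoll has by now solved the thundering herd problem, so multiple processes, each with its own epoll, can call epoll_wait on the same socket at the same time,
rather than dedicating one process or thread purely to listening.
So the complicated libevent-style arrangement, with one thread listening, other threads handling reads and writes, and pipes between them, is no longer needed, and the code stays simple.
Thanks also to Python's powerful libraries, which are what make an asynchronous high-performance HTTP server possible in a little over 400 lines.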
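To make point 1 concrete, here is a minimal sketch of a worker thread re-arming a file descriptor on a shared epoll instance while the event loop thread is polling. It is written for Python 3 on Linux (the listing in this article is Python 2), and the names `epoll_handoff_demo`, `worker`, and `pending` are made up for illustration; it is not the server's actual code.

```python
import select
import socket
import threading


def epoll_handoff_demo():
    """Echo one upper-cased message; the worker thread re-arms the fd itself."""
    client, server = socket.socketpair()
    server.setblocking(False)
    ep = select.epoll()
    ep.register(server.fileno(), select.EPOLLIN)
    pending = {}  # fd -> bytes waiting to be written

    def worker(fd, payload):
        # The "business logic" runs off the event loop. When it is done,
        # the worker switches the fd to EPOLLOUT directly on the shared
        # epoll object: no queue or pipe back to the loop thread.
        pending[fd] = payload.upper()
        ep.modify(fd, select.EPOLLOUT)

    client.sendall(b"ping")
    try:
        for _ in range(50):  # bounded loop so the demo cannot hang
            for fd, events in ep.poll(0.1):
                if events & select.EPOLLIN:
                    data = server.recv(1024)
                    threading.Thread(target=worker, args=(fd, data)).start()
                elif events & select.EPOLLOUT:
                    server.send(pending.pop(fd))
                    return client.recv(1024)
        return None
    finally:
        ep.close()
        client.close()
        server.close()
```

Calling `epoll_handoff_demo()` sends `ping` through a socket pair and reads back the reply that the worker prepared.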
User guide:
1. Starting the server:
Pass the port to listen on to start it:
python server.py 8992
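2. Writing CGI handlers quickly, with changes picked up at runtime and no server restart
In the same directory as server.py,
create any Python file,
for example:
example.py
Define a function tt in it;
the URL for calling that function is then http://ip:port/example.tt
After you edit and save the file, the new version is served right away, with no restart.
example.py
def tt(request,response_head):
    #print request.form
    #print request.getdic
    #print request.postdic
    #itemname = request.form["file"].filename
    #value = request.form["file"].file.read()
    return "ccb"+request.path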
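The URL-to-function mapping and the reload-on-save behaviour can be sketched roughly as follows. This is a Python 3 illustration using `importlib`, whereas the server listing uses Python 2's `__import__` and `reload`; `load_action`, `dispatch`, and the `directory` parameter are hypothetical names introduced here.

```python
import importlib
import os
import sys

_modules = {}  # module name -> (module object, mtime seen at load time)


def load_action(name, directory="."):
    """Import name.py once, and reload it when its mtime has advanced."""
    path = os.path.join(directory, name + ".py")
    mtime = os.path.getmtime(path)
    cached = _modules.get(name)
    if cached is None:
        if directory not in sys.path:
            sys.path.insert(0, directory)
        importlib.invalidate_caches()
        module = importlib.import_module(name)
    elif mtime > cached[1]:
        module = importlib.reload(cached[0])
    else:
        return cached[0]
    _modules[name] = (module, mtime)
    return module


def dispatch(url_path, request, response_head, directory="."):
    """Map /example.tt to function tt in example.py and call it."""
    segment = url_path.split("?")[0].rstrip("/").split("/")[-1]
    action, ext = os.path.splitext(segment)
    handler = getattr(load_action(action, directory), ext.lstrip("."))
    return handler(request, response_head)
```

With this sketch, `dispatch("/example.tt", request, {})` would call `tt` in `example.py`, and a later call after the file is saved again would run the new code.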
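The function must take two parameters:
request: the request data, which has the following attributes by default
    headers: request headers (dictionary)
    form: multipart/form-data form (dictionary)
    getdic: URL query parameters (dictionary)
    postdic: HTTP body parameters (dictionary)
    rfile: the raw HTTP body content (wrapped in a StringIO object in the server listing)
    action: the Python file name (here, example)
    method: the function name (here, tt)
    command: the HTTP method (GET or POST)
    path: the URL (string)
    http_version: the HTTP version (HTTP/1.1)
response_head: the headers of the response
For example, to return a gzip-compressed response,
add the header:
response_head["Content-Encoding"] = "gzip"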
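What the server does with that header can be sketched like this: if the handler set `Content-Encoding: gzip`, the body is compressed before `Content-Length` is computed. This is a Python 3 sketch; `finalize_body` is a hypothetical helper name, and the listing itself does the equivalent with `gzip.GzipFile` and `StringIO`.

```python
import gzip


def finalize_body(body, response_head):
    """Return the bytes to send, compressing when the handler asked for gzip."""
    data = body.encode("utf-8") if isinstance(body, str) else body
    if response_head.get("Content-Encoding") == "gzip":
        data = gzip.compress(data)
    # Content-Length must describe the bytes actually sent on the wire.
    response_head["Content-Length"] = len(data)
    return data
```

The handler only sets the header and returns plain text; compression stays in one place on the server side.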
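3. Downloading files
By default, static files live in the static folder.
For example, put a.jpg in the static folder;
its URL is then http://ip:port/static/a.jpg
ETag-based client caching is supported.
Range requests are supported, so interrupted downloads can be resumed.
(The server uses sendfile-style file sending, which is fast and does not hold the whole file in memory. Note that the listing below reads the file in 102400-byte chunks and sends them with send() rather than calling the sendfile system call.)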
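The Range handling behind resumable downloads boils down to turning the header into a byte window. Below is a small Python 3 sketch of that step; `parse_range` and `content_range` are hypothetical names, only the `bytes=start-end` and `bytes=start-` forms are handled (as in the listing), and clamping `end` to the file size is an addition of this sketch.

```python
def parse_range(header_value, filesize):
    """Return (start, end, length) for 'bytes=start-end' or 'bytes=start-'."""
    spec = header_value.strip().replace("bytes=", "", 1)
    start_text, _, end_text = spec.partition("-")
    start = int(start_text)
    end = int(end_text) if end_text else filesize - 1
    end = min(end, filesize - 1)        # never read past the end of the file
    length = max(end - start + 1, 0)    # bytes to send for this response
    return start, end, length


def content_range(start, end, filesize):
    """Value of the Content-Range header in the 206 response."""
    return "bytes %d-%d/%d" % (start, end, filesize)
```

A client resuming at byte 500 of a 1000-byte file sends `Range: bytes=500-`, and the server answers 206 with `Content-Range: bytes 500-999/1000`.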
4. HTML templates
Create a template, template.html:
<HTML>
<HEAD><TITLE>$title</TITLE></HEAD>
<BODY>
$contents
</BODY>
</HTML>
The corresponding function is then:
def template(request,response_head):
    t = Template(file="template.html")
    t.title = "my title"
    t.contents = "my contents"
    return str(t)
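If you just want to see the `$title` / `$contents` substitution without installing anything, the standard library's `string.Template` happens to use the same `$name` placeholder syntax. To be clear, this is a stand-in for trying the idea in Python 3, not Cheetah and not what the server uses; `PAGE` and `render` are names made up for this sketch.

```python
from string import Template

# Same markup as template.html above, inlined to keep the sketch self-contained.
PAGE = """<HTML>
<HEAD><TITLE>$title</TITLE></HEAD>
<BODY>
$contents
</BODY>
</HTML>
"""


def render(title, contents):
    """Fill the $title and $contents placeholders and return the page text."""
    return Template(PAGE).substitute(title=title, contents=contents)
```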
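Templates are implemented with Cheetah, the fastest open-source template engine for Python,
roughly 10 times or more faster than the template engines of webpy, django, thinkphp, and others: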
http://my.oschina.net/whp/blog/112296
example.py
import os
import imp
import sys
import time
import gzip
from StringIO import StringIO
from Cheetah.Template import Template

def tt(request,response_head):
    #print request.form
    #print request.getdic
    #print request.postdic
    return "ccb"+request.path

def getdata(request,response_head):
    f=open("a.txt")
    content = f.read()
    f.close()
    response_head["Content-Encoding"] = "gzip"
    return content

def template(request,response_head):
    t = Template(file="template.html")
    t.title = "my title"
    t.contents = "my contents"
    response_head["Content-Encoding"] = "gzip"
    return str(t)
server.py
#!/usr/bin/python
#-*- coding:utf-8 -*-
# Author : hemingzhe <[email protected]>; xiaorixin<[email protected]>
import socket, logging
import select, errno
import os
import sys
import traceback
import gzip
from StringIO import StringIO
import Queue
import threading
import time
import thread
import cgi
from cgi import parse_qs
import json
import imp
from os.path import join, getsize
import md5
import re
logger = logging.getLogger("network-server")
action_dic = {}
action_time = {}
listfile = os.listdir("./")
for l in listfile:
    prefixname, extname = os.path.splitext(l)
    if extname == ".py":
        action = __import__(prefixname)
        mtime = os.path.getmtime(l)
        action_time[prefixname] = mtime
        action_dic[prefixname] = action
static_file_dir = "static"
static_dir = "/%s/" % static_file_dir
cache_static_dir = "cache_%s" % static_file_dir
if not os.path.exists(cache_static_dir):
    os.makedirs(cache_static_dir)
filedic = {"HTM":None,"HTML":None,"CSS":None,"JS":None,"TXT":None,"XML":None}

def getTraceStackMsg():
    tb = sys.exc_info()[2]
    msg = ''
    for i in traceback.format_tb(tb):
        msg += i
    return msg

def md5sum(fobj):
    m = md5.new()
    while True:
        d = fobj.read(65536)
        if not d:
            break
        m.update(d)
    return m.hexdigest()

class QuickHTTPRequest():
    def __init__(self, data):
        headend = data.find("\r\n\r\n")
        rfile = ""
        if headend > 0:
            rfile = data[headend+4:]
            headlist = data[0:headend].split("\r\n")
        else:
            headlist = data.split("\r\n")
        self.rfile = StringIO(rfile)
        first_line = headlist.pop(0)
        self.command, self.path, self.http_version = re.split('\s+', first_line)
        indexlist = self.path.split('?')
        self.baseuri = indexlist[0]
        indexlist = self.baseuri.split('/')
        while len(indexlist) != 0:
            self.index = indexlist.pop()
            if self.index == "":
                continue
            else:
                self.action,self.method = os.path.splitext(self.index)
                self.method = self.method.replace('.', '')
                break
        self.headers = {}
        for item in headlist:
            if item.strip() == "":
                continue
            segindex = item.find(":")
            if segindex < 0:
                continue
            key = item[0:segindex].strip()
            value = item[segindex+1:].strip()
            self.headers[key] = value
        c_low = self.command.lower()
        self.getdic = None
        self.form = None
        self.postdic = None
        if c_low == "get" and "?" in self.path:
            self.getdic = parse_qs(self.path.split("?").pop())
        elif c_low == "post" and self.headers.get('Content-Type',"").find("boundary") > 0:
            self.form = cgi.FieldStorage(fp=self.rfile,headers=None,
                    environ={'REQUEST_METHOD':self.command,'CONTENT_TYPE':self.headers['Content-Type'],})
            if self.form == None:
                self.form = {}
        elif c_low == "post":
            self.postdic = parse_qs(rfile)

def sendfilejob(request, data, epoll_fd, fd):
    try:
        base_filename = request.baseuri[request.baseuri.find(static_dir)+1:]
        cache_filename = "./cache_"+base_filename
        filename = "./"+base_filename
        if not os.path.exists(filename):
            res = "file not found"
            data["writedata"] = "HTTP/1.1 200 OK\r\nContent-Length: %s\r\nConnection:keep-alive\r\n\r\n%s" % (len(res),res)
        else:
            lasttimestr = request.headers.get("If-Modified-Since", None)
            filemd5 = os.path.getmtime(filename)
            timestr = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(filemd5))
            curtime = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(time.time()))
            sock = data["connections"]
            if lasttimestr == timestr and "Range" not in request.headers:
                data["writedata"] = "HTTP/1.1 304 Not Modified\r\nLast-Modified: %s\r\nETag: \"%s\"\r\nDate: %s\r\nConnection:keep-alive\r\n\r\n" % (timestr,filemd5,curtime)
            else:
                ext = request.method
                iszip = False
                Accept_Encoding = request.headers.get("Accept-Encoding", "")
                if ext.upper() in filedic or (ext == "" and "gzip" in Accept_Encoding):
                    if not os.path.exists(cache_filename) or \
                            os.path.getmtime(cache_filename) < float(filemd5):
                        d,f = os.path.split(cache_filename)
                        try:
                            if not os.path.exists(d):
                                os.makedirs(d)
                            f_out = gzip.open(cache_filename, 'wb')
                            f_out.write(open(filename).read())
                            f_out.close()
                        except Exception, e:
                            print str(e)
                            pass
                    filename = cache_filename
                    iszip = True
                filesize = os.path.getsize(filename)
                if "Range" in request.headers:
                    range_value = request.headers["Range"].strip(' \r\n')
                    range_value = range_value.replace("bytes=", "")
                    start,end = range_value.split('-')
                    if end == '':
                        end = filesize - 1
                    start = int(start)
                    end = int(end)
                    headstr = "HTTP/1.1 206 Partial Content\r\nLast-Modified: %s\r\nETag: \"%s\"\r\nDate: %s\r\n" % (timestr,filemd5,curtime)
                    headstr += "Accept-Ranges: bytes\r\nContent-Range: bytes %s-%s/%s\r\n" % (start,end,filesize)
                else:
                    start = 0
                    end = filesize - 1
                    headstr = "HTTP/1.1 200 OK\r\nLast-Modified: %s\r\nETag: \"%s\"\r\nDate: %s\r\n" % (timestr,filemd5,curtime)
                offset = start
                totalsenlen = end - start + 1
                if totalsenlen < 0:
                    totalsenlen = 0
                if iszip:
                    headstr += "Content-Encoding: gzip\r\n"
                headstr += "Content-Length: %s\r\nConnection:keep-alive\r\n" % totalsenlen
                headstr += "\r\n"
                f = open(filename)
                f.seek(offset)
                readlen = 102400
                if readlen > totalsenlen:
                    readlen = totalsenlen
                firstdata = f.read(readlen)
                headstr += firstdata
                totalsenlen -= len(firstdata)
                data["f"] = f
                data["totalsenlen"] = totalsenlen
                data["writedata"] = headstr
    except Exception, e:
        print str(e)+getTraceStackMsg()
        data["writedata"] = "file not found"
        pass
    try:
        data["readdata"] = ""
        epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP)
    except Exception, e:
        #print str(e)+getTraceStackMsg()
        pass

class Worker(object):
    def __init__(self):
        pass
    def process(self, data, epoll_fd, fd):
        res = ""
        add_head = ""
        try:
            request = QuickHTTPRequest(data["readdata"])
        except Exception, e:
            res = "http format error"
        try:
            headers = {}
            headers["Content-Type"] = "text/html;charset=utf-8"
            headers["Connection"] = "keep-alive"
            if request.path == "/favicon.ico":
                request.path = "/"+static_file_dir+request.path
            if static_dir in request.path or "favicon.ico" in request.path:
                sendfilejob(request,data,epoll_fd,fd)
                return None
            action = action_dic.get(request.action, None)
            if action == None:
                action = __import__(request.action)
                mtime = os.path.getmtime("./%s.py" % request.action)
                action_time[request.action] = mtime
                action_dic[request.action] = action
            else:
                load_time = action_time[request.action]
                mtime = os.path.getmtime("./%s.py" % request.action)
                if mtime>load_time:
                    action = reload(sys.modules[request.action])
                    action_time[request.action] = mtime
                    action_dic[request.action] = action
            method = getattr(action, request.method)
            res = method(request, headers)
            if headers.get("Content-Encoding","") == "gzip":
                buf = StringIO()
                f = gzip.GzipFile(mode='wb', fileobj=buf)
                f.write(res)
                f.close()
                res = buf.getvalue()
        except Exception, e:
            logger.error(str(e)+getTraceStackMsg())
            res = "page no found"
        try:
            if headers.get("Connection","") != "close":
                data["keepalive"] = True
            res_len = len(res)
            headers["Content-Length"] = res_len
            for key in headers:
                add_head += "%s: %s\r\n" % (key, headers[key])
            data["writedata"] = "HTTP/1.1 200 OK\r\n%s\r\n%s" % (add_head, res)
            data["readdata"] = ""
            epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP)
        except Exception, e:
            print str(e)+getTraceStackMsg()

def InitLog():
    logger.setLevel(logging.DEBUG)
    fh = logging.FileHandler("network-server.log")
    fh.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.ERROR)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    logger.addHandler(ch)

class MyThread(threading.Thread):
    ind = 0
    def __init__(self, threadCondition, shareObject, **kwargs):
        threading.Thread.__init__(self, kwargs=kwargs)
        self.threadCondition = threadCondition
        self.shareObject = shareObject
        self.setDaemon(True)
        self.worker = Worker()
    def processer(self, args, kwargs):
        try:
            param = args[0]
            epoll_fd = args[1]
            fd = args[2]
            self.worker.process(param, epoll_fd, fd)
        except:
            print "job error:" + getTraceStackMsg()
    def run(self):
        while True:
            try:
                args, kwargs = self.shareObject.get()
                self.processer(args, kwargs)
            except Queue.Empty:
                continue
            except :
                print "thread error:" + getTraceStackMsg()

class ThreadPool:
    def __init__( self, num_of_threads=10):
        self.threadCondition=threading.Condition()
        self.shareObject=Queue.Queue()
        self.threads = []
        self.__createThreadPool( num_of_threads )
    def __createThreadPool( self, num_of_threads ):
        for i in range( num_of_threads ):
            thread = MyThread( self.threadCondition, self.shareObject)
            self.threads.append(thread)
    def start(self):
        for thread in self.threads:
            thread.start()
    def add_job( self, *args, **kwargs ):
        self.shareObject.put( (args,kwargs) )

def run_main(listen_fd):
    try:
        epoll_fd = select.epoll()
        epoll_fd.register(listen_fd.fileno(), select.EPOLLIN | select.EPOLLET | select.EPOLLERR | select.EPOLLHUP)
    except select.error, msg:
        logger.error(msg)
    tp = ThreadPool(20)
    tp.start()
    params = {}
    def clearfd(fd):
        try:
            param = params[fd]
            epoll_fd.unregister(fd)
            param["connections"].close()
            f = param.get("f", None)
            if f != None:
                f.close()
        except Exception, e:
            pass
        try:
            del params[fd]
        except Exception, e:
            pass
    last_min_time = -1
    while True:
        epoll_list = epoll_fd.poll()
        for fd, events in epoll_list:
            cur_time = time.time()
            if fd == listen_fd.fileno():
                # edge-triggered: accept until the backlog is drained
                while True:
                    try:
                        conn, addr = listen_fd.accept()
                        conn.setblocking(0)
                        epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET | select.EPOLLERR | select.EPOLLHUP)
                        conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                        #conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
                        params[conn.fileno()] = {"addr":addr,"writelen":0, "connections":conn, "time":cur_time}
                    except socket.error, msg:
                        break
            elif select.EPOLLIN & events:
                param = params.get(fd,None)
                if param == None:
                    continue
                param["time"] = cur_time
                datas = param.get("readdata","")
                cur_sock = params[fd]["connections"]
                while True:
                    try:
                        data = cur_sock.recv(102400)
                        if not data:
                            clearfd(fd)
                            break
                        else:
                            datas += data
                    except socket.error, msg:
                        if msg.errno == errno.EAGAIN:
                            # socket drained: hand the request to the thread
                            # pool only once headers and body are complete
                            param["readdata"] = datas
                            len_s = -1
                            len_e = -1
                            contentlen = -1
                            headlen = -1
                            len_s = datas.find("Content-Length:")
                            if len_s > 0:
                                len_e = datas.find("\r\n", len_s)
                            if len_s > 0 and len_e > 0 and len_e > len_s+15:
                                len_str = datas[len_s+15:len_e].strip()
                                if len_str.isdigit():
                                    contentlen = int(datas[len_s+15:len_e].strip())
                            headend = datas.find("\r\n\r\n")
                            if headend > 0:
                                headlen = headend + 4
                            data_len = len(datas)
                            if (contentlen > 0 and headlen > 0 and (contentlen + headlen) == data_len) or \
                                    (contentlen == -1 and headlen == data_len):
                                tp.add_job(param,epoll_fd,fd)
                            break
                        else:
                            clearfd(fd)
                            break
            elif select.EPOLLHUP & events or select.EPOLLERR & events:
                clearfd(fd)
                logger.error("sock: %s error" % fd)
            elif select.EPOLLOUT & events:
                param = params.get(fd,None)
                if param == None:
                    continue
                param["time"] = cur_time
                sendLen = param.get("writelen",0)
                writedata = param.get("writedata", "")
                total_write_len = len(writedata)
                cur_sock = param["connections"]
                f = param.get("f", None)
                totalsenlen = param.get("totalsenlen", None)
                if writedata == "":
                    clearfd(fd)
                    continue
                while True:
                    try:
                        sendLen += cur_sock.send(writedata[sendLen:])
                        if sendLen == total_write_len:
                            if f != None and totalsenlen != None:
                                # static file: queue the next chunk of the file
                                readmorelen = 102400
                                if readmorelen > totalsenlen:
                                    readmorelen = totalsenlen
                                morefiledata = ""
                                if readmorelen > 0:
                                    morefiledata = f.read(readmorelen)
                                if morefiledata != "":
                                    writedata = morefiledata
                                    sendLen = 0
                                    total_write_len = len(writedata)
                                    totalsenlen -= total_write_len
                                    param["writedata"] = writedata
                                    param["totalsenlen"] = totalsenlen
                                    continue
                                else:
                                    f.close()
                                    del param["f"]
                                    del param["totalsenlen"]
                            if param.get("keepalive", True):
                                param["readdata"] = ""
                                param["writedata"] = ""
                                param["writelen"] = 0
                                epoll_fd.modify(fd, select.EPOLLET | select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP)
                            else:
                                clearfd(fd)
                            break
                    except socket.error, msg:
                        if msg.errno == errno.EAGAIN:
                            param["writelen"] = sendLen
                            break
                        clearfd(fd)
                        # stop here: retrying send() on the closed socket
                        # would raise again and spin in this loop forever
                        break
            else:
                continue
            #check time out
            if cur_time - last_min_time > 10:
                last_min_time = cur_time
                objs = params.items()
                for (key_fd,value) in objs:
                    fd_time = value.get("time", 0)
                    del_time = cur_time - fd_time
                    if del_time > 10:
                        clearfd(key_fd)
                    elif fd_time < last_min_time:
                        last_min_time = fd_time

if __name__ == "__main__":
    reload(sys)
    sys.setdefaultencoding('utf8')
    InitLog()
    port = int(sys.argv[1])
    try:
        listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    except socket.error, msg:
        logger.error("create socket failed")
    try:
        listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    except socket.error, msg:
        logger.error("setsocketopt SO_REUSEADDR failed")
    try:
        listen_fd.bind(('', port))
    except socket.error, msg:
        logger.error("bind failed")
    try:
        listen_fd.listen(1024)
        listen_fd.setblocking(0)
    except socket.error, msg:
        logger.error(msg)
    child_num = 8
    c = 0
    while c < child_num:
        c = c + 1
        newpid = os.fork()
        if newpid == 0:
            run_main(listen_fd)
    run_main(listen_fd)
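
One step in run_main that is easy to miss is the check that decides when a buffered request is complete enough to hand to the thread pool. Here is a rough Python 3 sketch of the same idea; `request_is_complete` is a hypothetical name, and unlike the listing it matches the Content-Length header case-insensitively and uses `>=` instead of an exact length match.

```python
def request_is_complete(buf):
    """True once buf holds the full header block plus Content-Length body bytes."""
    head_end = buf.find(b"\r\n\r\n")
    if head_end < 0:
        return False                      # headers not fully received yet
    head_len = head_end + 4
    content_len = 0                       # no Content-Length means no body
    for line in buf[:head_end].split(b"\r\n")[1:]:
        name, _, value = line.partition(b":")
        if name.strip().lower() == b"content-length" and value.strip().isdigit():
            content_len = int(value.strip())
    return len(buf) >= head_len + content_len
```

With edge-triggered epoll the read loop drains the socket until EAGAIN, so a check like this runs after every drain and the partial data stays buffered until it passes.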