python非阻塞伺服器實現
不多解釋,直接上程式碼,是我們之前做戰鬥伺服器時搞得,翻出來分享一下,把業務那塊的程式碼去掉了,只貼出核心程式碼
程式入口:
#!/usr/bin/env python # encoding: utf-8 """ Server.py Create by sunshaolei at 12-9-3 Email: [email protected] """ import sys from servercore.Client import Client from servercore.HandlerPool import HandlerPool class Server(): """ """ def __init__(self, host, port): """ """ self.client = Client(host, port) self.handlerPool = HandlerPool(1) def start(self): """ """ self.client.start() self.handlerPool.start() self.handlerPool.join() server = Server(sys.argv[1], sys.argv[2]) server.start()
客戶端連線:
#!/usr/bin/env python # encoding: utf-8 """ Client.py Create by sunshaolei at 12-9-3 Email: [email protected] """ import socket import select import threading import struct import copy import time from buffer.DataBase import DataBase from buffer.Sequence import Sequence from utils.Util import Util class Client(threading.Thread): """ """ MAX_CLIENT = 1024 * 2 MAX_BUFFER = 1024*10240 RECV_LEN = 1024 ENDIAN = DataBase.HIGHT_ENDIAN def __init__(self, host, port): """ """ threading.Thread.__init__(self, name='Client') self.client_buffer = {} # 收取資料buff self.sequence = Sequence() self.host = host self.port = int(port) self.sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) self.sock.bind((self.host, self.port)) self.sock.setblocking(0) self.sock.listen(Client.MAX_CLIENT) self.onlineSocketList = [self.sock] def run(self): """ """ while True: infds, outfds, errfds = select.select(self.onlineSocketList, self.onlineSocketList, self.onlineSocketList) for infd in infds: if infd is self.sock: connect, addr = self.sock.accept() Util.write_log('info', 'new connect by: %s:%s' % addr) connect.setblocking(0) self.onlineSocketList.append(connect) else: try: self.recv(infd) except: self.disconnect(infd) for outfd in outfds: try: self.send(outfd) except : self.disconnect(outfd) for errfd in errfds: self.disconnect(errfd) time.sleep(0.001) def recv(self, fd): """ 接收資料處理 """ if fd not in self.client_buffer: self.client_buffer[fd] = '' if len(self.client_buffer[fd]) + Client.RECV_LEN < Client.MAX_BUFFER: buff = fd.recv(Client.RECV_LEN) if not buff: self.disconnect(fd) return self.client_buffer[fd] += buff data = self.client_buffer[fd] num = 0 # 放置錯誤的資料格式導致死迴圈的 while len(data) > 4 and num < 20: num += 1 tag = '>i' if Client.ENDIAN == 1 else 'i' data_len = struct.unpack(tag, data[0:4])[0] if (len(data)-4) >= data_len: # 滿足條件 database = DataBase(fd, data[4:data_len+4]) self.sequence.inQueue.put(database) self.client_buffer[fd] = data[data_len+4:] num = 0 # 如果有資料置為0 data = self.client_buffer[fd] def send(self, fd): """ 傳送資料 """ database_list = self.sequence.getOut(fd) if not database_list: return try: for database in database_list: database.fd.send(database.buffer) except Exception: self.disconnect(fd) def disconnect(self, fd): """ """ if fd in self.onlineSocketList: self.onlineSocketList.remove(fd) if fd in self.client_buffer: self.client_buffer.pop(fd) self.sequence.getOut(fd) fd.close()
邏輯處理:
#!/usr/bin/env python # encoding: utf-8 """ HandlerPool.py Create by sunshaolei at 12-9-3 Email: [email protected] """ import threading from servercore.Handler import Handler class HandlerPool(threading.Thread): """ """ def __init__(self, handlerNum=1): """ """ self.handlerNum = handlerNum self.threads = [] def start(self): """ """ self.createThread(self.handlerNum) self.wait_for_complete() def createThread(self, num): for i in range(num): thread = Handler(i) thread.start() self.threads.append(thread) def wait_for_complete(self): #等待所有執行緒完成。 while len(self.threads): thread = self.threads.pop() if thread.isAlive(): thread.join()
#!/usr/bin/env python
# encoding: utf-8
"""
Handler.py
Create by sunshaolei at 12-9-19
Email: [email protected]
"""
import threading
import traceback
from Game.fight_service import FightService
from buffer.Sequence import Sequence
from utils.Util import Util
class Handler(threading.Thread):
"""
"""
def __init__(self, threadId):
"""
"""
threading.Thread.__init__(self, name='Hander_%s' % threadId)
self.sequence = Sequence()
def run(self):
"""
"""
while True:
database = self.sequence.inQueue.get()
try:
# 此處為業務入口,可以根據 moduleId,actionId分發進行資料解包,並執行相關處理,此處省略了
moduleId = database.getByte()
actionId = database.getByte()
data = database.getUTF()
FightService.app_server_msg(database.fd, moduleId, actionId, data)
except:
exstr = traceback.format_exc()
Util.write_log('err', exstr)
#!/usr/bin/env python
# encoding: utf-8
"""
DataBase.py
Create by sunshaolei at 12-9-19
Email: [email protected]
"""
import struct
class DataBase(object):
"""
"""
HIGHT_ENDIAN = 1
LOW_ENDIAN = 0
def __init__(self, fd, buff=None, endian=HIGHT_ENDIAN):
"""
"""
self.fd = fd
self.pos = 0
self.endian = endian
self.buffer = buff if buff else ''
def reset(self):
"""
"""
self.pos = 0
def getInt(self):
"""
"""
tag = '>i' if self.endian == DataBase.HIGHT_ENDIAN else 'i'
result = struct.unpack(tag, self.buffer[self.pos:self.pos+4])[0]
self.pos += 4
return result
def getShort(self):
"""
"""
tag = '>h' if self.endian == DataBase.HIGHT_ENDIAN else 'h'
result = struct.unpack(tag, self.buffer[self.pos:self.pos+2])[0]
self.pos += 2
return result
def getByte(self):
"""
"""
tag = 'b'
result = struct.unpack(tag, self.buffer[self.pos:self.pos+1])[0]
self.pos += 1
return result
def getUTF(self):
"""
"""
tag = '>h' if self.endian == DataBase.HIGHT_ENDIAN else 'h'
length = struct.unpack(tag, self.buffer[self.pos:self.pos+2])[0]
self.pos += 2
tag = '%ss' % (length)
result = struct.unpack(tag, self.buffer[self.pos:self.pos+length])[0]
self.pos += length
return result
def setInt(self, value):
"""
"""
tag = '>i' if self.endian == DataBase.HIGHT_ENDIAN else 'i'
self.buffer += (struct.pack(tag, value))
return True
def setShort(self, value):
"""
"""
tag = '>h' if self.endian == DataBase.HIGHT_ENDIAN else 'h'
self.buffer += (struct.pack(tag, value))
return True
def setByte(self, value):
"""
"""
tag = 'b'
self.buffer += (struct.pack(tag, value))
return True
def setUTF(self, value):
"""
"""
tag = '>h' if self.endian == DataBase.HIGHT_ENDIAN else 'h'
length = len(value)
self.buffer += (struct.pack(tag, length))
tag = '%ss' % (length)
self.buffer += (struct.pack(tag, value))
return True
#!/usr/bin/env python
# encoding: utf-8
"""
Sequence.py
Create by sunshaolei at 12-9-19
Email: [email protected]
"""
import threading
from Queue import Queue
from utils.Singleton import Singleton
buffLock = threading.RLock()
class Sequence(Singleton):
"""
"""
def __init__(self):
"""
"""
self.out_buff = {}
self.inQueue = Queue(64)
def getOut(self, k):
"""
"""
buffLock.acquire()
data = None
if k in self.out_buff:
data = self.out_buff.pop(k)
buffLock.release()
return data
def addOut(self, k, v):
"""
"""
buffLock.acquire()
if k not in self.out_buff:
self.out_buff[k] = []
self.out_buff[k].append(v)
buffLock.release()
相關推薦
python非阻塞伺服器實現
不多解釋,直接上程式碼,是我們之前做戰鬥伺服器時搞得,翻出來分享一下,把業務那塊的程式碼去掉了,只貼出核心程式碼 程式入口: #!/usr/bin/env python # encoding: utf-8 """ Server.py Create by sunsha
python非阻塞式單程序伺服器
python的單程序伺服器一次只能處理一個客戶端,顯然是沒有實用價值的,但是我們可以將單程序伺服器變為非阻塞式的。 利用socket中的setblocking()方法可以將tcp套接字轉化為非阻塞式套
java NIO實現同步非阻塞伺服器
server package net.smgui.util; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.Byt
python-非阻塞IO
recv code conn waiting res import client wait bin client端: # import time # import socket # sk = socket.socket(socket.AF_INET,socket.SOCK
非阻塞伺服器需要注意的主要問題(譯)
非阻塞伺服器有一個嚴重的問題,一些人甚至在沒解決這個問題的背景下就開發自己的應用框架(比如Python的Tornado) 當你使用非阻塞伺服器的時候,你會獲得出色的效能並且不需要擔心可擴充套件性,然而同時你需要意識到一個問題:你的IO呼叫、網路系統呼叫也都是非阻塞的嗎?很
單程序非阻塞伺服器
程序的執行取決於排程演算法,多個程序就是多個資源,程序是資源分配的單位,用多程序在客戶端訪問伺服器中是很常見的應用。下面我們介紹一種單程序實現客戶端訪問伺服器的方法,其基本思想就是雖然使用單執行緒但是
python:非阻塞或非同步程式設計
例如,對於一個聊天室來說,因為有多個連線需要同時被處理,所以很顯然,阻塞或同步的方法是不合適的,這就像買票只開了一個視窗,佷多人排隊等一樣。那麼我們如何解決這個問題呢?主要有三種方法:forking、threading、非同步I/O。 Forking和threading的方
多執行緒高併發程式設計(11) -- 非阻塞演算法實現ConcurrentLinkedQueue原始碼分析
一.背景 要實現對佇列的安全訪問,有兩種方式:阻塞演算法和非阻塞演算法。阻塞演算法的實現是使用一把鎖(出隊和入隊同一把鎖ArrayBlockingQueue)和兩把鎖(出隊和入隊各一把鎖LinkedBlockingQueue)來實現;非阻塞演算法使用自旋+CAS實現。 今天來探究下使用非阻塞演算法
利用Python中SocketServer 實現客戶端與伺服器間非阻塞通訊
利用SocketServer模組來實現網路客戶端與伺服器併發連線非阻塞通訊。 首先,先了解下SocketServer模組中可供使用的類: BaseServer:包含伺服器的核心功能與混合(mix-in)類掛鉤;這個類只用於派生,所以不會生成這個類的例項;可以考慮使用TCPS
基於非阻塞socket的多執行緒伺服器的實現------一個伺服器如何與多個客戶端進行通訊?
我們首先來看服務端(涉及非阻塞socket和多執行緒): #include <stdio.h> #include <winsock2.h> #include <windows.h> #pragma comment(li
linux下socket程式設計 select實現非阻塞模式多臺客戶端與伺服器通訊
select函式原型如下: int select (int maxfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); select系統呼叫是用來讓我們的程式
異步非阻塞socket的實現
print except 事件循環 port int 性能 run utf8 try 在學習使用scrapy爬蟲框架之前,需要了解一些基礎原理 我們知道HTTP請求是基於socket模塊進行發送和接受的,但是socket套接字的在使用的中存在著阻塞,不利用爬蟲的高性能運
Python web框架 Tornado(二)異步非阻塞
.py thread bind log class multiple fin ini lex 異步非阻塞 阻塞式:(適用於所有框架,Django,Flask,Tornado,Bottle) 一個請求到來未處理完成,後續一直等待 解決方案:多線程,多進程 異步
Python的異步編程[0] -> 協程[1] -> 使用協程建立自己的異步非阻塞模型
.net post this fab htm true 底層實現 自己 print 使用協程建立自己的異步非阻塞模型 接下來例子中,將使用純粹的Python編碼搭建一個異步模型,相當於自己構建的一個asyncio模塊,這也許能對asyncio模塊底層實現的理解有更大的
非阻塞套接字實現並發處理
pre ror enc con put 服務 生成 import data 服務端 import socket server = socket.socket() server.setblocking(False) server.bind((‘0.0.0.0‘,8080)
python學習筆記之四-多進程&多線程&異步非阻塞
running executor 服務器 RoCE 進行 break python buffer 創建 ProcessPoolExecutor對multiprocessing進行了高級抽象,暴露出簡單的統一接口。 異步非阻塞 爬蟲 對於異步IO請求的本質則是【非阻塞So
scala通過akka的actor實現socket http server(NIO非阻塞模式)
1首先是sbt需要匯入的依賴 name := "HttpServer" version := "1.0" scalaVersion := "2.11.8" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-act
c/c++ llinux epoll系列4 利用epoll_wait實現非阻塞的connect
llinux epoll系列4 利用epoll_wait實現非阻塞的connect connect函式是阻塞的,而且不能設定connect函式的timeout時間,所以一旦阻塞太長時間,影響使用者的體驗,所以就出來一個需求,硬要設定connect的timeout時間。 實現方法:先把connect函式變成
Java入門系列-25-NIO(實現非阻塞網絡通信)
寫入 eve asn accept public int 次數 客戶端 服務器 還記得之前介紹NIO時對比傳統IO的一大特點嗎?就是NIO是非阻塞式的,這篇文章帶大家來看一下非阻塞的網絡操作。 補充:以數組的形式使用緩沖區 package testnio; import
基於Java NIO2實現的非同步非阻塞訊息通訊框架
原文傳送門 基於Java NIO2實現的非同步非阻塞訊息通訊框架 前奏 AIO應用開發 Future方式 Callback方式 Reader/Writer方式實現 執行緒池和Group PendingExceptio