C#高效能Socket伺服器SocketAsyncEventArgs的實現(IOCP)
阿新 • • 發佈:2019-01-04
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net.Sockets;
using System.Net;
using System.Threading;

namespace ServerTest
{
    /// <summary>
    /// TCP server built on <see cref="SocketAsyncEventArgs"/>.
    /// One pooled <see cref="SocketAsyncEventArgs"/> per client drives that client's receive loop;
    /// its <c>UserToken</c> holds the client socket. Sends use short-lived, dedicated
    /// <see cref="SocketAsyncEventArgs"/> instances so they never collide with a pending receive.
    /// </summary>
    public class IOCPServer : IDisposable
    {
        const int opsToPreAlloc = 2;

        // Numeric value of the IPV6_V6ONLY socket option (kept numeric, as in the original code).
        private const SocketOptionName IPv6OnlyOption = (SocketOptionName)27;

        #region Fields

        /// <summary>
        /// Maximum number of simultaneously connected clients.
        /// </summary>
        private int _maxClient;

        /// <summary>
        /// Listening socket; created by <see cref="Start"/> and closed by <see cref="Stop"/>.
        /// </summary>
        private Socket _serverSock;

        /// <summary>
        /// Number of currently connected clients (updated with Interlocked operations).
        /// </summary>
        private int _clientCount;

        /// <summary>
        /// Size in bytes of the buffer slice used by each socket I/O operation.
        /// </summary>
        private int _bufferSize = 1024;

        /// <summary>
        /// Semaphore that limits accepted clients to <see cref="_maxClient"/>.
        /// A slot is taken before every accept is posted and released when the accept fails
        /// or the accepted client is closed.
        /// </summary>
        Semaphore _maxAcceptedClients;

        /// <summary>
        /// Buffer manager that hands out slices of one large buffer.
        /// </summary>
        BufferManager _bufferManager;

        /// <summary>
        /// Pool of reusable receive <see cref="SocketAsyncEventArgs"/> objects.
        /// </summary>
        SocketAsyncEventArgsPool _objectPool;

        private bool disposed = false;

        /// <summary>
        /// True once <see cref="Init"/> has populated the pool, so a restart does not populate it again.
        /// </summary>
        private bool _initialized;

        #endregion

        #region Properties

        /// <summary>
        /// Whether the server is currently running.
        /// </summary>
        public bool IsRunning { get; private set; }

        /// <summary>
        /// IP address the server listens on.
        /// </summary>
        public IPAddress Address { get; private set; }

        /// <summary>
        /// Port the server listens on.
        /// </summary>
        public int Port { get; private set; }

        /// <summary>
        /// Text encoding used to decode received data for logging.
        /// </summary>
        public Encoding Encoding { get; set; }

        #endregion

        #region Ctors

        /// <summary>
        /// Creates a server listening on <see cref="IPAddress.Any"/>.
        /// </summary>
        /// <param name="listenPort">Port to listen on.</param>
        /// <param name="maxClient">Maximum number of clients.</param>
        public IOCPServer(int listenPort, int maxClient)
            : this(IPAddress.Any, listenPort, maxClient)
        {
        }

        /// <summary>
        /// Creates a server listening on the given end point.
        /// </summary>
        /// <param name="localEP">End point to listen on.</param>
        /// <param name="maxClient">Maximum number of clients.</param>
        public IOCPServer(IPEndPoint localEP, int maxClient)
            : this(localEP.Address, localEP.Port, maxClient)
        {
        }

        /// <summary>
        /// Creates a server listening on the given address and port.
        /// </summary>
        /// <param name="localIPAddress">IP address to listen on.</param>
        /// <param name="listenPort">Port to listen on.</param>
        /// <param name="maxClient">Maximum number of clients.</param>
        /// <exception cref="ArgumentNullException"><paramref name="localIPAddress"/> is null.</exception>
        public IOCPServer(IPAddress localIPAddress, int listenPort, int maxClient)
        {
            if (localIPAddress == null)
            {
                throw new ArgumentNullException("localIPAddress");
            }

            this.Address = localIPAddress;
            this.Port = listenPort;
            this.Encoding = Encoding.Default;

            _maxClient = maxClient;

            // The listening socket is created in Start(); creating one here as well
            // would leak it as soon as Start() replaced it.
            _bufferManager = new BufferManager(_bufferSize * _maxClient * opsToPreAlloc, _bufferSize);
            _objectPool = new SocketAsyncEventArgsPool(_maxClient);
            _maxAcceptedClients = new Semaphore(_maxClient, _maxClient);
        }

        #endregion

        #region Init

        /// <summary>
        /// Allocates the shared buffer and pre-populates the pool with one
        /// <see cref="SocketAsyncEventArgs"/> per allowed client. Calls after the first are no-ops,
        /// so restarting the server neither grows the pool nor re-carves the buffer.
        /// </summary>
        public void Init()
        {
            if (_initialized)
            {
                return;
            }

            // One large buffer shared by all I/O operations guards against memory fragmentation.
            _bufferManager.InitBuffer();

            for (int i = 0; i < _maxClient; i++)
            {
                SocketAsyncEventArgs readWriteEventArg = new SocketAsyncEventArgs();
                readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);
                readWriteEventArg.UserToken = null;

                // Assign a slice of the shared buffer to this object, then pool it.
                _bufferManager.SetBuffer(readWriteEventArg);
                _objectPool.Push(readWriteEventArg);
            }

            _initialized = true;
        }

        #endregion

        #region Start

        /// <summary>
        /// Starts listening and posts the first accept. Does nothing if already running.
        /// </summary>
        public void Start()
        {
            if (IsRunning)
            {
                return;
            }

            Init();

            IPEndPoint localEndPoint = new IPEndPoint(Address, Port);
            Socket listener = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
            try
            {
                if (localEndPoint.AddressFamily == AddressFamily.InterNetworkV6)
                {
                    // Configure the listening socket as dual-mode (IPv4 & IPv6).
                    listener.SetSocketOption(SocketOptionLevel.IPv6, IPv6OnlyOption, false);
                    listener.Bind(new IPEndPoint(IPAddress.IPv6Any, localEndPoint.Port));
                }
                else
                {
                    listener.Bind(localEndPoint);
                }

                listener.Listen(this._maxClient);
            }
            catch
            {
                // Do not leak the socket or report a running server when bind/listen fails.
                listener.Close();
                throw;
            }

            _serverSock = listener;
            IsRunning = true;

            // Post the first accept on the listening socket.
            StartAccept(null);
        }

        #endregion

        #region Stop

        /// <summary>
        /// Stops accepting new connections by closing the listening socket.
        /// </summary>
        public void Stop()
        {
            if (!IsRunning)
            {
                return;
            }

            IsRunning = false;

            Socket listener = _serverSock;
            if (listener != null)
            {
                listener.Close();
            }
            //TODO: close the connections to all clients
        }

        #endregion

        #region Accept

        /// <summary>
        /// Posts accept operations on the listening socket. Accepts that complete synchronously
        /// are handled in a loop (not by recursion) until one is pending or the server stops.
        /// </summary>
        /// <param name="asyniar">Accept context to reuse, or null to create a new one.</param>
        private void StartAccept(SocketAsyncEventArgs asyniar)
        {
            if (asyniar == null)
            {
                asyniar = new SocketAsyncEventArgs();
                asyniar.Completed += new EventHandler<SocketAsyncEventArgs>(OnAcceptCompleted);
            }

            while (IsRunning)
            {
                // The socket must be cleared because the context object is being reused.
                asyniar.AcceptSocket = null;

                // Blocks while the maximum number of clients is connected.
                _maxAcceptedClients.WaitOne();

                Socket listener = _serverSock;
                if (!IsRunning || listener == null)
                {
                    // The server was stopped while we were waiting for a free slot.
                    _maxAcceptedClients.Release();
                    break;
                }

                bool pending;
                try
                {
                    pending = listener.AcceptAsync(asyniar);
                }
                catch (ObjectDisposedException)
                {
                    // The listening socket was closed concurrently by Stop().
                    _maxAcceptedClients.Release();
                    break;
                }

                if (pending)
                {
                    // OnAcceptCompleted will run when the accept finishes.
                    return;
                }

                // Completed synchronously: no Completed event is raised, so handle it here.
                HandleAcceptResult(asyniar);
            }

            // The accept loop has ended; this context will not be used again.
            asyniar.Dispose();
        }

        /// <summary>
        /// Callback raised when an asynchronous accept completes.
        /// </summary>
        /// <param name="sender">Object that raised the event.</param>
        /// <param name="e">Context of the completed accept operation.</param>
        private void OnAcceptCompleted(object sender, SocketAsyncEventArgs e)
        {
            ProcessAccept(e);
        }

        /// <summary>
        /// Handles a completed accept and then posts the next one.
        /// </summary>
        /// <param name="e">Context of the completed accept operation.</param>
        private void ProcessAccept(SocketAsyncEventArgs e)
        {
            HandleAcceptResult(e);

            // Post the next accept (StartAccept is a no-op once the server is stopped).
            StartAccept(e);
        }

        /// <summary>
        /// Handles the outcome of one accept: on success binds a pooled context to the client
        /// and starts its receive loop; on failure gives back the semaphore slot taken for the accept.
        /// </summary>
        /// <param name="e">Context of the completed accept operation.</param>
        private void HandleAcceptResult(SocketAsyncEventArgs e)
        {
            Socket s = e.AcceptSocket; // socket associated with the client

            if (e.SocketError != SocketError.Success || s == null || !s.Connected)
            {
                if (s != null)
                {
                    s.Close();
                }

                // No client was admitted, so the slot reserved in StartAccept is returned.
                _maxAcceptedClients.Release();
                return;
            }

            SocketAsyncEventArgs asyniar = _objectPool.Pop();
            asyniar.UserToken = s;

            int total = Interlocked.Increment(ref _clientCount);
            Log4Debug(String.Format("客戶 {0} 連入, 共有 {1} 個連線。", DescribeRemote(s), total));

            StartReceive(s, asyniar);
        }

        #endregion

        #region Send

        /// <summary>
        /// Sends data asynchronously to the client associated with <paramref name="e"/>.
        /// The client socket is taken from <c>e.UserToken</c> (falling back to <c>e.AcceptSocket</c>).
        /// A private copy of <paramref name="data"/> is sent through a dedicated context, so the
        /// receive posted on <paramref name="e"/> and its buffer slice are left untouched.
        /// Nothing is sent if the last operation on <paramref name="e"/> failed, the data is empty,
        /// or the socket is not connected.
        /// </summary>
        /// <param name="e">Context identifying the client connection.</param>
        /// <param name="data">Bytes to send.</param>
        /// <exception cref="ArgumentNullException">An argument is null.</exception>
        public void Send(SocketAsyncEventArgs e, byte[] data)
        {
            if (e == null)
            {
                throw new ArgumentNullException("e");
            }
            if (data == null)
            {
                throw new ArgumentNullException("data");
            }
            if (e.SocketError != SocketError.Success || data.Length == 0)
            {
                return;
            }

            Socket s = e.UserToken as Socket;
            if (s == null)
            {
                s = e.AcceptSocket;
            }
            if (s == null || !s.Connected)
            {
                return;
            }

            // Copy the payload: the send is asynchronous and the caller may reuse its array.
            byte[] payload = (byte[])data.Clone();

            SocketAsyncEventArgs sendArgs = new SocketAsyncEventArgs();
            sendArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);
            sendArgs.UserToken = s;
            sendArgs.SetBuffer(payload, 0, payload.Length);

            bool pending;
            try
            {
                pending = s.SendAsync(sendArgs);
            }
            catch
            {
                sendArgs.Dispose();
                throw;
            }

            if (!pending)
            {
                // Completed synchronously: no Completed event is raised, so handle it here.
                ProcessSend(sendArgs);
            }
        }

        /// <summary>
        /// Sends data synchronously, retrying while the socket reports that it cannot accept
        /// more data right now. Sets <c>socket.SendTimeout</c> to 0.
        /// </summary>
        /// <param name="socket">Socket to send on.</param>
        /// <param name="buffer">Buffer containing the data.</param>
        /// <param name="offset">Index of the first byte to send.</param>
        /// <param name="size">Number of bytes to send.</param>
        /// <param name="timeout">Maximum total time in milliseconds; zero or negative means no limit.</param>
        /// <exception cref="ArgumentNullException"><paramref name="socket"/> or <paramref name="buffer"/> is null.</exception>
        /// <exception cref="TimeoutException">The data could not be sent within <paramref name="timeout"/>.</exception>
        public void Send(Socket socket, byte[] buffer, int offset, int size, int timeout)
        {
            if (socket == null)
            {
                throw new ArgumentNullException("socket");
            }
            if (buffer == null)
            {
                throw new ArgumentNullException("buffer");
            }

            socket.SendTimeout = 0;
            int startTickCount = Environment.TickCount;
            int sent = 0; // how many bytes have been sent so far

            do
            {
                // Subtracting tick counts (unchecked) stays correct when TickCount wraps around.
                if (timeout > 0 && unchecked(Environment.TickCount - startTickCount) > timeout)
                {
                    throw new TimeoutException("Timeout.");
                }

                try
                {
                    sent += socket.Send(buffer, offset + sent, size - sent, SocketFlags.None);
                }
                catch (SocketException ex)
                {
                    if (ex.SocketErrorCode == SocketError.WouldBlock ||
                        ex.SocketErrorCode == SocketError.IOPending ||
                        ex.SocketErrorCode == SocketError.NoBufferSpaceAvailable)
                    {
                        // The socket buffer is probably full; wait and try again.
                        Thread.Sleep(30);
                    }
                    else
                    {
                        throw; // rethrow without resetting the stack trace
                    }
                }
            } while (sent < size);
        }

        /// <summary>
        /// Handles a completed send issued by <see cref="Send(SocketAsyncEventArgs, byte[])"/>
        /// and disposes its dedicated context. On error the client socket is closed here;
        /// the pooled context, client count and semaphore slot are released by the receive path
        /// when it observes the closed socket, so they are released exactly once.
        /// </summary>
        /// <param name="e">Context of the completed send operation.</param>
        private void ProcessSend(SocketAsyncEventArgs e)
        {
            try
            {
                if (e.SocketError != SocketError.Success)
                {
                    Socket s = e.UserToken as Socket;
                    if (s != null)
                    {
                        s.Close();
                    }
                }
                //TODO: notify that the send completed
            }
            finally
            {
                e.Completed -= new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);
                e.Dispose();
            }
        }

        #endregion

        #region Receive

        /// <summary>
        /// Handles a completed receive and, if the connection is still usable, posts the next one.
        /// </summary>
        /// <param name="e">Context of the completed receive operation.</param>
        private void ProcessReceive(SocketAsyncEventArgs e)
        {
            Socket s = e.UserToken as Socket;
            if (HandleReceived(s, e))
            {
                StartReceive(s, e);
            }
        }

        /// <summary>
        /// Posts receive operations for a client. Receives that complete synchronously are handled
        /// in a loop (not by recursion) until one is pending or the connection is closed.
        /// </summary>
        /// <param name="s">Client socket.</param>
        /// <param name="e">Pooled context bound to the client.</param>
        private void StartReceive(Socket s, SocketAsyncEventArgs e)
        {
            while (true)
            {
                bool pending;
                try
                {
                    pending = s.ReceiveAsync(e);
                }
                catch (ObjectDisposedException)
                {
                    // The socket was closed elsewhere (for example after a failed send).
                    CloseClientSocket(e);
                    return;
                }
                catch (SocketException ex)
                {
                    Log4Debug(String.Format("接收客戶 {0} 資料出錯, 異常資訊: {1} 。", DescribeRemote(s), ex.ToString()));
                    CloseClientSocket(e);
                    return;
                }

                if (pending)
                {
                    // OnIOCompleted will run when the receive finishes.
                    return;
                }

                // Completed synchronously: no Completed event is raised, so handle it here.
                if (!HandleReceived(s, e))
                {
                    return;
                }
            }
        }

        /// <summary>
        /// Processes the result of one receive. Every received chunk is processed; a chunk may
        /// therefore hold only part of a message (or split a multi-byte character).
        /// </summary>
        /// <param name="s">Client socket (may be null if the context is no longer bound to a client).</param>
        /// <param name="e">Context of the completed receive operation.</param>
        /// <returns>True if another receive should be posted; false if the connection was closed.</returns>
        private bool HandleReceived(Socket s, SocketAsyncEventArgs e)
        {
            // An error, or zero bytes transferred, means the connection is gone:
            // release the client's resources.
            if (s == null || e.SocketError != SocketError.Success || e.BytesTransferred <= 0)
            {
                CloseClientSocket(e);
                return false;
            }

            Encoding encoding = this.Encoding;
            if (encoding == null)
            {
                encoding = Encoding.Default;
            }

            // Decode straight from this context's slice of the shared buffer; the slice is
            // overwritten by the next receive, so the data must be consumed before returning.
            string info = encoding.GetString(e.Buffer, e.Offset, e.BytesTransferred);
            Log4Debug(String.Format("收到 {0} 資料為 {1}", DescribeRemote(s), info));
            //TODO: process the data

            return true;
        }

        #endregion

        #region Completion callback

        /// <summary>
        /// Raised when an operation on a pooled receive context or a send context completes.
        /// </summary>
        /// <param name="sender">Object that raised the event.</param>
        /// <param name="e">Context of the completed operation.</param>
        private void OnIOCompleted(object sender, SocketAsyncEventArgs e)
        {
            // Determine which type of operation just completed and call the associated handler.
            switch (e.LastOperation)
            {
                case SocketAsyncOperation.Accept:
                    ProcessAccept(e);
                    break;
                case SocketAsyncOperation.Receive:
                    ProcessReceive(e);
                    break;
                case SocketAsyncOperation.Send:
                    ProcessSend(e);
                    break;
                default:
                    throw new ArgumentException("The last operation completed on the socket was not a receive or send");
            }
        }

        #endregion

        #region Close

        /// <summary>
        /// Closes the client bound to <paramref name="e"/> and returns the context to the pool.
        /// Does nothing if the context is not bound to a client, which prevents releasing the
        /// same context and semaphore slot twice.
        /// </summary>
        /// <param name="e">Pooled context bound to the client.</param>
        private void CloseClientSocket(SocketAsyncEventArgs e)
        {
            Socket s = e.UserToken as Socket;
            if (s == null)
            {
                return;
            }

            Log4Debug(String.Format("客戶 {0} 斷開連線!", DescribeRemote(s)));
            CloseClientSocket(s, e);
        }

        /// <summary>
        /// Shuts down and closes the client socket, then releases the client's slot and context.
        /// </summary>
        /// <param name="s">Client socket to close.</param>
        /// <param name="e">Pooled context to return to the pool.</param>
        private void CloseClientSocket(Socket s, SocketAsyncEventArgs e)
        {
            if (s != null)
            {
                try
                {
                    s.Shutdown(SocketShutdown.Send);
                }
                catch (SocketException)
                {
                    // Thrown if the client has already closed the connection; nothing to do.
                }
                catch (ObjectDisposedException)
                {
                    // The socket was already closed; nothing to do.
                }
                finally
                {
                    s.Close();
                }
            }

            // Unbind the context so a pooled object never refers to a closed socket.
            e.UserToken = null;

            Interlocked.Decrement(ref _clientCount);
            _maxAcceptedClients.Release();
            _objectPool.Push(e); // the context is free again and can be reused
        }

        /// <summary>
        /// Returns the remote end point of a socket as text, or "unknown" when it cannot be read
        /// (null socket, or the socket is closed or in an error state).
        /// </summary>
        /// <param name="s">Socket to describe.</param>
        /// <returns>Text form of the remote end point, or "unknown".</returns>
        private static string DescribeRemote(Socket s)
        {
            if (s == null)
            {
                return "unknown";
            }

            try
            {
                EndPoint remote = s.RemoteEndPoint;
                return remote != null ? remote.ToString() : "unknown";
            }
            catch (SocketException)
            {
                return "unknown";
            }
            catch (ObjectDisposedException)
            {
                return "unknown";
            }
        }

        #endregion

        #region Dispose

        /// <summary>
        /// Performs application-defined tasks associated with freeing,
        /// releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases unmanaged and - optionally - managed resources.
        /// </summary>
        /// <param name="disposing"><c>true</c> to release
        /// both managed and unmanaged resources; <c>false</c>
        /// to release only unmanaged resources.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (this.disposed)
            {
                return;
            }

            if (disposing)
            {
                try
                {
                    Stop();
                }
                catch (SocketException ex)
                {
                    // Dispose must not throw; record the failure instead of hiding it.
                    Log4Debug(ex.ToString());
                }

                _serverSock = null;
            }

            disposed = true;
        }

        #endregion

        /// <summary>
        /// Writes a diagnostic message to the console, prefixed with "notice:".
        /// </summary>
        /// <param name="msg">Message to write.</param>
        public void Log4Debug(string msg)
        {
            Console.WriteLine("notice:" + msg);
        }
    }
}
BufferManager.cs:這個類是緩衝區管理類,採用與 MSDN 範例相同的實現。地址: https://msdn.microsoft.com/zh-cn/library/bb517542.aspx