C#SocketAsyncEventArgs實現高效能多併發TCPSocket通訊 (客戶端實現)
阿新 • • 發佈:2019-02-16
http://freshflower.iteye.com/blog/2285286
上一篇講了伺服器端的實現, 這一篇就是客戶端的實現.
與伺服器不同的是客戶端的實現需要多個SocketAsyncEventArgs共同協作,至少需要兩個:接收的只需要一個,傳送的需要一個,也可以多個,這在多執行緒中尤為重要,接下來說明。
客戶端一般需要資料的時候,就要發起請求,在多執行緒環境中,請求伺服器一般不希望列隊等候,這樣會大大拖慢程式的處理。如果傳送資料包的SocketAsyncEventArgs只有一個,且當他正在工作的時候, 下一個請求也來訪問,這時會丟擲異常, 提示當前的套接字正在工作, 所以這不是我們願意看到, 唯有增加
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Net.Sockets;
- using System.Text;
- namespace Plates.Client.Net
- {
- class MySocketEventArgs : SocketAsyncEventArgs
- {
- /// <summary>
- /// 標識,只是一個編號而已
- /// </summary>
- publicint ArgsTag { get; set; }
- /// <summary>
- /// 設定/獲取使用狀態
- /// </summary>
- publicbool IsUsing { get; set; }
- }
- }
接下來,我們還需要BufferManager類,這個類已經在服務端貼出來了,與服務端是一樣的, 再貼一次:
C#程式碼- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Net.Sockets;
- using System.Text;
- using System.Threading.Tasks;
- namespace Plates.Client.Net
- {
- class BufferManager
- {
- int m_numBytes; // the total number of bytes controlled by the buffer pool
- byte[] m_buffer; // the underlying byte array maintained by the Buffer Manager
- Stack<int> m_freeIndexPool; //
- int m_currentIndex;
- int m_bufferSize;
- public BufferManager(int totalBytes, int bufferSize)
- {
- m_numBytes = totalBytes;
- m_currentIndex = 0;
- m_bufferSize = bufferSize;
- m_freeIndexPool = new Stack<int>();
- }
- // Allocates buffer space used by the buffer pool
- publicvoid InitBuffer()
- {
- // create one big large buffer and divide that
- // out to each SocketAsyncEventArg object
- m_buffer = newbyte[m_numBytes];
- }
- // Assigns a buffer from the buffer pool to the
- // specified SocketAsyncEventArgs object
- //
- // <returns>true if the buffer was successfully set, else false</returns>
- publicbool SetBuffer(SocketAsyncEventArgs args)
- {
- if (m_freeIndexPool.Count > 0)
- {
- args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);
- }
- else
- {
- if ((m_numBytes - m_bufferSize) < m_currentIndex)
- {
- returnfalse;
- }
- args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);
- m_currentIndex += m_bufferSize;
- }
- returntrue;
- }
- // Removes the buffer from a SocketAsyncEventArg object.
- // This frees the buffer back to the buffer pool
- publicvoid FreeBuffer(SocketAsyncEventArgs args)
- {
- m_freeIndexPool.Push(args.Offset);
- args.SetBuffer(null, 0, 0);
- }
- }
- }
接下來是重點實現了,別的不多說,看程式碼:
C#程式碼- using System;
- using System.Collections;
- using System.Collections.Generic;
- using System.Linq;
- using System.Net;
- using System.Net.Sockets;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- namespace Plates.Client.Net
- {
- class SocketManager: IDisposable
- {
- privateconst Int32 BuffSize = 1024;
- // The socket used to send/receive messages.
- private Socket clientSocket;
- // Flag for connected socket.
- private Boolean connected = false;
- // Listener endpoint.
- private IPEndPoint hostEndPoint;
- // Signals a connection.
- privatestatic AutoResetEvent autoConnectEvent = new AutoResetEvent(false);
- BufferManager m_bufferManager;
- //定義接收資料的物件
- List<byte> m_buffer;
- //傳送與接收的MySocketEventArgs變數定義.
- private List<MySocketEventArgs> listArgs = new List<MySocketEventArgs>();
- private MySocketEventArgs receiveEventArgs = new MySocketEventArgs();
- int tagCount = 0;
- /// <summary>
- /// 當前連線狀態
- /// </summary>
- publicbool Connected { get { return clientSocket != null && clientSocket.Connected; } }
- //伺服器主動發出資料受理委託及事件
- publicdelegatevoid OnServerDataReceived(byte[] receiveBuff);
- publicevent OnServerDataReceived ServerDataHandler;
- //伺服器主動關閉連線委託及事件
- publicdelegatevoid OnServerStop();
- publicevent OnServerStop ServerStopEvent;
- // Create an uninitialized client instance.
- // To start the send/receive processing call the
- // Connect method followed by SendReceive method.
- internal SocketManager(String ip, Int32 port)
- {
- // Instantiates the endpoint and socket.
- hostEndPoint = new IPEndPoint(IPAddress.Parse(ip), port);
- clientSocket = new Socket(hostEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
- m_bufferManager = new BufferManager(BuffSize * 2, BuffSize);
- m_buffer = new List<byte>();
- }
- /// <summary>
- /// 連線到主機
- /// </summary>
- /// <returns>0.連線成功, 其他值失敗,參考SocketError的值列表</returns>
- internal SocketError Connect()
- {
- SocketAsyncEventArgs connectArgs = new SocketAsyncEventArgs();
- connectArgs.UserToken = clientSocket;
- connectArgs.RemoteEndPoint = hostEndPoint;
- connectArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnConnect);
- clientSocket.ConnectAsync(connectArgs);
- autoConnectEvent.WaitOne(); //阻塞. 讓程式在這裡等待,直到連線響應後再返回連線結果
- return connectArgs.SocketError;
- }
- /// Disconnect from the host.
- internalvoid Disconnect()
- {
- clientSocket.Disconnect(false);
- }
- // Calback for connect operation
- privatevoid OnConnect(object sender, SocketAsyncEventArgs e)
- {
- // Signals the end of connection.
- autoConnectEvent.Set(); //釋放阻塞.
- // Set the flag for socket connected.
- connected = (e.SocketError == SocketError.Success);
- //如果連線成功,則初始化socketAsyncEventArgs
- if (connected)
- initArgs(e);
- }
- #region args
- /// <summary>
- /// 初始化收發引數
- /// </summary>
- /// <param name="e"></param>
- privatevoid initArgs(SocketAsyncEventArgs e)
- {
- m_bufferManager.InitBuffer();
- //傳送引數
- initSendArgs();
- //接收引數
- receiveEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
- receiveEventArgs.UserToken = e.UserToken;
- receiveEventArgs.ArgsTag = 0;
- m_bufferManager.SetBuffer(receiveEventArgs);
- //啟動接收,不管有沒有,一定得啟動.否則有資料來了也不知道.
- if (!e.ConnectSocket.ReceiveAsync(receiveEventArgs))
- ProcessReceive(receiveEventArgs);
- }
- /// <summary>
- /// 初始化傳送引數MySocketEventArgs
- /// </summary>
- /// <returns></returns>
- MySocketEventArgs initSendArgs()
- {
- MySocketEventArgs sendArg = new MySocketEventArgs();
- sendArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
- sendArg.UserToken = clientSocket;
- sendArg.RemoteEndPoint = hostEndPoint;
- sendArg.IsUsing = false;
- Interlocked.Increment(ref tagCount);
- sendArg.ArgsTag = tagCount;
- lock (listArgs)
- {
- listArgs.Add(sendArg);
- }
- return sendArg;
- }
- void IO_Completed(object sender, SocketAsyncEventArgs e)
- {
- MySocketEventArgs mys = (MySocketEventArgs)e;
- // determine which type of operation just completed and call the associated handler
- switch (e.LastOperation)
- {
- case SocketAsyncOperation.Receive:
- ProcessReceive(e);
- break;
- case SocketAsyncOperation.Send:
- mys.IsUsing = false; //資料傳送已完成.狀態設為False
- ProcessSend(e);
- break;
- default:
- thrownew ArgumentException("The last operation completed on the socket was not a receive or send");
- }
- }
- // This method is invoked when an asynchronous receive operation completes.
- // If the remote host closed the connection, then the socket is closed.
- // If data was received then the data is echoed back to the client.
- //
- privatevoid ProcessReceive(SocketAsyncEventArgs e)
- {
- try
- {
- // check if the remote host closed the connection
- Socket token = (Socket)e.UserToken;
- if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
- {
- //讀取資料
- byte[] data = newbyte[e.BytesTransferred];
- Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred);
- lock (m_buffer)
- {
- m_buffer.AddRange(data);
- }
- do
- {
- //注意: 這裡是需要和伺服器有協議的,我做了個簡單的協議,就是一個完整的包是包長(4位元組)+包資料,便於處理,當然你可以定義自己需要的;
- //判斷包的長度,前面4個位元組.
- byte[] lenBytes = m_buffer.GetRange(0, 4).ToArray();
- int packageLen = BitConverter.ToInt32(lenBytes, 0);
- if (packageLen <= m_buffer.Count - 4)
- {
- //包夠長時,則提取出來,交給後面的程式去處理
- byte[] rev = m_buffer.GetRange(4, packageLen).ToArray();
- //從資料池中移除這組資料,為什麼要lock,你懂的
- lock (m_buffer)
- {
- m_buffer.RemoveRange(0, packageLen + 4);
- }
- //將資料包交給前臺去處理
- DoReceiveEvent(rev);
- }
- else
- { //長度不夠,還得繼續接收,需要跳出迴圈
- break;
- }
- } while (m_buffer.Count > 4);
- //注意:你一定會問,這裡為什麼要用do-while迴圈?
- //如果當服務端傳送大資料流的時候,e.BytesTransferred的大小就會比服務端傳送過來的完整包要小,
- //需要分多次接收.所以收到包的時候,先判斷包頭的大小.夠一個完整的包再處理.
- //如果伺服器短時間內傳送多個小資料包時, 這裡可能會一次性把他們全收了.
- //這樣如果沒有一個迴圈來控制,那麼只會處理第一個包,
- //剩下的包全部留在m_buffer中了,只有等下一個資料包過來後,才會放出一個來.
- //繼續接收
- if (!token.ReceiveAsync(e))
- this.ProcessReceive(e);
- }
- else
- {
- ProcessError(e);
- }
- }
- catch (Exception xe)
- {
- Console.WriteLine(xe.Message);
- }
- }
- // This method is invoked when an asynchronous send operation completes.
- // The method issues another receive on the socket to read any additional
- // data sent from the client
- //
- // <param name="e"></param>
- privatevoid ProcessSend(SocketAsyncEventArgs e)
- {
- if (e.SocketError != SocketError.Success)
- {
- ProcessError(e);
- }
- }
- #endregion
- #region read write
- // Close socket in case of failure and throws
- // a SockeException according to the SocketError.
- privatevoid ProcessError(SocketAsyncEventArgs e)
- {
- Socket s = (Socket)e.UserToken;
- if (s.Connected)
- {
- // close the socket associated with the client
- try
- {
- s.Shutdown(SocketShutdown.Both);
- }
- catch (Exception)
- {
- // throws if client process has already closed
- }
- finally
- {
- if (s.Connected)
- {
- s.Close();
- }
- connected = false;
- }
- }
- //這裡一定要記得把事件移走,如果不移走,當斷開伺服器後再次連線上,會造成多次事件觸發.
- foreach (MySocketEventArgs arg in listArgs)
- arg.Completed -= IO_Completed;
- receiveEventArgs.Completed -= IO_Completed;
- if (ServerStopEvent != null)
- ServerStopEvent();
- }
- // Exchange a message with the host.
- internalvoid Send(byte[] sendBuffer)
- {
- if (connected)
- {
- //先對資料進行包裝,就是把包的大小作為頭加入,這必須與伺服器端的協議保持一致,否則造成伺服器無法處理資料.
- byte[] buff = newbyte[sendBuffer.Length + 4];
- Array.Copy(BitConverter.GetBytes(sendBuffer.Length), buff, 4);
- Array.Copy(sendBuffer, 0, buff, 4, sendBuffer.Length);
- //查詢有沒有空閒的傳送MySocketEventArgs,有就直接拿來用,沒有就建立新的.So easy!
- MySocketEventArgs sendArgs = listArgs.Find(a => a.IsUsing == false);
- if (sendArgs == null) {
- sendArgs = initSendArgs();
- }
- lock (sendArgs) //要鎖定,不鎖定讓別的執行緒搶走了就不妙了.
- {
- sendArgs.IsUsing = true;
- sendArgs.SetBuffer(buff, 0, buff.Length);
- }
- clientSocket.SendAsync(sendArgs);
- }
- else
- {
- thrownew SocketException((Int32)SocketError.NotConnected);
- }
- }
- /// <summary>
- /// 使用新程序通知事件回撥
- /// </summary>
- /// <param name="buff"></param>
- privatevoid DoReceiveEvent(byte[] buff)
- {
- if (ServerDataHandler == null) return;
- //ServerDataHandler(buff); //可直接呼叫.
- //但我更喜歡用新的執行緒,這樣不拖延接收新資料.
- Thread thread = new Thread(new ParameterizedThreadStart((obj) =>
- {
- ServerDataHandler((byte[])obj);
- }));
- thread.IsBackground = true;
- thread.Start(buff);
- }
- #endregion
- #region IDisposable Members
- // Disposes the instance of SocketClient.
- publicvoid Dispose()
- {
- autoConnectEvent.Close();
- if (clientSocket.Connected)
- {
- clientSocket.Close();
- }
- }
- #endregion
- }
- }
好了, 怎麼使用, 那是再簡單不過的事了, 當然連線同一個伺服器的同一埠, 這個類你只需要初始化一次就可以了, 不要建立多個, 這樣太浪費資源. 上面是定義了通訊的基礎類, 那麼接下來就是把相關的方法再包裝一下, 做成供前臺方便呼叫的含有靜態方法的類就OK了.
C#程式碼- using Newtonsoft.Json;
- using Plates.Common;
- using Plates.Common.Base;
- using Plates.Common.Beans;
- using RuncomLib.File;
- using RuncomLib.Log;
- using RuncomLib.Text;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Net.Sockets;
- using System.Security.Cryptography;
- using System.Text;
- using System.Threading;
- using System.Timers;
- namespace Plates.Client.Net
- {
- class Request
- {
- //定義,最好定義成靜態的, 因為我們只需要一個就好
- static SocketManager smanager = null;
- static UserInfoModel userInfo = null;
- //定義事件與委託
- publicdelegatevoid ReceiveData(object message);
- publicdelegatevoid ServerClosed();
- publicstaticevent ReceiveData OnReceiveData;
- publicstaticevent ServerClosed OnServerClosed;
- /// <summary>
- /// 心跳定時器
- /// </summary>
- static System.Timers.Timer heartTimer = null;
- /// <summary>
- /// 心跳包
- /// </summary>
- static ApiResponse heartRes = null;
- /// <summary>
- /// 判斷是否已連線
- /// </summary>
- publicstaticbool Connected
- {
- get { return smanager != null && smanager.Connected; }
- }
- /// <summary>
- /// 已登入的使用者資訊
- /// </summary>
- publicstatic UserInfoModel UserInfo
- {
- get { return userInfo; }
- }
- #region 基本方法
- /// <summary>
- /// 連線到伺服器
- /// </summary>
- /// <returns></returns>
- publicstatic SocketError Connect()
- {
- if (Connected) return SocketError.Success;
- //我這裡是讀取配置,
- string ip = Config.ReadConfigString("socket", "server", "");
- int port = Config.ReadConfigInt("socket", "port", 13909);
- if (string.IsNullOrWhiteSpace(ip) || port <= 1000) return SocketError.Fault;
- //建立連線物件, 連線到伺服器
- smanager = new SocketManager(ip, port);
- SocketError error = smanager.Connect();
- if (error == SocketError.Success){
- //連線成功後,就註冊事件. 最好在成功後再註冊.
- smanager.ServerDataHandler += OnReceivedServerData;
- smanager.ServerStopEvent += OnServerStopEvent;
- }
- return error;
- }
- /// <summary>
- /// 斷開連線
- /// </summary>
- publicstaticvoid Disconnect()
- {
- try
- {
- smanager.Disconnect();
- }
- catch (Exception) { }
- }
- /// <summary>
- /// 傳送請求
- /// </summary>
- /// <param name="request"></param>
- /// <returns></returns>
- publicstaticbool Send(ApiResponse request)
- {
- return Send(JsonConvert.SerializeObject(request));
- }
- /// <summary>
- /// 傳送訊息
- /// </summary>
- /// <param name="message">訊息實體</param>
- /// <returns>True.已傳送; False.未傳送</returns>
- publicstaticbool Send(string message)
- {
- if (!Connected) returnfalse;
- byte[] buff = Encoding.UTF8.GetBytes(message);
- //加密,根據自己的需要可以考慮把訊息加密
- //buff = AESEncrypt.Encrypt(buff, m_aesKey);
- smanager.Send(buff);
- returntrue;
- }
- /// <summary>
- /// 傳送位元組流
- /// </summary>
- /// <param name="buff"></param>
- /// <returns></returns>
- staticbool Send(byte[] buff)
- {
- if (!Connected) returnfalse;
- smanager.Send(buff);
- returntrue;
- }
- /// <summary>
- /// 接收訊息
- /// </summary>
- /// <param name="buff"></param>
- privatestaticvoid OnReceivedServerData(byte[] buff)
- {
- //To do something
- //你要處理的程式碼,可以實現把buff轉化成你具體的物件, 再傳給前臺
- if (OnReceiveData != null)
- OnReceiveData(buff);
- }
- /// <summary>
- /// 伺服器已斷開
- /// </summary>
- privatestaticvoid OnServerStopEvent()
- {
- if (OnServerClosed != null)
- OnServerClosed();
- }
- #endregion
- #region 心跳包
- //心跳包也是很重要的,看自己的需要了, 我只定義出來, 你自己找個地方去呼叫吧
- /// <summary>
- /// 開啟心跳
- /// </summary>
- privatestaticvoid StartHeartbeat()
- {
- if (heartTimer == null)
- {
- heartTimer = new System.Timers.Timer();
- heartTimer.Elapsed += TimeElapsed;
- }
- heartTimer.AutoReset = true; //迴圈執行
- heartTimer.Interval = 30 * 1000; //每30秒執行一次
- heartTimer.Enabled = true;
- heartTimer.Start();
- //初始化心跳包
- heartRes = new ApiResponse((int)ApiCode.心跳);
- heartRes.data = new Dictionary<string, object>();
- heartRes.data.Add("beat", Function.Base64Encode(userInfo.nickname + userInfo.userid + DateTime.Now.ToString("HH:mm:ss")));
- }
- /// <summary>
- /// 定時執行
- /// </summary>
- /// <param name="source"></param>
- /// <param name="e"></param>
- staticvoid TimeElapsed(object source, ElapsedEventArgs e)
- {
- Request.Send(heartRes);
- }
- #endregion
- }
- }
好了, 就這些, 所有的請求都是非同步進行的, 如果你想同步進行, 我也有實現過, 等有空了再貼上來.
如果你還沒有弄懂伺服器端, 請進入: