Socket探索1-兩種Socket服務端實現
介紹
一次簡單的Socket探索之旅,分別對Socket服務端的兩種方式進行了測試和解析。
CommonSocket
程式碼實現
實現一個簡單的Socket服務,基本功能就是接收訊息然後加上結束訊息時間返回給客戶端。
/// <summary> /// 簡單服務,收發訊息 /// </summary> class FirstSimpleServer { public static void Run(string m_ip, int m_port) { var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); var ip = IPAddress.Parse(m_ip); var endpoint = new IPEndPoint(ip, m_port); socket.Bind(endpoint); socket.Listen(0); socket.ReceiveTimeout = -1; Task.Run(() => { while (true) { var acceptSocket = socket.Accept(); if (acceptSocket != null && acceptSocket.Connected) { Task.Run(() => { byte[] receiveBuffer = new byte[256]; int result = 0; do { if (acceptSocket.Connected) { result = acceptSocket.Receive(receiveBuffer, 0, receiveBuffer.Length, SocketFlags.None, out SocketError error); if (error == SocketError.Success && result > 0) { var recestr = Encoding.UTF8.GetString(receiveBuffer, 0, result); var Replaystr = $"Server收到訊息:{recestr};Server收到訊息的時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}"; Console.WriteLine(Replaystr); var strbytes = Encoding.UTF8.GetBytes(Replaystr); acceptSocket.Send(strbytes, 0, strbytes.Length, SocketFlags.None); if (recestr.Contains("stop")) { break; } } } else { break; } } while (result > 0); }).ContinueWith((t) => { System.Threading.Thread.Sleep(1000); acceptSocket.Disconnect(false); acceptSocket.Dispose(); }); } } }).Wait(); }
簡單測試
測試:一個客戶端,傳送10次資料,每次間隔50ms,
結果:客戶端的顯示如下,客戶端傳送訊息,再接收到,十次中最長的耗時10ms。
ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:0;Client傳送時間:2020-04-11 13:21:22:974};Server收到訊息的時間:2020-04-11 13:21:22:981;ClientReceiceServer時間:2020-04-11 13:21:22:984} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:1;Client傳送時間:2020-04-11 13:21:23:032};Server收到訊息的時間:2020-04-11 13:21:23:032;ClientReceiceServer時間:2020-04-11 13:21:23:032} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:2;Client傳送時間:2020-04-11 13:21:23:082};Server收到訊息的時間:2020-04-11 13:21:23:082;ClientReceiceServer時間:2020-04-11 13:21:23:083} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:3;Client傳送時間:2020-04-11 13:21:23:133};Server收到訊息的時間:2020-04-11 13:21:23:133;ClientReceiceServer時間:2020-04-11 13:21:23:133} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:4;Client傳送時間:2020-04-11 13:21:23:184};Server收到訊息的時間:2020-04-11 13:21:23:184;ClientReceiceServer時間:2020-04-11 13:21:23:190} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:5;Client傳送時間:2020-04-11 13:21:23:235};Server收到訊息的時間:2020-04-11 13:21:23:235;ClientReceiceServer時間:2020-04-11 13:21:23:235} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:6;Client傳送時間:2020-04-11 13:21:23:286};Server收到訊息的時間:2020-04-11 13:21:23:286;ClientReceiceServer時間:2020-04-11 13:21:23:286} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:7;Client傳送時間:2020-04-11 13:21:23:336};Server收到訊息的時間:2020-04-11 13:21:23:336;ClientReceiceServer時間:2020-04-11 13:21:23:336} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:8;Client傳送時間:2020-04-11 13:21:23:387};Server收到訊息的時間:2020-04-11 13:21:23:387;ClientReceiceServer時間:2020-04-11 13:21:23:388} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:9;Client傳送時間:2020-04-11 13:21:23:438};Server收到訊息的時間:2020-04-11 13:21:23:438;ClientReceiceServer時間:2020-04-11 13:21:23:438}
假如客戶端傳送訊息速度加快,對服務端會有什麼影響?測試將客戶端傳送訊息的間隔修改為1ms
System.Threading.Thread.Sleep(1);
結果如下,並沒有發現問題。
ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:0;Client傳送時間:2020-04-11 13:48:57:193};Server收到訊息的時間:2020-04-11 13:48:57:196;ClientReceiceServer時間:2020-04-11 13:48:57:197} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:1;Client傳送時間:2020-04-11 13:48:57:198};Server收到訊息的時間:2020-04-11 13:48:57:198;ClientReceiceServer時間:2020-04-11 13:48:57:201} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:2;Client傳送時間:2020-04-11 13:48:57:200};Server收到訊息的時間:2020-04-11 13:48:57:201;ClientReceiceServer時間:2020-04-11 13:48:57:202} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:3;Client傳送時間:2020-04-11 13:48:57:202};Server收到訊息的時間:2020-04-11 13:48:57:202;ClientReceiceServer時間:2020-04-11 13:48:57:203} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:4;Client傳送時間:2020-04-11 13:48:57:204};Server收到訊息的時間:2020-04-11 13:48:57:204;ClientReceiceServer時間:2020-04-11 13:48:57:204} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:5;Client傳送時間:2020-04-11 13:48:57:206};Server收到訊息的時間:2020-04-11 13:48:57:206;ClientReceiceServer時間:2020-04-11 13:48:57:207} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:6;Client傳送時間:2020-04-11 13:48:57:208};Server收到訊息的時間:2020-04-11 13:48:57:208;ClientReceiceServer時間:2020-04-11 13:48:57:208} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:7;Client傳送時間:2020-04-11 13:48:57:209};Server收到訊息的時間:2020-04-11 13:48:57:209;ClientReceiceServer時間:2020-04-11 13:48:57:211} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:8;Client傳送時間:2020-04-11 13:48:57:211};Server收到訊息的時間:2020-04-11 13:48:57:211;ClientReceiceServer時間:2020-04-11 13:48:57:212} ClientReceiceServer:{Server收到訊息:{Client:1:MessageID:9;Client傳送時間:2020-04-11 13:48:57:213};Server收到訊息的時間:2020-04-11 13:48:57:213;ClientReceiceServer時間:2020-04-11 13:48:57:213}
再極致一點,將客戶端的傳送間隔取消,迴圈傳送。看到下面服務端接收訊息的結果,可看到訊息包很混亂。仔細分析一下,發現其實伺服器其實就收到3次訊息,前兩次接受256個位元組,最後一次接收138位元組。這是由於設定服務端接收快取的大小為256個位元組。說明發送比較快或並行傳送的時候,服務端會很快將接收的快取塊填滿,一旦填滿,Receive方法就會返回,不然就處於阻塞狀態。
Server收到訊息:{Client:1:MessageID:1;Client傳送時間:2020-04-11 13:51:18:723}{Client:1:MessageID:2;Client傳送時間:2020-04-11 13:51:18:724}{Client:1:MessageID:3;Client傳送時間:2020-04-11 13:51:18:724}{Client:1:MessageID:4;Client傳送時間:2020-04-11 13:51:18:;Server收到訊息的時間:2020-04-11 13:51:18:724
Server收到訊息:724}{Client:1:MessageID:5;Client傳送時間:2020-04-11 13:51:18:724}{Client:1:MessageID:6;Client傳送時間:2020-04-11 13:51:18:724}{Client:1:MessageID:7;Client傳送時間:2020-04-11 13:51:18:724}{Client:1:MessageID:8;Client傳送時間:2020-04-11 13:51;Server收到訊息的時間:2020-04-11 13:51:18:729
Server收到訊息::18:724}{Client:1:MessageID:9;Client傳送時間:2020-04-11 13:51:18:724};Server收到訊息的時間:2020-04-11 13:51:18:732
通過一個簡單的方法解決這個問題,每次客戶端傳送固定長度的訊息,服務端接收固定長度的訊息。現在客戶端傳送的訊息是65個位元組,設定服務端接收資料的快取塊為65位元組。
{Client:1:MessageID:1;Client傳送時間:2020-04-11 13:51:18:723}
byte[] receiveBuffer = new byte[65];
再連續傳送10條訊息,下面為服務端測試的結果,結果顯示正常:
Server收到訊息:{Client:1:MessageID:0;Client傳送時間:2020-04-11 14:19:44:774};Server收到訊息的時間:2020-04-11 14:19:44:778
Server收到訊息:{Client:1:MessageID:1;Client傳送時間:2020-04-11 14:19:44:776};Server收到訊息的時間:2020-04-11 14:19:44:781
Server收到訊息:{Client:1:MessageID:2;Client傳送時間:2020-04-11 14:19:44:776};Server收到訊息的時間:2020-04-11 14:19:44:781
Server收到訊息:{Client:1:MessageID:3;Client傳送時間:2020-04-11 14:19:44:776};Server收到訊息的時間:2020-04-11 14:19:44:782
Server收到訊息:{Client:1:MessageID:4;Client傳送時間:2020-04-11 14:19:44:776};Server收到訊息的時間:2020-04-11 14:19:44:782
Server收到訊息:{Client:1:MessageID:5;Client傳送時間:2020-04-11 14:19:44:776};Server收到訊息的時間:2020-04-11 14:19:44:782
Server收到訊息:{Client:1:MessageID:6;Client傳送時間:2020-04-11 14:19:44:776};Server收到訊息的時間:2020-04-11 14:19:44:783
Server收到訊息:{Client:1:MessageID:7;Client傳送時間:2020-04-11 14:19:44:776};Server收到訊息的時間:2020-04-11 14:19:44:783
Server收到訊息:{Client:1:MessageID:8;Client傳送時間:2020-04-11 14:19:44:776};Server收到訊息的時間:2020-04-11 14:19:44:784
Server收到訊息:{Client:1:MessageID:9;Client傳送時間:2020-04-11 14:19:44:776};Server收到訊息的時間:2020-04-11 14:19:44:784
併發訊息測試
如果並行傳送訊息,同時有兩個訊息到服務端,訊息內容會混亂嗎?客戶端進行並行訊息傳送測試。下面為測試結果,發現並沒有問題,說明一個訊息可能沒有被拆分,或則即使被拆分了在網路通訊底層也會恢復原來的訊息結構。
Parallel.For(1, 10, (i) =>
{
var Replaystr =
$"{{Client:1:MessageID:{i};Client傳送時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}}}";
var strbytes = Encoding.UTF8.GetBytes(Replaystr);
socket.Send(strbytes, 0, strbytes.Length, SocketFlags.None);
});
Server收到訊息:{Client:1:MessageID:2;Client傳送時間:2020-04-11 17:11:44:568};Server收到訊息的時間:2020-04-11 17:11:44:572
Server收到訊息:{Client:1:MessageID:1;Client傳送時間:2020-04-11 17:11:44:568};Server收到訊息的時間:2020-04-11 17:11:44:575
Server收到訊息:{Client:1:MessageID:4;Client傳送時間:2020-04-11 17:11:44:572};Server收到訊息的時間:2020-04-11 17:11:44:576
Server收到訊息:{Client:1:MessageID:5;Client傳送時間:2020-04-11 17:11:44:572};Server收到訊息的時間:2020-04-11 17:11:44:576
Server收到訊息:{Client:1:MessageID:6;Client傳送時間:2020-04-11 17:11:44:572};Server收到訊息的時間:2020-04-11 17:11:44:576
Server收到訊息:{Client:1:MessageID:7;Client傳送時間:2020-04-11 17:11:44:572};Server收到訊息的時間:2020-04-11 17:11:44:576
Server收到訊息:{Client:1:MessageID:8;Client傳送時間:2020-04-11 17:11:44:572};Server收到訊息的時間:2020-04-11 17:11:44:577
Server收到訊息:{Client:1:MessageID:9;Client傳送時間:2020-04-11 17:11:44:572};Server收到訊息的時間:2020-04-11 17:11:44:577
Server收到訊息:{Client:1:MessageID:3;Client傳送時間:2020-04-11 17:11:44:571};Server收到訊息的時間:2020-04-11 17:11:44:577
併發客戶端測試
再進一步測試,假設有多個客戶端同時連線,並行傳送訊息。
Parallel.For(0, 9, (Clienti) =>
{
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var ip = IPAddress.Parse(m_ip);
var endpoint = new IPEndPoint(ip, m_port);
socket.ReceiveTimeout = -1;
Task.Run(() =>
{
socket.Connect(endpoint);
...
...
}
});
結果:這個測試是放在虛擬機器中,使用的是NAT網路模式,同一個子網內客戶端從發出訊息接收服務端返回訊息最長耗時有6秒,還是比較誇張的。
服務端結果:
客戶端結果:
總結
這個Socket服務在少量客戶端連線的時候好像沒什麼問題,它能抗住大量客戶端的連線併發測試嗎?我想答案肯定是否定的,為什麼呢?因為每個客戶端連線都需要消耗1個執行緒,執行緒是很昂貴的資源,每個執行緒自生就要消耗1M記憶體,100客戶端連線什麼都不做就消耗了100M,更不用說執行緒之間的上下文切換需要消耗更寶貴的CPU資源,所以這個服務端根本應對不了大量客戶端的連線。
那麼最理想的Socket服務端是什麼樣子的呢?在我看來就是隻有與CPU核數相同的執行緒量在執行,如果4核那麼就4個執行緒在執行,然後每個執行緒處理超級多的客戶端,最好沒有阻塞,不休息。怎樣才能實現這個目標呢?微軟給了一個簡單的例子,已經極大程度的實現了這個想法,一起來看看吧
SocketAsyncEventArgs
MSDN中SocketAsyncEventArgs
程式碼實現
我仿照微軟提供的這個例項擼了個簡單的Socket服務端
public class SocketArgsServer
{
private static int m_numConnections;
private static int m_receiveBufferSize;
private static int m_sendBufferSize;
private static byte[] m_receivebuffer;
private static Stack<int> m_freeReceiveIndexPool;
private static int m_currentReceiveIndex;
private static byte[] m_sendbuffer;
private static Stack<int> m_freeSendIndexPool;
private static int m_currentSendIndex;
private static Stack<SocketAsyncEventArgs> m_ReadPool;
private static Stack<SocketAsyncEventArgs> m_WritePool;
private static Semaphore m_maxNumberAcceptedClients;
private static int m_numConnectedSockets;
private static int m_totalBytesRead;
private static Socket listenSocket;
public static void Run(string m_ip, int m_port, int numConnections, int m_receiveBuffer, int m_sentBuffer)
{
m_numConnections = numConnections;
m_receiveBufferSize = m_receiveBuffer;
m_sendBufferSize = m_sentBuffer;
m_receivebuffer = new byte[m_receiveBufferSize * m_numConnections];
m_freeReceiveIndexPool = new Stack<int>();
m_currentReceiveIndex = 0;
m_sendbuffer = new byte[m_sendBufferSize * m_numConnections];
m_freeSendIndexPool = new Stack<int>();
m_currentSendIndex = 0;
m_ReadPool = new Stack<SocketAsyncEventArgs>(m_numConnections);
m_WritePool = new Stack<SocketAsyncEventArgs>(m_numConnections);
m_maxNumberAcceptedClients = new Semaphore(m_numConnections, m_numConnections);
m_numConnectedSockets = 0;
m_totalBytesRead = 0;
for (int i = 0; i < m_numConnections; i++)
{
var readEventArg = new SocketAsyncEventArgs();
readEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(ReadWriteIOComleted);
readEventArg.UserToken = new AsyncUserToken();
if (m_freeReceiveIndexPool.Count > 0)
{
readEventArg.SetBuffer(m_receivebuffer, m_freeReceiveIndexPool.Pop(), m_receiveBufferSize);
}
else
{
if ((m_receiveBufferSize * m_numConnections - m_receiveBufferSize) < m_currentReceiveIndex)
{
new ArgumentException("接收快取設定異常");
}
readEventArg.SetBuffer(m_receivebuffer, m_currentReceiveIndex, m_receiveBufferSize);
m_currentReceiveIndex += m_receiveBufferSize;
}
m_ReadPool.Push(readEventArg);
var writeEventArg = new SocketAsyncEventArgs();
writeEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(ReadWriteIOComleted);
writeEventArg.UserToken = new AsyncUserToken();
if (m_freeSendIndexPool.Count > 0)
{
writeEventArg.SetBuffer(m_sendbuffer, m_freeSendIndexPool.Pop(), m_sendBufferSize);
}
else
{
if ((m_sendBufferSize * m_numConnections - m_sendBufferSize) < m_currentSendIndex)
{
new ArgumentException("傳送快取設定異常");
}
writeEventArg.SetBuffer(m_sendbuffer, m_currentSendIndex, m_sendBufferSize);
m_currentSendIndex += m_sendBufferSize;
}
m_WritePool.Push(writeEventArg);
}
listenSocket = new Socket(new IPEndPoint(IPAddress.Parse(m_ip), m_port).AddressFamily, SocketType.Stream, ProtocolType.Tcp);
listenSocket.Bind(new IPEndPoint(IPAddress.Parse(m_ip), m_port));
listenSocket.Listen(100);
StartAccept(null);
Console.WriteLine("Press any key to terminate the server process....");
Console.ReadKey();
}
public static void ReadWriteIOComleted(object sender, SocketAsyncEventArgs e)
{
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
ProcessReceive(e);
break;
case SocketAsyncOperation.Send:
ProcessSend(e);
break;
default:
throw new ArgumentException("The last operation completed on the socket was not a receive or send");
}
}
public static void ProcessSend(SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
AsyncUserToken token = (AsyncUserToken)e.UserToken;
bool willRaiseEvent = token.Socket.ReceiveAsync(token.readEventArgs);
if (!willRaiseEvent)
{
ProcessReceive(token.readEventArgs);
}
}
else
{
CloseClientSocket(e);
}
}
public static void CloseClientSocket(SocketAsyncEventArgs e)
{
AsyncUserToken token = e.UserToken as AsyncUserToken;
try
{
token.Socket.Shutdown(SocketShutdown.Send);
}
catch (Exception exception)
{
Console.WriteLine(exception);
}
token.Socket.Close();
Interlocked.Decrement(ref m_numConnectedSockets);
m_ReadPool.Push(token.readEventArgs);
m_WritePool.Push(token.writeEventArgs);
token.Socket = null;
token.readEventArgs = null;
token.writeEventArgs = null;
m_maxNumberAcceptedClients.Release();
}
public static void ProcessReceive(SocketAsyncEventArgs e)
{
AsyncUserToken token = (AsyncUserToken)e.UserToken;
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{
Interlocked.Add(ref m_totalBytesRead, e.BytesTransferred);
byte[] data = new byte[e.BytesTransferred];
Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred);
var recestr = Encoding.UTF8.GetString(data);
var Replaystr =
$"Server收到訊息:{recestr};Server收到訊息的時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}";
Console.WriteLine(Replaystr);
var strbytes = Encoding.UTF8.GetBytes(Replaystr);
Array.Copy(strbytes, 0, token.writeEventArgs.Buffer, token.writeEventArgs.Offset,
strbytes.Length);
bool willRaiseEvent = token.Socket.SendAsync(token.writeEventArgs);
if (!willRaiseEvent)
{
ProcessSend(token.writeEventArgs);
}
}
else
{
CloseClientSocket(e);
}
}
public static void ProcessAccept(SocketAsyncEventArgs e)
{
Interlocked.Increment(ref m_numConnectedSockets);
SocketAsyncEventArgs readEventArgs = m_ReadPool.Pop();
SocketAsyncEventArgs writeEventArgs = m_WritePool.Pop();
((AsyncUserToken)readEventArgs.UserToken).Socket = e.AcceptSocket;
((AsyncUserToken)readEventArgs.UserToken).readEventArgs = readEventArgs;
((AsyncUserToken)readEventArgs.UserToken).writeEventArgs = writeEventArgs;
((AsyncUserToken)writeEventArgs.UserToken).Socket = e.AcceptSocket;
((AsyncUserToken)writeEventArgs.UserToken).readEventArgs = readEventArgs;
((AsyncUserToken)writeEventArgs.UserToken).writeEventArgs = writeEventArgs;
bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(readEventArgs);
if (!willRaiseEvent)
{
ProcessReceive(readEventArgs);
}
StartAccept(e);
}
public static void StartAccept(SocketAsyncEventArgs listenEventArg)
{
if (listenEventArg == null)
{
listenEventArg = new SocketAsyncEventArgs();
listenEventArg.Completed += new EventHandler<SocketAsyncEventArgs>((sender, e) => ProcessAccept(e));
}
else
{
listenEventArg.AcceptSocket = null;
}
m_maxNumberAcceptedClients.WaitOne();
bool willRaiseEvent = listenSocket.AcceptAsync(listenEventArg);
if (!willRaiseEvent)
{
ProcessAccept(listenEventArg);
}
}
}
class AsyncUserToken
{
public Socket Socket { get; set; }
public SocketAsyncEventArgs readEventArgs { set; get; }
public SocketAsyncEventArgs writeEventArgs { set; get; }
}
併發測試
先直接上測試結果,該測試環境還是在虛擬機器中,忽略一下服務端收到訊息時間,因為虛擬機器時間和主機時間不是同步的。可以看到服務端傳送訊息到接收到訊息最長耗時2s。
總結
這個Socket服務端直接丟棄了執行緒的概念,通過SocketAsyncEventArgs來實現了之前執行緒實現的所有功能。一個SocketAsyncEventArgs來監測連線,客戶端連線的時候從SocketAsyncEventArgsPool中分配兩個SocketAsyncEventArgs分別負責讀寫訊息。讀寫訊息的快取塊也進行了統一管理,共同組成一個大的快取塊進行重複使用。當客戶端失去連線的時候將分配的讀寫SocketAsyncEventArgs返還給SocketAsyncEventArgsPool進行重複使用。
最後
在本文中探索了兩種socket服務端的實現,並對這兩種socket服務端進行了簡單的剖析,我看了SuperSocket的底層實現思想採用的是第二種方式。目前這種方式的弊端我還沒想到,歡迎大家一起探討