1. 程式人生 > >corefx 源碼學習:NetworkStream.ReadAsync 是如何從 Socket 異步讀取數據的

corefx 源碼學習:NetworkStream.ReadAsync 是如何從 Socket 異步讀取數據的

並發 override rri this git event sin libraries ali

最近遇到 NetworkStream.ReadAsync 在 Linux 上高並發讀取數據的問題,由此激發了閱讀 corefx 中 System.Net.Sockets 實現源碼(基於 corefx 2.2)的興趣。

這篇隨筆是閱讀 NetworkStream.ReadAsync 相關源碼的簡單筆記,基於在 Linux 上運行的場景。

NetworkStream 繼承自 System.IO.Stream ,System.IO.Stream.ReadAsync 方法簽名是

public Task<int> ReadAsync(byte[] buffer, int offset, int
count);

實際調用的是

public virtual Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)

上面的的方法被 NetworkStream 重寫(override),調用的是 Socket 的 ReceiveAsync 方法

return _streamSocket.ReceiveAsync(
    new Memory<byte>(buffer, offset, size),
    SocketFlags.None,
    fromNetworkStream: 
true, cancellationToken).AsTask();

Socket.ReceiveAsync 的方法簽名

internal ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, bool fromNetworkStream, CancellationToken cancellationToken)

主要實現代碼

AwaitableSocketAsyncEventArgs saea = LazyInitializer.EnsureInitialized(ref
LazyInitializer.EnsureInitialized(ref _cachedTaskEventArgs).ValueTaskReceive); if (saea.Reserve()) { saea.SetBuffer(buffer); saea.SocketFlags = socketFlags; saea.WrapExceptionsInIOExceptions = fromNetworkStream; var result = saea.ReceiveAsync(this); return result; } else { // We couldn‘t get a cached instance, due to a concurrent receive operation on the socket. // Fall back to wrapping APM. return new ValueTask<int>(ReceiveAsyncApm(buffer, socketFlags)); }

通常情況下都會使用 AwaitableSocketAsyncEventArgs 異步讀取數據,所以我們這裏只從 saea.ReceiveAsync 往下看。

saea.ReceiveAsync 調用的是 Socket.ReceiveAsync(SocketAsyncEventArgs e) 方法,而後者調用的是 SocketAsyncEventArgs.DoOperationReceive(SafeCloseSocket handle)

在 Linux 上 DoOperationReceive 的實現在 SocketAsyncEventArgs.Unix.cs 中,主要代碼如下

internal unsafe SocketError DoOperationReceive(SafeCloseSocket handle)
{
    //...
    if (_bufferList == null)
    {
        errorCode = handle.AsyncContext.ReceiveAsync(_buffer.Slice(_offset, _count), _socketFlags, out bytesReceived, out flags, TransferCompletionCallback);
    }
    else
    {
        errorCode = handle.AsyncContext.ReceiveAsync(_bufferListInternal, _socketFlags, out bytesReceived, out flags, TransferCompletionCallback);
    }

    if (errorCode != SocketError.IOPending)
    {
        CompleteTransferOperation(bytesReceived, null, 0, flags, errorCode);
        FinishOperationSync(errorCode, bytesReceived, flags);
    }

    return errorCode;
}

handle.AsyncContext.ReceiveAsync 對應的 Linux 實現在 SocketAsyncContext.Unix.cs 中,調用的是 SocketAsyncContext 的 ReceiveFrom 方法,ReceiveFrom 的主要實現代碼如下

public SocketError ReceiveFromAsync(Memory<byte> buffer,  SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, Action<int, byte[], int, SocketFlags, SocketError> callback)
{
    SetNonBlocking();

    SocketError errorCode;
    int observedSequenceNumber;
    if (_receiveQueue.IsReady(this, out observedSequenceNumber) &&
        SocketPal.TryCompleteReceiveFrom(_socket, buffer.Span, flags, socketAddress, ref socketAddressLen, out bytesReceived, out receivedFlags, out errorCode))
    {
        return errorCode;
    }

    BufferMemoryReceiveOperation operation = RentBufferMemoryReceiveOperation();
    operation.Callback = callback;
    operation.Buffer = buffer;
    operation.Flags = flags;
    operation.SocketAddress = socketAddress;
    operation.SocketAddressLen = socketAddressLen;

    if (!_receiveQueue.StartAsyncOperation(this, operation, observedSequenceNumber))
    {
        receivedFlags = operation.ReceivedFlags;
        bytesReceived = operation.BytesTransferred;
        errorCode = operation.ErrorCode;

        ReturnOperation(operation);
        return errorCode;
    }

    bytesReceived = 0;
    receivedFlags = SocketFlags.None;
    return SocketError.IOPending;
}

SocketPal.TryCompleteReceiveFrom 的實現代碼在 SocketPal.Unix.cs 中,所調用的另一個 TryCompleteReceiveFrom 方法的簽名是

public static unsafe bool TryCompleteReceiveFrom(SafeCloseSocket socket, Span<byte> buffer, IList<ArraySegment<byte>> buffers, SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, out SocketError errorCode)

該方法調用的是 Receive 方法

private static unsafe int Receive(SafeCloseSocket socket, SocketFlags flags, IList<ArraySegment<byte>> buffers, byte[] socketAddress, ref int socketAddressLen, out SocketFlags receivedFlags, out Interop.Error errno)

在 Receive 方法中調用了

errno = Interop.Sys.ReceiveMessage(
    socket.DangerousGetHandle(), 
    &messageHeader,
    flags,
    &received);

Interop.Sys.ReceiveMessage 對應的是 Linux 本地庫中的方法

internal static partial class Sys
{
    [DllImport(Libraries.SystemNative, EntryPoint = "SystemNative_ReceiveMessage")]
    internal static extern unsafe Error ReceiveMessage(IntPtr socket, MessageHeader* messageHeader, SocketFlags flags, long* received);
}

Libraries.SystemNative 對應的是哪個庫呢?

它就是 System.Native.so

$ find /usr/share/dotnet/ -name System.Native.so
/usr/share/dotnet/shared/Microsoft.NETCore.App/2.2.0/System.Native.so

接下來根據 SocketError.IOPending 的情況閱讀源碼。

SocketAsyncEventArgs 在 DoOperationReceive 方法中調用 SocketAsyncContext.ReceiveFrom 方法時(handle.AsyncContext.ReceiveAsync)傳遞了 TransferCompletionCallback 參數值,在異步操作時是通過這個 callback 讀取 socket 數據的,對應的方法是 TransferCompletionCallbackCore 。

private void TransferCompletionCallbackCore(int bytesTransferred, byte[] socketAddress, int socketAddressSize, SocketFlags receivedFlags, SocketError socketError)
{
    CompleteTransferOperation(bytesTransferred, socketAddress, socketAddressSize, receivedFlags, socketError);

    CompletionCallback(bytesTransferred, receivedFlags, socketError);
}

TransferCompletionCallbackCore 中進一步調用 CompletionCallback

private void CompletionCallback(int bytesTransferred, SocketFlags flags, SocketError socketError)
{
    if (socketError == SocketError.Success)
    {
        FinishOperationAsyncSuccess(bytesTransferred, flags);
    }
    else
    {
        if (_currentSocket.CleanedUp)
        {
            socketError = SocketError.OperationAborted;
        }

        FinishOperationAsyncFailure(socketError, bytesTransferred, flags);
    }
}

在 CompletionCallback 中當 SocketError.Success 時進一步調用 FinishOperationAsyncSuccess

internal void FinishOperationAsyncSuccess(int bytesTransferred, SocketFlags flags)
{
    FinishOperationSyncSuccess(bytesTransferred, flags);

    // Raise completion event.
    if (_context == null)
    {
        OnCompleted(this);
    }
    else
    {
        ExecutionContext.Run(_context, s_executionCallback, this);
    }
}

從上面的代碼可以看出實際調用的也是 FinishOperationSyncSuccess ,異步與同步讀取數據最終調用的是同一個方法。

corefx 源碼學習:NetworkStream.ReadAsync 是如何從 Socket 異步讀取數據的