Edit: Big thanks to @oleksa, but there were also other problems in the code posted below. When using the Socket.ReceiveFromAsync and Socket.SendToAsync methods, the SocketAsyncEventArgs object that you pass MUST have its RemoteEndPoint property set (preferably to the RemoteEndPoint of the client socket). Despite the docs stating that this property is ignored, it must still be set, or else an exception occurs.
According to Microsoft's documentation, the socket.SendToAsync and socket.ReceiveFromAsync methods can be used with 'connection-oriented protocols' (SendToAsync Docs) and 'byte stream-style sockets' (ReceiveFromAsync Docs) respectively.
My question is this: do these methods support TCP, and if so, what prerequisites are necessary to start sending and receiving data?
I added code samples so that I could better explain my issue. I cannot get the client handler to echo back bytes received from the client. When using the synchronous SendTo methods and the accepted client socket, it works with no problem and I can echo bytes. When using the asynchronous versions (which work flawlessly with UDP), the client handler never receives any bytes. This behaviour is the same when using the server listener socket or the accepted client socket (from SocketOperations.AcceptAsync).
Some Code:
/// <summary>
/// Echo loop for a single accepted client: receives a packet, logs it, and sends the
/// receive buffer back until the peer closes the connection.
/// </summary>
/// <param name="clientArgsObj">
/// The <see cref="SocketAsyncEventArgs"/> produced by the accept operation; its
/// <see cref="SocketAsyncEventArgs.AcceptSocket"/> is the connected client socket.
/// </param>
/// <returns>A task that completes once the client has disconnected.</returns>
public async Task ClientHandler(object clientArgsObj)
{
    SocketAsyncEventArgs clientArgs = (SocketAsyncEventArgs)clientArgsObj;
    byte[] receiveBuffer = new byte[Constants.PacketSize];
    Memory<byte> receiveBufferMemory = new Memory<byte>(receiveBuffer);
    Socket clientSocket = clientArgs.AcceptSocket;
    EndPoint remoteEndPoint = clientSocket.RemoteEndPoint;

    // ReceiveFromAsync/SendToAsync throw when the event args carry no RemoteEndPoint,
    // even on a connected socket, so seed it with the client's own endpoint.
    clientArgs.RemoteEndPoint = remoteEndPoint;

    while (true)
    {
        ReceiveResult result = await ReceiveAsync(clientArgs, SocketFlags.None, receiveBufferMemory);

        // A successful receive of zero bytes on a connected stream socket means the peer
        // closed the connection; without this check the loop would spin forever.
        if (result.ClientArgs.BytesTransferred == 0)
        {
            break;
        }

        Console.WriteLine($"[{result.RemoteEndPoint} > Server] : {Encoding.UTF8.GetString(result.Contents.Span)}");

        // NOTE(review): this echoes the whole receive buffer, not just the bytes that were
        // received; consider sending only the received slice once ReceiveResult's API is confirmed.
        int sentBytes = await SendAsync(result.ClientArgs, SocketFlags.None, receiveBufferMemory);
        Console.WriteLine($"[Server > {result.RemoteEndPoint}] Sent {sentBytes} bytes to {result.RemoteEndPoint}");

        // This bottom stuff works just fine
        //int receivedBytes = clientSocket.ReceiveFrom(receiveBuffer, SocketFlags.None, ref remoteEndPoint);
        //int sentBytes = clientSocket.SendTo(receiveBuffer, receivedBytes, SocketFlags.None, remoteEndPoint);
    }

    // The peer is gone; release the socket handle.
    clientSocket.Close();
}
/// <inheritdoc />
public override async Task StartAsync()
{
    socket.Listen(5);
    while (true)
    {
        SocketAsyncEventArgs clientArgs = await SocketOperations.AcceptAsync(socket);

        // Task.Factory.StartNew with an async delegate returns Task<Task>; awaiting it only
        // waits for the handler's first await, and LongRunning spends a dedicated thread on
        // work that releases it almost immediately. Task.Run unwraps the inner task, and the
        // handler is deliberately not awaited so the accept loop keeps serving new clients.
        Task handlerTask = Task.Run(() => ClientHandler(clientArgs));

        // Observe failures so a faulted handler does not disappear silently.
        _ = handlerTask.ContinueWith(
            faulted => Console.WriteLine(
                $"[Server] Client handler failed: {faulted.Exception?.GetBaseException().Message}"),
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted,
            TaskScheduler.Default);
    }
}
/// <summary>
/// Forwards a receive request to <c>SocketOperations</c>, using the accepted client socket
/// stored on <paramref name="args"/> rather than the listening socket.
/// </summary>
/// <param name="args">Event args whose <c>AcceptSocket</c> is the socket to receive on.</param>
/// <param name="socketFlags">Flags passed through to the receive operation.</param>
/// <param name="outputBuffer">Buffer passed through as the destination for received data.</param>
/// <param name="cancellationToken">Token passed through to the receive operation.</param>
/// <returns>The task returned by <c>SocketOperations.ReceiveAsync</c>.</returns>
private Task<ReceiveResult> ReceiveAsync(SocketAsyncEventArgs args, SocketFlags socketFlags, Memory<byte> outputBuffer, CancellationToken cancellationToken = default)
    // Alternative that was tried: pass the listening `socket` instead of args.AcceptSocket.
    => SocketOperations.ReceiveAsync(
        args.AcceptSocket,
        args,
        socketFlags,
        outputBuffer,
        cancellationToken);
/// <summary>
/// Forwards a send request to <c>SocketOperations</c>, using the accepted client socket
/// stored on <paramref name="args"/> rather than the listening socket.
/// </summary>
/// <param name="args">Event args whose <c>AcceptSocket</c> is the socket to send on.</param>
/// <param name="socketFlags">Flags passed through to the send operation.</param>
/// <param name="inputBuffer">Buffer passed through as the data to send.</param>
/// <param name="cancellationToken">Token passed through to the send operation.</param>
/// <returns>The task returned by <c>SocketOperations.SendAsync</c>.</returns>
private Task<int> SendAsync(SocketAsyncEventArgs args, SocketFlags socketFlags,
    Memory<byte> inputBuffer, CancellationToken cancellationToken = default)
    // Alternative that was tried: pass the listening `socket` instead of args.AcceptSocket.
    => SocketOperations.SendAsync(
        args.AcceptSocket,
        args,
        socketFlags,
        inputBuffer,
        cancellationToken);
public sealed class AsyncSocketOperations
{
// Default value of the constructor's maxPooledObjectCount parameter.
private const int MaximumPooledObjects = 10;
// Pool of event args used for accept operations (rented in AcceptAsync).
private readonly ObjectPool<SocketAsyncEventArgs> connectAsyncArgsPool;
// Buffer pool created in the constructor; not used by the members visible in this section.
private readonly ArrayPool<byte> receiveBufferPool;
// Supplies the scratch buffers that ReceiveFromAsync hands to the socket.
private readonly ArrayPool<byte> receiveFromBufferPool;
// Buffer pool created in the constructor; not used by the members visible in this section.
private readonly ArrayPool<byte> sendBufferPool;
// Supplies the scratch buffers that SendToAsync copies outgoing data into.
private readonly ArrayPool<byte> sendToBufferPool;
// Pool of event args pre-filled in the constructor; not rented by the members visible in this section.
private readonly ObjectPool<SocketAsyncEventArgs> socketAsyncArgsPool;
/// <summary>
/// Completion callback attached to the <see cref="SocketAsyncEventArgs.Completed"/> event.
/// Translates the finished operation into a result, cancellation, or exception on the
/// <see cref="TaskCompletionSource{TResult}"/> stored in <see cref="SocketAsyncEventArgs.UserToken"/>.
/// </summary>
/// <param name="sender">The object that raised the event (unused).</param>
/// <param name="eventArgs">The event args describing the completed operation.</param>
/// <exception cref="NotImplementedException">
/// The completed operation is Connect, ReceiveMessageFrom, SendPackets or None.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">The completed operation is not handled at all.</exception>
private void HandleIOCompleted(object? sender, SocketAsyncEventArgs eventArgs)
{
    switch (eventArgs.LastOperation)
    {
        case SocketAsyncOperation.SendTo:
        {
            AsyncWriteToken sendToken = (AsyncWriteToken)eventArgs.UserToken;
            SocketError sendError = eventArgs.SocketError;
            int bytesSent = eventArgs.BytesTransferred;

            // Release the scratch buffer before completing the task: continuations may run
            // inline and immediately start another operation on the same event args.
            sendToBufferPool.Return(sendToken.RentedBuffer, true);

            if (sendToken.CancellationToken.IsCancellationRequested)
            {
                sendToken.CompletionSource.TrySetCanceled(sendToken.CancellationToken);
            }
            else if (sendError != SocketError.Success)
            {
                sendToken.CompletionSource.TrySetException(new SocketException((int)sendError));
            }
            else
            {
                sendToken.CompletionSource.TrySetResult(bytesSent);
            }

            break;
        }

        case SocketAsyncOperation.ReceiveFrom:
        {
            AsyncReadToken receiveToken = (AsyncReadToken)eventArgs.UserToken;
            SocketError receiveError = eventArgs.SocketError;
            int bytesReceived = eventArgs.BytesTransferred;
            bool canceled = receiveToken.CancellationToken.IsCancellationRequested;
            bool fits = bytesReceived <= receiveToken.UserBuffer.Length;

            if (!canceled && receiveError == SocketError.Success && fits)
            {
                // Copy only the bytes that actually arrived. The rented array may be longer
                // than the caller's buffer (ArrayPool.Rent returns *at least* the requested
                // size), so copying the whole MemoryBuffer can throw.
                eventArgs.MemoryBuffer.Slice(0, bytesReceived).CopyTo(receiveToken.UserBuffer);
            }

            // Release the scratch buffer before completing the task (see SendTo above).
            receiveFromBufferPool.Return(receiveToken.RentedBuffer, true);

            if (canceled)
            {
                receiveToken.CompletionSource.TrySetCanceled(receiveToken.CancellationToken);
            }
            else if (receiveError != SocketError.Success)
            {
                receiveToken.CompletionSource.TrySetException(new SocketException((int)receiveError));
            }
            else if (!fits)
            {
                receiveToken.CompletionSource.TrySetException(new InvalidOperationException(
                    $"Received {bytesReceived} bytes but the output buffer holds only {receiveToken.UserBuffer.Length}."));
            }
            else
            {
                ReceiveResult result = new ReceiveResult(eventArgs, receiveToken.UserBuffer,
                    bytesReceived, eventArgs.RemoteEndPoint);
                receiveToken.CompletionSource.TrySetResult(result);
            }

            break;
        }

        case SocketAsyncOperation.Disconnect:
            // TODO: handle the client closing the connection on tcp servers at some point
            break;

        case SocketAsyncOperation.Accept:
        {
            AsyncAcceptToken acceptToken = (AsyncAcceptToken)eventArgs.UserToken;

            if (acceptToken.CancellationToken.IsCancellationRequested)
            {
                // Nobody will consume the accepted connection; close it and recycle the args.
                eventArgs.AcceptSocket?.Dispose();
                connectAsyncArgsPool.Return(eventArgs);
                acceptToken.CompletionSource.TrySetCanceled(acceptToken.CancellationToken);
            }
            else if (eventArgs.SocketError != SocketError.Success)
            {
                SocketError acceptError = eventArgs.SocketError;
                connectAsyncArgsPool.Return(eventArgs);
                acceptToken.CompletionSource.TrySetException(new SocketException((int)acceptError));
            }
            else
            {
                // Ownership of the event args passes to the awaiting caller, which keeps
                // using them (AcceptSocket, later receives/sends). They must NOT go back to
                // the pool here, or the next accept would reset them while still in use.
                acceptToken.CompletionSource.TrySetResult(eventArgs);
            }

            break;
        }

        case SocketAsyncOperation.Connect:
        case SocketAsyncOperation.ReceiveMessageFrom:
        case SocketAsyncOperation.SendPackets:
        case SocketAsyncOperation.None:
            throw new NotImplementedException();

        default:
            throw new ArgumentOutOfRangeException();
    }
}
/// <summary>
/// State stored in <c>SocketAsyncEventArgs.UserToken</c> while an accept operation is pending.
/// </summary>
private readonly struct AsyncAcceptToken
{
    /// <summary>Completion source behind the task returned to the caller of the accept.</summary>
    public readonly TaskCompletionSource<SocketAsyncEventArgs> CompletionSource;

    /// <summary>Token checked when the accept operation completes.</summary>
    public readonly CancellationToken CancellationToken;

    /// <summary>Captures the completion source and cancellation token for one accept.</summary>
    public AsyncAcceptToken(TaskCompletionSource<SocketAsyncEventArgs> tcs, CancellationToken cancellationToken = default)
        => (this.CompletionSource, this.CancellationToken) = (tcs, cancellationToken);
}
/// <summary>
/// State stored in <c>SocketAsyncEventArgs.UserToken</c> while a receive operation is pending.
/// </summary>
private readonly struct AsyncReadToken
{
    /// <summary>Pooled array handed to the socket; returned to its pool on completion.</summary>
    public readonly byte[] RentedBuffer;

    /// <summary>Caller-supplied buffer that received data is copied into.</summary>
    public readonly Memory<byte> UserBuffer;

    /// <summary>Completion source behind the task returned to the caller of the receive.</summary>
    public readonly TaskCompletionSource<ReceiveResult> CompletionSource;

    /// <summary>Token checked when the receive operation completes.</summary>
    public readonly CancellationToken CancellationToken;

    /// <summary>Captures everything the completion handler needs for one receive.</summary>
    public AsyncReadToken(byte[] rentedBuffer, Memory<byte> userBuffer, TaskCompletionSource<ReceiveResult> tcs,
        CancellationToken cancellationToken = default)
        => (this.RentedBuffer, this.UserBuffer, this.CompletionSource, this.CancellationToken)
            = (rentedBuffer, userBuffer, tcs, cancellationToken);
}
/// <summary>
/// State stored in <c>SocketAsyncEventArgs.UserToken</c> while a send operation is pending.
/// </summary>
private readonly struct AsyncWriteToken
{
    /// <summary>Pooled array holding the outgoing bytes; returned to its pool on completion.</summary>
    public readonly byte[] RentedBuffer;

    /// <summary>Completion source behind the task returned to the caller of the send.</summary>
    public readonly TaskCompletionSource<int> CompletionSource;

    /// <summary>Token checked when the send operation completes.</summary>
    public readonly CancellationToken CancellationToken;

    /// <summary>Captures everything the completion handler needs for one send.</summary>
    public AsyncWriteToken(byte[] rentedBuffer, TaskCompletionSource<int> tcs,
        CancellationToken cancellationToken = default)
        => (this.RentedBuffer, this.CompletionSource, this.CancellationToken)
            = (rentedBuffer, tcs, cancellationToken);
}
/// <summary>
/// Creates the buffer pools and event-args pools and pre-fills both event-args pools with
/// instances wired to <c>HandleIOCompleted</c>.
/// </summary>
/// <param name="bufferSize">Maximum array length of each buffer pool; also stored in <c>BufferSize</c>.</param>
/// <param name="maxPooledObjectCount">Capacity passed to every pool; also the number of event args pre-created per pool.</param>
/// <param name="preallocateBuffers">Currently has no effect (preallocation is not implemented yet).</param>
public AsyncSocketOperations(int bufferSize, int maxPooledObjectCount = MaximumPooledObjects, bool preallocateBuffers = false)
{
    // Builds one leak-tracking pool of event args with the given retention limit.
    static ObjectPool<SocketAsyncEventArgs> CreateArgsPool(int maximumRetained) =>
        new LeakTrackingObjectPool<SocketAsyncEventArgs>(
            new DefaultObjectPool<SocketAsyncEventArgs>(
                new DefaultPooledObjectPolicy<SocketAsyncEventArgs>(), maximumRetained));

    // Creates event args whose completion is routed to this instance's handler.
    SocketAsyncEventArgs CreateWiredArgs()
    {
        SocketAsyncEventArgs wired = new SocketAsyncEventArgs();
        wired.Completed += HandleIOCompleted;
        return wired;
    }

    MaxPooledObjects = maxPooledObjectCount;
    BufferSize = bufferSize;

    sendBufferPool = ArrayPool<byte>.Create(bufferSize, maxPooledObjectCount);
    receiveBufferPool = ArrayPool<byte>.Create(bufferSize, maxPooledObjectCount);
    sendToBufferPool = ArrayPool<byte>.Create(bufferSize, maxPooledObjectCount);
    receiveFromBufferPool = ArrayPool<byte>.Create(bufferSize, maxPooledObjectCount);

    connectAsyncArgsPool = CreateArgsPool(maxPooledObjectCount);
    socketAsyncArgsPool = CreateArgsPool(maxPooledObjectCount);

    // Seed each pool with one accept-side and one I/O-side instance per iteration.
    for (int remaining = MaxPooledObjects; remaining > 0; remaining--)
    {
        connectAsyncArgsPool.Return(CreateWiredArgs());
        socketAsyncArgsPool.Return(CreateWiredArgs());
    }

    if (preallocateBuffers)
    {
        // TODO: Allocate and return array pool buffers
    }
}
/// <summary>Buffer size given to the constructor; the length requested when renting pooled buffers.</summary>
public int BufferSize { get; }
/// <summary>Pool capacity given to the constructor; also the number of event args pre-created per pool.</summary>
public int MaxPooledObjects { get; }
/// <summary>
/// Accepts one incoming connection on <paramref name="socket"/> using pooled event args.
/// </summary>
/// <param name="socket">The listening socket.</param>
/// <param name="cancellationToken">
/// Checked before the operation starts and again when it completes; it does not abort an
/// accept that is already pending.
/// </param>
/// <returns>
/// A task producing the event args of the accepted connection; its <c>AcceptSocket</c> is
/// the connected client socket. The task faults with a <see cref="SocketException"/> when
/// the accept fails.
/// </returns>
public Task<SocketAsyncEventArgs> AcceptAsync(Socket socket, CancellationToken cancellationToken = default)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return Task.FromCanceled<SocketAsyncEventArgs>(cancellationToken);
    }

    // Run continuations off the I/O completion thread so the awaiting code cannot re-enter
    // the completion handler while it is still finishing up.
    TaskCompletionSource<SocketAsyncEventArgs> tcs =
        new TaskCompletionSource<SocketAsyncEventArgs>(TaskCreationOptions.RunContinuationsAsynchronously);

    SocketAsyncEventArgs args = connectAsyncArgsPool.Get();

    // When the pool is empty it creates fresh event args that have no Completed handler,
    // and the returned task would then never complete. Detach-then-attach guarantees
    // exactly one subscription whether the instance is new or recycled.
    args.Completed -= HandleIOCompleted;
    args.Completed += HandleIOCompleted;

    args.AcceptSocket = null;
    args.UserToken = new AsyncAcceptToken(tcs, cancellationToken);

    bool pending;
    try
    {
        pending = socket.AcceptAsync(args);
    }
    catch
    {
        // Do not leak the pooled args when the socket rejects the call outright.
        connectAsyncArgsPool.Return(args);
        throw;
    }

    if (pending)
    {
        return tcs.Task;
    }

    // Completed synchronously: the Completed event is not raised, so the outcome has to be
    // inspected here, including failures.
    if (args.SocketError != SocketError.Success)
    {
        SocketError error = args.SocketError;
        connectAsyncArgsPool.Return(args);
        return Task.FromException<SocketAsyncEventArgs>(new SocketException((int)error));
    }

    return Task.FromResult(args);
}
/// <summary>
/// Receives data on <paramref name="socket"/> into a pooled scratch buffer and copies the
/// received bytes into <paramref name="outputBuffer"/>.
/// </summary>
/// <param name="socket">The socket to receive on.</param>
/// <param name="args">Event args that carry the operation; their buffer, flags and user token are overwritten.</param>
/// <param name="socketFlags">Flags for the receive operation.</param>
/// <param name="outputBuffer">Destination for the received bytes; at most its length is received.</param>
/// <param name="cancellationToken">
/// Checked before the operation starts and again when it completes; it does not abort a
/// receive that is already pending.
/// </param>
/// <returns>
/// A task producing the receive result; it faults with a <see cref="SocketException"/> when
/// the receive fails.
/// </returns>
public Task<ReceiveResult> ReceiveFromAsync(Socket socket, SocketAsyncEventArgs args, SocketFlags socketFlags,
    Memory<byte> outputBuffer, CancellationToken cancellationToken = default)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return Task.FromCanceled<ReceiveResult>(cancellationToken);
    }

    // Socket.ReceiveFromAsync rejects event args without a RemoteEndPoint. Default to the
    // connected peer, or to a wildcard endpoint of the socket's family when unconnected.
    if (args.RemoteEndPoint is null)
    {
        args.RemoteEndPoint = socket.Connected
            ? socket.RemoteEndPoint
            : new IPEndPoint(
                socket.AddressFamily == AddressFamily.InterNetworkV6 ? IPAddress.IPv6Any : IPAddress.Any, 0);
    }

    TaskCompletionSource<ReceiveResult> tcs =
        new TaskCompletionSource<ReceiveResult>(TaskCreationOptions.RunContinuationsAsynchronously);

    byte[] rentedReceiveFromBuffer = receiveFromBufferPool.Rent(BufferSize);

    // Rent may return a longer array than requested; never receive more than the caller's
    // buffer can hold, otherwise the copy-out would overflow it.
    int capacity = Math.Min(rentedReceiveFromBuffer.Length, outputBuffer.Length);
    args.SetBuffer(new Memory<byte>(rentedReceiveFromBuffer, 0, capacity));
    args.SocketFlags = socketFlags;
    args.UserToken = new AsyncReadToken(rentedReceiveFromBuffer, outputBuffer, tcs, cancellationToken);

    bool pending;
    try
    {
        pending = socket.ReceiveFromAsync(args);
    }
    catch
    {
        // Do not leak the rented buffer when the socket rejects the call outright.
        receiveFromBufferPool.Return(rentedReceiveFromBuffer, true);
        throw;
    }

    // If the receive operation doesn't complete synchronously, return the awaitable task.
    if (pending)
    {
        return tcs.Task;
    }

    // Completed synchronously: the Completed event is not raised, so finish up here.
    SocketError error = args.SocketError;
    int bytesReceived = args.BytesTransferred;
    if (error == SocketError.Success)
    {
        // Copy only the bytes that actually arrived, not the whole scratch buffer.
        args.MemoryBuffer.Slice(0, bytesReceived).CopyTo(outputBuffer);
    }

    receiveFromBufferPool.Return(rentedReceiveFromBuffer, true);

    if (error != SocketError.Success)
    {
        return Task.FromException<ReceiveResult>(new SocketException((int)error));
    }

    ReceiveResult result = new ReceiveResult(args, outputBuffer, bytesReceived, args.RemoteEndPoint);
    return Task.FromResult(result);
}
/// <summary>
/// Copies <paramref name="inputBuffer"/> into a pooled scratch buffer and sends exactly
/// those bytes on <paramref name="socket"/>.
/// </summary>
/// <param name="socket">The socket to send on.</param>
/// <param name="args">Event args that carry the operation; their buffer, flags and user token are overwritten.</param>
/// <param name="socketFlags">Flags for the send operation.</param>
/// <param name="inputBuffer">The bytes to send.</param>
/// <param name="cancellationToken">
/// Checked before the operation starts and again when it completes; it does not abort a
/// send that is already pending.
/// </param>
/// <returns>
/// A task producing the number of bytes sent; it faults with a <see cref="SocketException"/>
/// when the send fails.
/// </returns>
public Task<int> SendToAsync(Socket socket, SocketAsyncEventArgs args, SocketFlags socketFlags,
    Memory<byte> inputBuffer, CancellationToken cancellationToken = default)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return Task.FromCanceled<int>(cancellationToken);
    }

    // Socket.SendToAsync rejects event args without a RemoteEndPoint. On a connected socket
    // the peer is the only sensible default; otherwise the caller must supply the target.
    if (args.RemoteEndPoint is null && socket.Connected)
    {
        args.RemoteEndPoint = socket.RemoteEndPoint;
    }

    TaskCompletionSource<int> tcs =
        new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Rent enough room for the payload even when it is larger than the configured size.
    byte[] rentedSendToBuffer = sendToBufferPool.Rent(Math.Max(BufferSize, inputBuffer.Length));

    // Expose only the payload-sized slice to the socket: the rented array can be longer
    // than the payload, and sending all of it would append unrelated bytes.
    Memory<byte> payload = new Memory<byte>(rentedSendToBuffer, 0, inputBuffer.Length);
    inputBuffer.CopyTo(payload);
    args.SetBuffer(payload);
    args.SocketFlags = socketFlags;
    args.UserToken = new AsyncWriteToken(rentedSendToBuffer, tcs, cancellationToken);

    bool pending;
    try
    {
        pending = socket.SendToAsync(args);
    }
    catch
    {
        // Do not leak the rented buffer when the socket rejects the call outright.
        sendToBufferPool.Return(rentedSendToBuffer, true);
        throw;
    }

    // If the send operation doesn't complete synchronously, return the awaitable task.
    if (pending)
    {
        return tcs.Task;
    }

    // Completed synchronously: the Completed event is not raised, so finish up here.
    SocketError error = args.SocketError;
    int result = args.BytesTransferred;
    sendToBufferPool.Return(rentedSendToBuffer, true);

    if (error != SocketError.Success)
    {
        return Task.FromException<int>(new SocketException((int)error));
    }

    return Task.FromResult(result);
}
I believe that both the SendToAsync and ReceiveFromAsync methods are just non-blocking versions of the SendTo and ReceiveFrom methods.
The SendTo documentation has a sample with ProtocolType.Udp, but you can use ProtocolType.Tcp as well.
You have to start the TCP server with ReceiveFromAsync and then send data using SendToAsync. Please check the MS SendTo and ReceiveFrom samples and update them to use the async versions accordingly.