I've made quite a simple implementation with Akka.net Streams using Sink.ActorRefWithAck
: a subscriber asks for a large string to a publisher which sends it by slices.
It works perfectly fine locally (UT) but not remotely. And I cannot understand what's wrong? Concretly: the subscriber is able to send the request to the publisher which responds with an OnInit
message but then the OnInit.Ack
will never goes back to the publisher. This Ack
message ends up as a dead letter:
INFO Akka.Actor.EmptyLocalActorRef - Message Ack from akka.tcp://OutOfProcessTaskProcessing@localhost:12100/user/Streamer_636568240846733287 to akka://OutOfProcessTaskProcessing/user/StreamSupervisor-0/StageActorRef-0 was not delivered. 1 dead letters encountered.
Note that the log is from the destination actor so the message is handled in the right process. There is no obvious path error.
Looking at the publisher code which does not handle this message, I really don't know what I'm doing wrong:
public static void ReplyWithStreamedString(IUntypedActorContext context, string toStream, int chunkSize = 2000)
{
Source<string, NotUsed> source = Source.From(toStream.SplitBy(chunkSize));
source.To(Sink.ActorRefWithAck<string>(context.Sender, new StreamMessage.OnInit(),
new StreamMessage.OnInit.Ack(),
new StreamMessage.Completed(),
exception => new StreamMessage.Failure(exception.Message)))
.Run(context.System.Materializer());
}
Here is the subscriber code:
public static Task<string> AskStreamedString(this ICanTell self, object message, ActorSystem context, TimeSpan? timeout = null)
{
var tcs = new TaskCompletionSource<string>();
if (timeout.HasValue)
{
CancellationTokenSource ct = new CancellationTokenSource(timeout.Value);
ct.Token.Register(() => tcs.TrySetCanceled());
}
var props = Props.Create(() => new StreamerActorRef(tcs));
var tempActor = context.ActorOf(props, $"Streamer_{DateTime.Now.Ticks}");
self.Tell(message, tempActor);
return tcs.Task.ContinueWith(task =>
{
context.Stop(tempActor);
if(task.IsCanceled)
throw new OperationCanceledException();
if (task.IsFaulted)
throw task.Exception.GetBaseException();
return task.Result;
});
}
internal class StreamerActorRef : ReceiveActor
{
readonly TaskCompletionSource<string> _tcs;
private readonly StringBuilder _stringBuilder = new StringBuilder();
public StreamerActorRef(TaskCompletionSource<string> tcs)
{
_tcs = tcs;
Ready();
}
private void Ready()
{
ReceiveAny(message =>
{
switch (message)
{
case StreamMessage.OnInit _:
Sender.Tell(new StreamMessage.OnInit.Ack());
break;
case StreamMessage.Completed _:
string result = _stringBuilder.ToString();
_tcs.TrySetResult(result);
break;
case string slice:
_stringBuilder.Append(slice);
Sender.Tell(new StreamMessage.OnInit.Ack());
break;
case StreamMessage.Failure error:
_tcs.TrySetException(new InvalidOperationException(error.Reason));
break;
}
});
}
}
With messages:
public class StreamMessage
{
public class OnInit
{
public class Ack{}
}
public class Completed { }
public class Failure
{
public string Reason { get; }
public Failure(string reason)
{
Reason = reason;
}
}
}
In general sources and sinks working with actor refs have not been designed to work over remote connections - they don't cover message retries, which can cause deadlocks in your system if some stream control message won't be passed in.
The feature you're looking for is called StreamRefs (which works like actor refs, but for streams), and will be shipped as part of v1.4 release (see github pull request for more details).