I've been playing a little with the streams extension package for Akka.NET and ran into this problem when trying to combine the Buffer and Throttle methods:
using (var system = ActorSystem.Create("test-system"))
using (var materializer = system.Materializer(GetSettings(system)))
{
    int index = 0;
    var sink = Sink.ActorRefWithAck<KeyValue>(
        system.ActorOf<Writer>(),
        new OnInitMessage(),
        new OnAcknowledgeMessage(),
        OnComplete.Instance,
        exception => new OnError(exception));
    ServiceBusSource
        .Create(client, message =>
        {
            var json = new StreamReader(message.GetBody<Stream>(), Encoding.UTF8).ReadToEnd();
            var result = JsonConvert.DeserializeObject<KeyValue>(json);
            message.Complete();
            return result;
        })
        .WithLogger(system, entity => $"{entity.Key} => {entity.Value}")
        .Buffer(1, OverflowStrategy.Fail)
        .Throttle(1, TimeSpan.FromSeconds(5), 3, ThrottleMode.Shaping)
        .ToMaterialized(sink, Keep.Right)
        .Run(materializer);
    Console.ReadLine();
}
I'm using ServiceBusSource from Alpakka.
I'm intentionally making it fail to see how it behaves, but after the buffer's overflow strategy fails, the stream completes and no more elements are pulled.
KeyValue.cs
public class KeyValue
{
    public int Id { get; set; }
    public string Key { get; set; }
    public string Value { get; set; }
    public DateTime Produced { get; set; }
    public DateTime Emitted { get; set; }
    public override string ToString()
    {
        return $"[{Produced}] - [{Emitted}] => {Id} {Key}:{Value}";
    }
}
GetSettings Method:
ActorMaterializerSettings GetSettings(ActorSystem system)
{
    return ActorMaterializerSettings.Create(system)
        .WithSupervisionStrategy(cause =>
        {
            system.Log.Error(cause, "Failed");
            return Directive.Resume;
        });
}
There are several ways of handling errors inside a stream, most of them described in the docs:
Recover, to produce a fallback element from an error.
RecoverWithRetries, to switch to a different stream upon error.
RestartSource.WithBackoff (and its RestartFlow and RestartSink counterparts), to rebuild the stream after an exponential backoff delay.
WithSupervisionStrategy, which is a very limited option, as it works only on stages that refer to it explicitly (as explained in the docs).
Your case is by design: when you use OverflowStrategy.Fail, an error is produced once the buffer overflows. Most Akka stages react to a failure by closing the stream immediately.
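If the goal is for the stream to keep pulling after the buffer fails, the restart-with-backoff option from the list above is probably the closest fit. Below is a minimal sketch, not a drop-in fix. It assumes an Akka.Streams version that still ships the RestartSource.WithBackoff overload taking a minimum backoff, a maximum backoff and a random factor (newer releases may prefer a settings object instead). The integer source is a hypothetical stand-in for ServiceBusSource.Create(...), and the backoff values are arbitrary.

```csharp
using System;
using System.Linq;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class RestartSketch
{
    public static void Main()
    {
        using (var system = ActorSystem.Create("test-system"))
        using (var materializer = system.Materializer())
        {
            // The factory is invoked again after every failure, so the
            // Buffer/Throttle stages are rebuilt rather than left closed.
            var restarting = RestartSource.WithBackoff(
                () => Source.From(Enumerable.Range(1, 100)) // stand-in for ServiceBusSource.Create(...)
                    .Buffer(1, OverflowStrategy.Fail)
                    .Throttle(1, TimeSpan.FromSeconds(5), 3, ThrottleMode.Shaping),
                TimeSpan.FromSeconds(1),   // minimum delay before the first restart (arbitrary)
                TimeSpan.FromSeconds(30),  // upper bound for the growing delay (arbitrary)
                0.2);                      // random jitter factor applied to each delay (arbitrary)

            // Print whatever gets through; the real code would attach its own sink here.
            restarting.RunForeach(x => Console.WriteLine(x), materializer);

            Console.ReadLine();
        }
    }
}
```

Note that a restart creates a fresh source, so with the stand-in above the sequence starts over from the beginning each time. With a message-broker source the effect would instead depend on which messages have already been completed.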