Search code examples
c#.net-coreakka.netakka.net-streams

Why isn't my Akka.NET Stream Subscriber receiving messages?


I'm am having a go at writing a simple Akka.NET streams flow. The Source is an IActorRef. The Sink is a ISubscriber. I am using TestKit to implement it as a unit test:

[Fact]
public void AkkaStreams_ActorSourcePublisherSink_Works()
{
    using (var materializer = Sys.Materializer())
    {
        var probe = CreateTestProbe();
        var source = Source.ActorRef<HandlerErrorEvent>(10, OverflowStrategy.DropNew);
        var subscriber = new Mock<ISubscriber<HandlerErrorEvent>>();
        var sink = Sink.FromSubscriber<HandlerErrorEvent>(subscriber.Object);
        var graph = source.ToMaterialized(sink, Keep.Both);
        var (actor, publisher) = graph.Run(materializer);

        subscriber.Verify(s => s.OnSubscribe(It.IsAny<ISubscription>()));

        var evnt = new HandlerErrorEvent("", HandlerResult.NotHandled);
        actor.Tell(evnt, ActorRefs.Nobody);

        base.AwaitCondition(() =>
        {
            try
            {
                subscriber.Verify(s => s.OnNext(It.IsAny<HandlerErrorEvent>()));
                return true;
            }
            catch(MockException)
            {
                return false;
            }
        });
    }
}

The initial Verify call on the OnSubscribe method passes fine, but the Mock Subscriber never receives a call to OnNext.

What am I doing wrong?

Running as netcoreapp2.0. References:

"Akka.TestKit.Xunit2" Version="1.3.2"
"Microsoft.NET.Test.Sdk" Version="15.5.0"
"Moq" Version="4.8.0-rc1"
"xunit" Version="2.3.1"
"xunit.runner.visualstudio" Version="2.3.1"
"dotnet-xunit" Version="2.3.1"

Solution

  • Your ISubscriber<> mock is not compliant to Reactive Streams specification. It states, that in order to get any message after subscribing, subscriber must first communicate a demand using ISubscription.Request(long) method.

    In general if you're using Akka.Streams test kit, you don't need to mock subscriptions. Just download Akka.Streams.TestKit to get extension methods for Akka.Streams. This way you'll be able to build a fake subscriber simply by calling this.CreateManualSubscriberProbe<HandlerErrorEvent>(); inside your TestKit class. It contains a dozens of methods you can use for assertion.

    Example:

    public class ExampleTest : TestKit
    {
        [Fact]
        public void Select_should_map_output()
        {
            using (var materializer = Sys.Materializer())
            {
                // create test probe for subscriptions
                var probe = this.CreateManualSubscriberProbe<int>();
    
                // create flow materialized as publisher
                var publisher = Source.From(new[] { 1, 2, 3 })
                    .Select(i => i + 1)
                    .RunWith(Sink.AsPublisher<int>(fanout: false), materializer);
    
                // subscribe probe and receive subscription
                publisher.Subscribe(probe);
                var subscription = probe.ExpectSubscription();
    
                // request number of elements to receive, here drain source utill the end
                subscription.Request(4); 
    
                // validate assertions
                probe.ExpectNext(2);
                probe.ExpectNext(3);
                probe.ExpectNext(4);
    
                // since source had finite number of 3 elements, expect it to complete
                probe.ExpectComplete();
            }
        }
    }