I am having a go at writing a simple Akka.NET Streams flow. The source is an IActorRef and the sink is an ISubscriber. I am using TestKit to implement it as a unit test:
    [Fact]
    public void AkkaStreams_ActorSourcePublisherSink_Works()
    {
        using (var materializer = Sys.Materializer())
        {
            var probe = CreateTestProbe();
            var source = Source.ActorRef<HandlerErrorEvent>(10, OverflowStrategy.DropNew);
            var subscriber = new Mock<ISubscriber<HandlerErrorEvent>>();
            var sink = Sink.FromSubscriber<HandlerErrorEvent>(subscriber.Object);
            var graph = source.ToMaterialized(sink, Keep.Both);
            var (actor, publisher) = graph.Run(materializer);
            subscriber.Verify(s => s.OnSubscribe(It.IsAny<ISubscription>()));
            var evnt = new HandlerErrorEvent("", HandlerResult.NotHandled);
            actor.Tell(evnt, ActorRefs.Nobody);
            base.AwaitCondition(() =>
            {
                try
                {
                    subscriber.Verify(s => s.OnNext(It.IsAny<HandlerErrorEvent>()));
                    return true;
                }
                catch (MockException)
                {
                    return false;
                }
            });
        }
    }
The initial Verify call on the OnSubscribe method passes fine, but the mock subscriber never receives a call to OnNext.
What am I doing wrong?
Running as netcoreapp2.0. References:
"Akka.TestKit.Xunit2" Version="1.3.2"
"Microsoft.NET.Test.Sdk" Version="15.5.0"
"Moq" Version="4.8.0-rc1"
"xunit" Version="2.3.1"
"xunit.runner.visualstudio" Version="2.3.1"
"dotnet-xunit" Version="2.3.1"
Your ISubscriber<> mock is not compliant with the Reactive Streams specification. The specification states that, in order to receive any element after subscribing, a subscriber must first signal demand by calling the ISubscription.Request(long) method. Your mock never calls it, so the stream never pushes anything to OnNext.
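If you want to keep the Moq-based subscriber, a minimal sketch of signalling demand could look like the following. It assumes the HandlerErrorEvent and HandlerResult types from the question, and the class and test names are made up for illustration. The only real change is the Setup/Callback pair, which requests one element as soon as the subscription arrives; it has to be configured before the graph is run.

```csharp
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.TestKit.Xunit2;
using Moq;
using Reactive.Streams;
using Xunit;

// Hypothetical test class; HandlerErrorEvent and HandlerResult come from the question.
public class MockedSubscriberTest : TestKit
{
    [Fact]
    public void Mocked_subscriber_receives_element_after_requesting_demand()
    {
        using (var materializer = Sys.Materializer())
        {
            var subscriber = new Mock<ISubscriber<HandlerErrorEvent>>();

            // Signal demand for one element when the stream hands over the subscription.
            subscriber
                .Setup(s => s.OnSubscribe(It.IsAny<ISubscription>()))
                .Callback<ISubscription>(subscription => subscription.Request(1));

            // To(...) keeps the source's materialized value, i.e. the IActorRef.
            var actor = Source.ActorRef<HandlerErrorEvent>(10, OverflowStrategy.DropNew)
                .To(Sink.FromSubscriber<HandlerErrorEvent>(subscriber.Object))
                .Run(materializer);

            var evnt = new HandlerErrorEvent("", HandlerResult.NotHandled);
            actor.Tell(evnt, ActorRefs.Nobody);

            // Poll until the mock has seen the element (same pattern as in the question).
            AwaitCondition(() =>
            {
                try
                {
                    subscriber.Verify(s => s.OnNext(evnt));
                    return true;
                }
                catch (MockException)
                {
                    return false;
                }
            });
        }
    }
}
```

Requesting a fixed amount in OnSubscribe is the simplest option; a mock that should receive more elements would have to request more, which is one reason a purpose-built probe is usually easier.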
In general, if you're using the Akka.Streams test kit, you don't need to mock subscribers. Just reference the Akka.Streams.TestKit package to get extension methods for Akka.Streams. This way you'll be able to build a fake subscriber simply by calling this.CreateManualSubscriberProbe<HandlerErrorEvent>(); inside your TestKit class. The probe exposes dozens of methods you can use for assertions.
Example:
    using Akka.Streams;
    using Akka.Streams.Dsl;
    using Akka.Streams.TestKit;
    using Akka.TestKit.Xunit2;
    using Xunit;

    public class ExampleTest : TestKit
    {
        [Fact]
        public void Select_should_map_output()
        {
            using (var materializer = Sys.Materializer())
            {
                // create test probe for subscriptions
                var probe = this.CreateManualSubscriberProbe<int>();
                // create flow materialized as publisher
                var publisher = Source.From(new[] { 1, 2, 3 })
                    .Select(i => i + 1)
                    .RunWith(Sink.AsPublisher<int>(fanout: false), materializer);
                // subscribe probe and receive subscription
                publisher.Subscribe(probe);
                var subscription = probe.ExpectSubscription();
                // request the number of elements to receive; asking for 4 drains the 3-element source to the end
                subscription.Request(4);
                // validate assertions
                probe.ExpectNext(2);
                probe.ExpectNext(3);
                probe.ExpectNext(4);
                // since the source has a finite number of 3 elements, expect it to complete
                probe.ExpectComplete();
            }
        }
    }
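Applied to the actor-backed source from the question, the same probe-based approach might look roughly like this. It is a sketch, not a verified drop-in: HandlerErrorEvent and HandlerResult are the types from the question, the class and test names are invented here, and it assumes the probe can be passed to Sink.FromSubscriber because it implements ISubscriber<T>.

```csharp
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit.Xunit2;
using Xunit;

// Hypothetical test class; HandlerErrorEvent and HandlerResult come from the question.
public class ActorSourceProbeTest : TestKit
{
    [Fact]
    public void Actor_source_should_emit_told_message()
    {
        using (var materializer = Sys.Materializer())
        {
            // the probe replaces the Moq-based subscriber
            var probe = this.CreateManualSubscriberProbe<HandlerErrorEvent>();

            // To(...) keeps the source's materialized value, i.e. the IActorRef
            var actor = Source.ActorRef<HandlerErrorEvent>(10, OverflowStrategy.DropNew)
                .To(Sink.FromSubscriber<HandlerErrorEvent>(probe))
                .Run(materializer);

            // signal demand before expecting any element
            var subscription = probe.ExpectSubscription();
            subscription.Request(1);

            var evnt = new HandlerErrorEvent("", HandlerResult.NotHandled);
            actor.Tell(evnt, ActorRefs.Nobody);

            // the same instance that was told to the actor should arrive downstream
            probe.ExpectNext(evnt);
        }
    }
}
```

Because the source buffers up to 10 elements, the order of Request and Tell should not matter here; the element is delivered once both have happened.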