scala unit-testing akka-cluster distributedpubsub

How can I test a publisher to a DistributedPubSub in Akka Cluster?


I have an actor whose sole responsibility is to forward messages it receives from external interfaces (command-line, user, etc.) to appropriate topics. I want to test that it correctly publishes these messages.

I will need to create some dummy subscriber who will expect messages published to a certain topic and make assertions about the messages it receives.

Here is the code I wrote to attempt that:

Messages.scala

case class Foo(foo: String)

InterfaceForwardingActor.scala

import akka.actor.{Actor, ActorLogging}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}
import com.typesafe.config.Config

/** Actor responsible for forwarding stimuli external to the system.
  * For instance, messages from the command-line interface or from a UI.
  *
  */
class InterfaceForwardingActor extends Actor with ActorLogging {
  import DistributedPubSubMediator.Publish

  protected val mediator = DistributedPubSub(context.system).mediator

  log.info("Hello from interface forwarder.")

  final val topic = "info"

  def receive = {
    case foo: Foo => {
      log.info("Forwarding a Foo message")
      mediator ! Publish(topic, foo)
    }
  }  
}

And here is the test code:

InterfaceForwardingActorTest.scala

import akka.actor.{ActorSystem, Props}
import akka.cluster.client.ClusterClient.Publish
import akka.cluster.pubsub.DistributedPubSub
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import com.typesafe.config.ConfigFactory
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}

class InterfaceForwardingActorTest extends 
    TestKit(ActorSystem("InterfaceForwardingActorSpec")) with ImplicitSender with 
    WordSpecLike with Matchers with BeforeAndAfterAll {

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "An InterfaceForwardingActor" must {
    val interfaceForwardingActor = system.actorOf(Props[InterfaceForwardingActor])

    val probe = TestProbe()
    val mediator = DistributedPubSub(system).mediator

    // subscribe the test probe to the "info" topic
    mediator ! Publish("info", probe.ref)

    "publish a Foo message" in {
      val msg = Foo("test")
      interfaceForwardingActor ! msg
      probe.expectMsg(msg)
    }
  }
}

What I find is that the probe, which is subscribed to the info topic, doesn't receive the message within the default timeout period of 3 seconds and the assertion fails. The interesting part, however, is that I do see the log message stating that the interface forwarding actor is indeed forwarding a Foo message.

What am I doing wrong in my test?


Solution

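  • The TestProbe should be subscribed to the topic in the test code:

    mediator ! Subscribe("info", probe.ref)

    instead of

    mediator ! Publish("info", probe.ref)

    Publish sends probe.ref to the topic as a message; it does not register the probe as a subscriber, so the probe never receives the Foo. Subscribe is defined in akka.cluster.pubsub.DistributedPubSubMediator, so import it from there rather than importing Publish from akka.cluster.client.ClusterClient.

    See the Akka documentation on Distributed Publish Subscribe in Cluster for reference.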
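For illustration, the corrected test could look roughly like the sketch below. It is not the original poster's final code. It assumes Akka 2.5 with akka-cluster-tools and ScalaTest 3.0.x on the classpath, and the spec name, the inline configuration, and the self-join are assumptions added so the example stands alone. The sketch also waits for the mediator's SubscribeAck before sending the Foo, so the message is not published before the subscription is registered.

```scala
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.cluster.Cluster
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe, SubscribeAck}
import akka.testkit.{TestKit, TestProbe}
import com.typesafe.config.ConfigFactory
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}

case class Foo(foo: String)

// Same behaviour as the actor in the question: forward every Foo to the "info" topic.
class InterfaceForwardingActor extends Actor with ActorLogging {
  private val mediator = DistributedPubSub(context.system).mediator
  private val topic = "info"

  def receive = {
    case foo: Foo =>
      log.info("Forwarding a Foo message")
      mediator ! Publish(topic, foo)
  }
}

object SubscribedProbeSpec {
  // Assumed test configuration (not from the question): cluster provider on a random port.
  // Both the classic and the Artery port keys are set so either transport picks a free port.
  val config = ConfigFactory.parseString(
    """
      |akka.actor.provider = "cluster"
      |akka.remote.netty.tcp.port = 0
      |akka.remote.artery.canonical.port = 0
    """.stripMargin).withFallback(ConfigFactory.load())
}

// Hypothetical spec name, chosen for this sketch.
class SubscribedProbeSpec
    extends TestKit(ActorSystem("SubscribedProbeSpec", SubscribedProbeSpec.config))
    with WordSpecLike with Matchers with BeforeAndAfterAll {

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  "An InterfaceForwardingActor" must {
    "publish a Foo message to the info topic" in {
      // Assumption: the node is not joined through seed-nodes, so it joins itself here.
      Cluster(system).join(Cluster(system).selfAddress)

      val probe = TestProbe()
      val mediator = DistributedPubSub(system).mediator

      // Subscribe (not Publish) the probe; the probe is also the sender,
      // so the SubscribeAck is delivered back to it.
      mediator.tell(Subscribe("info", probe.ref), probe.ref)
      probe.expectMsgType[SubscribeAck]

      val forwarder = system.actorOf(Props[InterfaceForwardingActor])
      val msg = Foo("test")
      forwarder ! msg
      probe.expectMsg(msg)
    }
  }
}
```

On Akka 2.4 the provider would need to be given as the full class name akka.cluster.ClusterActorRefProvider instead of "cluster". If your application.conf already configures the cluster provider, the inline configuration can be dropped.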