Search code examples
scala, akka, akka-http, akka-persistence, akka-testkit

Testing Akka Persistence with Akka HTTP - journal is not cleared


I am trying to use the Akka Persistence TestKit together with the Akka HTTP TestKit, but my in-memory journal does not seem to be cleared before each test.

A very simple persistent behavior - it just stores a string and returns all stored strings:

/** Event-sourced store of strings: `AddCmd` persists a new value, `ReadCmd` returns the stored ones. */
object MyStore {

  /** Messages the store accepts; each one names the actor that receives the list of values. */
  sealed trait Command
  case class AddCmd(s: String, replyTo: ActorRef[List[String]]) extends Command
  case class ReadCmd(replyTo: ActorRef[List[String]])           extends Command

  /** Facts written to the journal. */
  sealed trait Event
  case class AddEvent(s: String) extends Event

  /** Values accumulated so far, most recently added first. */
  case class State(values: List[String] = Nil)

  /** Builds the persistent behavior; all instances share the fixed persistence id "myId". */
  def apply(): Behavior[Command] =
    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId("myId"),
      emptyState = State(),
      commandHandler = handleCommand(_, _),
      eventHandler = handleEvent(_, _)
    )

  /** Decides what to persist and what to reply for an incoming command. */
  def handleCommand(state: State, command: Command): ReplyEffect[Event, State] =
    command match {
      case ReadCmd(replyTo) =>
        // Read-only: answer straight from the current state.
        Effect.reply(replyTo)(state.values)
      case AddCmd(value, replyTo) =>
        // Reply with the values as they are after the event has been applied.
        Effect.persist(AddEvent(value)).thenReply(replyTo)(_.values)
    }

  /** Applies a persisted event to the state. */
  def handleEvent(state: State, event: Event): State =
    event match {
      case AddEvent(added) => State(added :: state.values)
    }
}

Actor system configuration with persistence and serialization config:

/** Settings used to create the spec's actor system. */
object MySpec {

  /** Java serialization switched on, with the persistence test kit plugin settings as fallback. */
  val configuration: Config =
    ConfigFactory
      .parseString("akka.actor.allow-java-serialization = on")
      .resolve()
      .withFallback(PersistenceTestKitPlugin.config)
}

My test class:

/** Route-level tests that drive the `MyStore` actor over HTTP on top of the persistence test kit. */
class MySpec extends AnyFunSuite with Matchers with ScalatestRouteTest with BeforeAndAfterEach {
  import MyStore._
  import akka.http.scaladsl.server.Directives._

  val persistenceTestKit: PersistenceTestKit = PersistenceTestKit(system)

  // GET answers with every stored value joined by ';', POST stores the request body.
  val route: Route = {
    import akka.actor.typed.scaladsl.AskPattern._
    import akka.actor.typed.scaladsl.adapter._
    implicit val typedSystem: ActorSystem[Nothing] = system.toTyped
    implicit val timeout: Timeout                  = 3.seconds

    // Spawned once, when the route value is built; every request talks to this same actor.
    val store: ActorRef[Command] =
      system.spawn(behavior = MyStore(), name = "MyStore", props = Props.empty)

    val readAll: Route = get {
      complete(store.ask(ReadCmd(_)).map(_.mkString(";")))
    }

    val append: Route = post {
      entity(as[String]) { newRecord =>
        complete(store.ask(AddCmd(newRecord, _)).map(_ => "OK"))
      }
    }

    readAll ~ append
  }

  // The route test kit calls this to obtain the classic actor system it runs on.
  override def createActorSystem(): akka.actor.ActorSystem =
    akka.actor.ActorSystem("MySystem", MySpec.configuration)

  // Runs before every test case.
  override def beforeEach(): Unit =
    persistenceTestKit.clearAll()

  /** Posts one record and expects the route to acknowledge it. */
  private def add(record: String) =
    Post("/", record) ~> route ~> check {
      responseAs[String] shouldEqual "OK"
    }

  /** Reads the store through the route and compares the ';'-joined response. */
  private def expectStored(joined: String) =
    Get() ~> route ~> check {
      responseAs[String] shouldEqual joined
    }

  test("Add two elements") {
    add("One")
    add("Two")

    expectStored("Two;One")
  }

  test("Add another two element") {
    add("Three")
    add("Four")

    expectStored("Four;Three")
  }
}

If I run each test separately, it passes. But if I run the two tests one after the other, the second test fails with:

Expected :"Four;Three[]"
Actual   :"Four;Three[;Two;One]"

My build.sbt file:

// Project identity
name         := "persistence-http-test"
version      := "0.1"
scalaVersion := "2.13.6"

// Library versions shared by the dependency groups below
val AkkaVersion      = "2.6.14"
val AkkaHttpVersion  = "10.2.4"
val ScalatestVersion = "3.2.5"

// Modules released on the core Akka version line
libraryDependencies ++= Seq(
  "akka-actor-typed",
  "akka-stream-typed",
  "akka-persistence-typed",
  "akka-stream-testkit",
  "akka-actor-testkit-typed",
  "akka-persistence-testkit"
).map(module => "com.typesafe.akka" %% module % AkkaVersion)

// Modules released on the Akka HTTP version line
libraryDependencies ++= Seq(
  "akka-http",
  "akka-http-testkit"
).map(module => "com.typesafe.akka" %% module % AkkaHttpVersion)

// Test framework
libraryDependencies += "org.scalatest" %% "scalatest" % ScalatestVersion

Repository: https://github.com/LukBed/akka-persistence-http-test-issue


Solution

  • By performing persistenceTestKit.clearAll() before each test, all data in the persistence storage will be removed but the MyStore actor's in-memory state content remains unchanged -- hence the failure in subsequent tests.

    Another consequence is that the persistence storage will be out of sync with the actor's state. For data consistency, it would be better to provide some Clear-command/event handling routines similar to how Add/Read are handled:

    /** The store extended with a clear operation that is persisted like any other change. */
    object MyStore {
      // ...

      /** Messages the store accepts; each one names the actor that receives the list of values. */
      sealed trait Command
      case class AddCmd(s: String, replyTo: ActorRef[List[String]]) extends Command
      case class ReadCmd(replyTo: ActorRef[List[String]])           extends Command
      case class ClearCmd(replyTo: ActorRef[List[String]])          extends Command

      /** Facts written to the journal. */
      sealed trait Event
      case class AddEvent(s: String) extends Event
      case object ClearEvent         extends Event

      /** Values accumulated so far, most recently added first. */
      case class State(values: List[String] = Nil)

      /** Decides what to persist and what to reply for an incoming command. */
      def handleCommand(state: State, command: Command): ReplyEffect[Event, State] =
        command match {
          case ReadCmd(replyTo) =>
            Effect.reply(replyTo)(state.values)
          case AddCmd(value, replyTo) =>
            Effect.persist(AddEvent(value)).thenReply(replyTo)(_.values)
          case ClearCmd(replyTo) =>
            // Replies with the values as they are after ClearEvent has been applied.
            Effect.persist(ClearEvent).thenReply(replyTo)(_.values)
        }

      /** Applies a persisted event to the state. */
      def handleEvent(state: State, event: Event): State =
        event match {
          case AddEvent(added) => State(added :: state.values)
          case ClearEvent      => State(Nil)
        }
    }
    

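    You can now use route ~> check via, say, a put to send the Clear command right before each test. Persisting ClearEvent resets the actor's internal state and keeps the journal consistent with it: the event is appended to the journal rather than the journal being wiped, so replaying it also ends in an empty state: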

    /** Spec excerpt: the store is reset through the route itself before every test. */
    class MySpec extends AnyFunSuite with Matchers with ScalatestRouteTest with BeforeAndAfterEach {
      // ...

      // GET reads, PUT clears, POST adds the request body.
      val route: Route = {
        // ...

        val readAll: Route = get {
          complete(actor.ask(ReadCmd(_)).map(_.mkString(";")))
        }

        val clear: Route = put {
          complete(actor.ask(ClearCmd(_)).map(_.mkString(";")))
        }

        val append: Route = (post & entity(as[String])) { newRecord =>
          complete(actor.ask(AddCmd(newRecord, _)).map(_ => "OK"))
        }

        readAll ~ clear ~ append
      }

      // ...

      // Runs before every test case.
      override def beforeEach(): Unit =
        initStateAndJournal()

      /** Sends the clear request and expects an empty list of values back. */
      private def initStateAndJournal() =
        Put("/", "clear") ~> route ~> check {
          responseAs[String] shouldEqual ""
        }

      // ...
    }