I try to use Akka Persistence Test Kit with Akka HTTP Test Kit, but my in-memory journal is not cleared before each test.
Very simple persistence behavior - just put string and get all stored strings:
object MyStore {
def apply(): Behavior[Command] = EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId.ofUniqueId("myId"),
emptyState = State(),
commandHandler = (state, command) => handleCommand(state, command),
eventHandler = (state, event) => handleEvent(state, event)
)
sealed trait Command
case class AddCmd(s: String, replyTo: ActorRef[List[String]]) extends Command
case class ReadCmd(replyTo: ActorRef[List[String]]) extends Command
sealed trait Event
case class AddEvent(s: String) extends Event
case class State(values: List[String] = List())
def handleCommand(state: State, command: Command): ReplyEffect[Event, State] = command match {
case AddCmd(s, replyTo) => Effect.persist(AddEvent(s)).thenReply(replyTo)(updatedState => updatedState.values)
case ReadCmd(replyTo) => Effect.reply(replyTo)(state.values)
}
def handleEvent(state: State, event: Event): State = event match {
case AddEvent(s) => state.copy(values = s :: state.values)
}
}
Actor system configuration with persistence and serialization config:
object MySpec {
val configuration: Config = {
val serializationConfigString = "akka.actor.allow-java-serialization = on"
val serialization = ConfigFactory.parseString(serializationConfigString).resolve()
val persistence = PersistenceTestKitPlugin.config
serialization.withFallback(persistence)
}
}
My test class:
class MySpec extends AnyFunSuite with Matchers with ScalatestRouteTest with BeforeAndAfterEach {
import MyStore._
import akka.http.scaladsl.server.Directives._
val persistenceTestKit: PersistenceTestKit = PersistenceTestKit(system)
val route: Route = {
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.adapter._
implicit val typedSystem: ActorSystem[Nothing] = system.toTyped
implicit val timeout: Timeout = 3.seconds
val actor: ActorRef[Command] =
system.spawn(behavior = MyStore(), name = "MyStore", props = Props.empty)
get {
val result = actor.ask(replyTo => ReadCmd(replyTo)).map(_.mkString(";"))
complete(result)
} ~ (post & entity(as[String])) { newRecord =>
val result = actor.ask(replyTo => AddCmd(newRecord, replyTo)).map(_ => "OK")
complete(result)
}
}
override def createActorSystem(): akka.actor.ActorSystem =
akka.actor.ActorSystem("MySystem", MySpec.configuration)
override def beforeEach(): Unit = {
persistenceTestKit.clearAll()
}
private def add(s: String) = {
Post("/", s) ~> route ~> check {
responseAs[String] shouldEqual "OK"
}
}
test("Add two elements") {
add("One")
add("Two")
Get() ~> route ~> check {
responseAs[String] shouldEqual "Two;One"
}
}
test("Add another two element") {
add("Three")
add("Four")
Get() ~> route ~> check {
responseAs[String] shouldEqual "Four;Three"
}
}
}
If I run each test separately it works. But if I run two test one after one I get in the second test:
Expected :"Four;Three[]"
Actual :"Four;Three[;Two;One]"
My build.sbt file:
name := "persistence-http-test"
version := "0.1"
scalaVersion := "2.13.6"
val AkkaVersion = "2.6.14"
val AkkaHttpVersion = "10.2.4"
val ScalatestVersion = "3.2.5"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
"com.typesafe.akka" %% "akka-stream-typed" % AkkaVersion,
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
"org.scalatest" %% "scalatest" % ScalatestVersion,
"com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion,
"com.typesafe.akka" %% "akka-http-testkit" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion,
"com.typesafe.akka" %% "akka-persistence-testkit" % AkkaVersion
)
Repository: https://github.com/LukBed/akka-persistence-http-test-issue
By performing persistenceTestKit.clearAll()
before each test, all data in the persistence storage will be removed but the MyStore actor's in-memory state content remains unchanged -- hence the failure in subsequent tests.
Another consequence is that the persistence storage will be out of sync with the actor's state. For data consistency, it would be better to provide some Clear
-command/event handling routines similar to how Add
/Read
are handled:
object MyStore {
// ...
sealed trait Command
case class AddCmd(s: String, replyTo: ActorRef[List[String]]) extends Command
case class ReadCmd(replyTo: ActorRef[List[String]]) extends Command
case class ClearCmd(replyTo: ActorRef[List[String]]) extends Command
sealed trait Event
case class AddEvent(s: String) extends Event
case object ClearEvent extends Event
case class State(values: List[String] = Nil)
def handleCommand(state: State, command: Command): ReplyEffect[Event, State] = command match {
case AddCmd(s, replyTo) => Effect.persist(AddEvent(s)).thenReply(replyTo)(_.values)
case ReadCmd(replyTo) => Effect.reply(replyTo)(state.values)
case ClearCmd(replyTo) => Effect.persist(ClearEvent).thenReply(replyTo)(_.values)
}
def handleEvent(state: State, event: Event): State = event match {
case AddEvent(s) => state.copy(values = s :: state.values)
case ClearEvent => state.copy(values = Nil)
}
}
You can now use route ~> check
via, say, a put
to let the Clear
command/event handlers clear both the actor internal state and the persistence journal right before each test:
class MySpec extends AnyFunSuite with Matchers with ScalatestRouteTest with BeforeAndAfterEach {
// ...
val route: Route = {
...
get {
val result = actor.ask(replyTo => ReadCmd(replyTo)).map(_.mkString(";"))
complete(result)
} ~
put {
val result = actor.ask(replyTo => ClearCmd(replyTo)).map(_.mkString(";"))
complete(result)
} ~
post { entity(as[String]) { newRecord =>
val result = actor.ask(replyTo => AddCmd(newRecord, replyTo)).map(_ => "OK")
complete(result)
} }
}
// ...
override def beforeEach(): Unit = {
initStateAndJournal()
}
private def initStateAndJournal() = {
Put("/", "clear") ~> route ~> check {
responseAs[String] shouldEqual ""
}
}
// ...
}