I'm trying to test a new feature of Akka 2.4 - PersistentFSM
(http://doc.akka.io/docs/akka/2.4.0/scala/persistence.html#Persistent_FSM) with the simple example below.
This example generates 5 random integers and ads it to the sequence. What I'd like it to do is to persist the data and on the next run of the application keep appending to existing sequence of numbers. Current documentation on PersistentFSM
is a little scarce and there's no clear way how to make this work. Any ideas?
TestFSM.scala:
import akka.actor.{Actor, ActorSystem, Props}
import akka.persistence.fsm.PersistentFSM
import akka.persistence.fsm.PersistentFSM.FSMState
import scala.reflect._
import scala.util.Random
final case class SetNumber(num: Integer)
sealed trait State extends FSMState
case object Idle extends State {
override def identifier: String = "Idle"
}
case object Active extends State {
override def identifier: String = "Active"
}
sealed trait Data {
def add(number: Integer): Data
}
case object Empty extends Data {
def add(number: Integer) = Numbers(Vector(number))
}
final case class Numbers(queue: Seq[Integer]) extends Data {
def add(number: Integer) = Numbers(queue :+ number)
}
sealed trait DomainEvt
case class SetNumberEvt(num: Integer) extends DomainEvt
class Generator extends Actor with PersistentFSM[State, Data, DomainEvt] {
override def applyEvent(domainEvent: DomainEvt, currentData: Data): Data = {
domainEvent match {
case SetNumberEvt(num) => currentData.add(num)
}
}
override def persistenceId: String = "generator"
override def domainEventClassTag: ClassTag[DomainEvt] = classTag[DomainEvt]
startWith(Idle, Empty)
when(Idle) {
case Event(SetNumber(num), Empty) =>
goto(Active) applying SetNumberEvt(num)
}
when(Active) {
case Event(SetNumber(num), numbers: Data) =>
println(numbers)
stay applying SetNumberEvt(num)
}
initialize()
}
object TestFSM extends App {
val system = ActorSystem()
val actor = system.actorOf(Props[Generator])
actor ! SetNumber(Random.nextInt())
actor ! SetNumber(Random.nextInt())
actor ! SetNumber(Random.nextInt())
actor ! SetNumber(Random.nextInt())
actor ! SetNumber(Random.nextInt())
Thread.sleep(1000)
system.terminate()
}
reference.conf:
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.journal.leveldb.dir = "target/journal"
akka.persistence.snapshot-store.local.dir = "target/snapshots"
PersistentFSM
now implicitly persists both the state changes and the domain events as the FSM proceeds.
Although the documentation you link to is lacking, it is possible to see from the code that persistAll
is getting called under the hood at the appropriate point: https://github.com/akka/akka/blob/v2.4.0/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala#L110
Suggest you test the implementation as you have it to see whether it does indeed persist.
EDIT:
I was able to execute your code, unchanged, and see the target/journal
directory get created. I used the following build.sbt
:
name := "Scratch"
version := "1.0"
scalaVersion := "2.11.7"
sbtVersion := "0.13.7"
mainClass := Some("TestFSM")
fork := true
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.0"
libraryDependencies += "com.typesafe.akka" %% "akka-persistence" % "2.4.0"
libraryDependencies += "org.iq80.leveldb" % "leveldb" % "0.7"
libraryDependencies += "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"
And the following application.conf
:
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.journal.leveldb.dir = "target/journal"
akka.persistence.snapshot-store.local.dir = "target/snapshots"
I think application.conf
is the appropriate way to specify application specific conf rather than reference.conf
, which is meant for libraries.