I'm playing with the akka-stream-and-http-experimental
1.0. So far, I've a user service that can accept and respond to HTTP requests. I'm also going to have an appointment service that can manage appointments. In order to make appointments, one must be an existing user. Appointment service will check with the user service if the user exists. Now this obviously can be done over HTTP but I'd rather have the appointment service send a message to the user service. Being new to this, I'm not clear how to use actors (as akka-http
abstracts that) to send and receive messages. There's mention of ActorRef
and ActorPublisher
in the doc but no examples of the former and the later looks like an overkill for my need.
My code looks like the following and is on Github:
trait UserReadResource extends ActorPlumbing {
val userService: UserService
val readRoute = {
// route stuff
}
}
trait ActorPlumbing {
implicit val system: ActorSystem
implicit def executor: ExecutionContextExecutor
implicit val materializer: Materializer
def config: Config
val logger: LoggingAdapter
}
trait UserService { // Implemented by Slick and MongoDB in the backend
def findByFirstName(firstName: String): Future[immutable.Seq[User]]
}
object UserApp extends App with UserReadResource with UserWriteResource with ActorPlumbing {
override implicit val system = ActorSystem()
override implicit def executor = system.dispatcher
override implicit val materializer = ActorMaterializer()
override def config = ConfigFactory.load()
override val logger = Logging(system, getClass)
private val collection = newCollection("users")
val userRepository = new MongoDBUserRepository(collection)
val userService: UserService = new MongoDBUserRepositoryAdapter(userRepository) with UserBusinessDelegate {
// implicitly finds the executor in scope. Ain't that cute?
override implicit def executor = implicitly
}
Http().bindAndHandle(readRoute ~ writeRoute, config.getString("http.interface"), config.getInt("http.port"))
}
Edit:
I figured out how to send messages, which could be done using Source.actorRef
. That only emits the messages into the stream. What I'd like to do is for the route handler class to receive the response. That way when I create the appointment service, it's actor can call the user service actor and receive the response in the same manner as the user route handler in my example does.
Pseudo code:
val src = Source.single(name) \\ How to send this to an actor and get the response
Edit 2:
Based on the @yardena answer, I came up with the following but the last line doesn't compile. My actor publisher returns a Future
which I'm guessing will be wrapped in a Promise
and then delivered as a Future
to the route handler.
get {
parameters("firstName".?, "lastName".?).as(FindByNameRequest) { name =>
type FindResponse = Future[FindByNameResponse]
val src: Source[FindResponse, Unit] = Source.actorPublisher[FindResponse](businessDelegateProps).mapMaterializedValue {
_ ! name
}
val emptyResponse = Future.apply(FindByNameResponse(OK, Seq.empty))
val sink = Sink.fold(emptyResponse)((_, response: FindResponse) => response)
complete(src.runWith(sink)) // doesn't compile
}
}
I ended up with using Actor.ask
. Simple.