Search code examples
scalatcpakka-stream

What is the most recommended way to capture TCP Server's output to an Akka Stream queue?


I am experimenting with Akka Streams, with a view of understand exactly how one is supposed to consume what a TCP Server receives from a client (the server doesn't need to respond to client).

Here's a standard TCP server implementation (after applying what I understand from @heiko-seeberger 's succinct explanation here):

def runServer(system: ActorSystem, address: String, port: Int, collectingSink: Sink[ByteString,NotUsed]): Unit = {
    implicit val sys = system
    import system.dispatcher
    implicit val materializer = ActorMaterializer()

    val handler = Sink.foreach[IncomingConnection] { conn =>
      conn.handleWith(
        Flow[ByteString]
          .via(JsonFraming.objectScanner(maximumObjectLength = 400))
          .alsoTo(collectingSink)
          .map(b => ByteString.empty)
          .filter(_ == false)
      )
    }

    val connections = Tcp().bind(address, port)
    val binding = connections.to(handler).run()

    binding.onComplete {
      case Success(b) =>
        println("Server started, listening on: " + b.localAddress)
      case Failure(e) =>
        println(s"Server could not bind to $address:$port: ${e.getMessage}")
        system.terminate()
    }
  }

The value that I pass as collectingSink parameter to the runServer() function is constructed this way:

import akka.stream.scaladsl.{Flow, JsonFraming, Sink}
import akka.util.ByteString
import play.api.libs.json.Json

object DeviceDataProcessor {

      case class Readings (
                     radiationLevel: Double,
                     ambientTemp: Double,
                     photoSensor: Double,
                     humidity: Double,
                     sensorUUID: String,
                     timestampAttached: Long)

      val xformToDeviceReadings = Flow[ByteString]
        .via(JsonFraming.objectScanner(maximumObjectLength = 400))
        .map(b => {

            val jsonified = Json.parse(b.utf8String)
            val readings  =  Readings(
                           (jsonified \ "radiation_level")         .as[Double],
                           (jsonified \ "ambient_temperature")     .as[Double],
                           (jsonified \ "photosensor")             .as[Double],
                           (jsonified \ "humidity")                .as[Double],
                           (jsonified \ "sensor_uuid")             .as[String],
                           (jsonified \ "timestamp")               .as[Long]
            )
            readings
        })
        .to(Sink.queue())

    }

Finally, this is how I run my Driver:

object ConsumerDriver extends App {
       val actorSystem = ActorSystem("ServerSide")
       TCPServer.runServer(actorSystem,"127.0.0.1", 9899,DeviceDataProcessor.xformToDeviceReadings)
}

I am failing to grasp the reasoning behind two things here:

1) The type of xformToDeviceReadings is derived as

Sink[ByteStream,NotUsed]

Shouldn't the mapped type Readings appear here?

2) How do I begin to read from this queue and pass the elements into another upstream flow? Should I have to materialize first and then use the materialized queue as my new Source?

I have gone through the documentation at Akka site. But, I will be happy to be redirected to any specific portion of this document, or other posts on SO.

Please help me plug the gap in my concept.


Solution

  • 1) the type is Sink[ByteString,NotUsed] as Akka Streams is left biased when composing materialized values. This means that the materialized value exposed by your combined sink (xformToDeviceReadings) is the one coming from its first stage (a map, which materializes to NotUsed).

    To expose the materialized value you want you need to change to

    ...
    .toMat(Sink.queue())(Keep.right)
    

    Note that the type of your Sink now changes to Sink[ByteString, SinkQueueWithCancel[Readings]]

    2) to interact with your queue of Readings you need to run your stream, hence obtaining your materialized value (the queue) and start pulling items from it. This could happen in the connection handling:

    val handler: Sink[IncomingConnection, Future[Done]] = Flow[IncomingConnection]
      .map { conn =>
        conn.handleWith(
          Flow[ByteString]
            .via(JsonFraming.objectScanner(maximumObjectLength = 400))
            .alsoToMat(collectingSink)(Keep.right)
            .map(_ => ByteString.empty)
            .filter(_ == false)
        )
      }
      .mapAsync(1){ queue ⇒
        queue.pull()
      }
      .toMat(Sink.foreach(println))(Keep.right)
    

    Note that the solution above is not ideal, especially because the materialized queue is most likely not thread safe. If your purpose is to plumb these Readings to some other downstream flow, you better directly connect a Sink fit for the purpose, instead of going through a queue.