Tags: scala, apache-kafka, akka, reactive-streams

Akka Stream dynamic Sink depending on Message from Kafka topic


I have a Kafka consumer that reads messages of type Message. Each Message has an ID and some content:

case class Message(id: String, content: String)

Depending on the ID, I want to write the Message into a separate sink, specifically into a MongoDB collection chosen by that ID. The MongoDB connector provides a Sink that writes documents into a specified collection:

val sink: Sink[Document, Future[Done]] = MongoSink.insertOne(collection(id))

The problem is that I need to specify the sink when connecting the Kafka consumer source, but each element determines which sink it should go to. Is there a way to choose a specific sink dynamically when an element arrives? Or is this not possible, and should I instead, for example, use a different Kafka topic for each ID and connect each source to a separate sink?


Solution

  • It's not totally clear how the types line up in your example (e.g. the relationship between Document and Message), but there are a couple of approaches you can take:

    • If there are many possible collections and they can't be known in advance, the least bad option in Akka Streams is something along the lines of:
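    // Requires an implicit Materializer (for runWith) and an implicit ExecutionContext (for map) in scope
    Sink.foreachAsync[Message](parallelism) { msg =>
      val document = documentFromMessage(msg)
      val targetCollection = collection(msg.id) // a distinct name avoids a recursive reference to collection(...)
      Source.single(document)
        .runWith(MongoSink.insertOne(targetCollection))
        .map(_ => ()) // foreachAsync expects a Future[Unit]; runWith yields a Future[Done]
    }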
    

    Note that this materializes a new Mongo sink for every message, which may be inefficient. If there is a lighter-weight way to insert a single document and get back a Future (perhaps in the ReactiveMongo driver?), ideally one that uses something like a connection pool to reduce per-insert overhead, that will probably be preferable.

    • If the collections are known beforehand, you can prebuild a sink for each collection and use Partition and the GraphDSL to define a single sink that combines the prebuilt sinks:
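    import akka.Done
    import akka.stream.SinkShape
    import akka.stream.scaladsl.{Flow, GraphDSL, Partition, Sink}
    import scala.concurrent.Future

    // collection0, etc. are predefined and encompass all of the collections which might be returned by collection(id).
    // collection(id) should return these same instances, since routing looks them up as Map keys.
    val collections: Map[MongoCollection[Document], (Int, Sink[Document, Future[Done]])] = Map(
      collection0 -> (0 -> MongoSink.insertOne(collection0)),
      collection1 -> (1 -> MongoSink.insertOne(collection1)),
      collection2 -> (2 -> MongoSink.insertOne(collection2)),
      collection3 -> (3 -> MongoSink.insertOne(collection3))
    )

    // The combined sink materializes NotUsed: the Future[Done] of each MongoSink is not exposed
    val combinedSink = Sink.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      // Sends each message to the output port assigned to its collection
      val partition = builder.add(
        Partition[Message](
          collections.size,
          { msg => collections(collection(msg.id))._1 }
        )
      )

      // Converts a Message into the Document the Mongo sinks expect
      val toDocument = Flow[Message].map(documentFromMessage)

      collections.foreach {
        case (_, (n, sink)) =>
          partition.out(n) ~> toDocument ~> sink // each ~> adds a fresh copy of toDocument to the graph
      }

      SinkShape.of(partition.in)
    })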
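Both approaches can be exercised without Kafka or MongoDB. The following is a minimal sketch, assuming Akka 2.6 (where an implicit `ActorSystem` supplies the `Materializer`) with `akka-stream` on the classpath. `RoutingDemo`, `insertInto`, `portForId` and the `users`/`orders` IDs are hypothetical: an in-memory `TrieMap` stands in for per-message Mongo inserts, and `Sink.seq` stands in for `MongoSink.insertOne(...)`.

```scala
import akka.actor.ActorSystem
import akka.stream.SinkShape
import akka.stream.scaladsl.{GraphDSL, Keep, Partition, Sink, Source}

import scala.collection.concurrent.TrieMap
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object RoutingDemo {
  final case class Message(id: String, content: String)

  /** Runs both routing approaches over the same messages, using in-memory stand-ins for MongoDB. */
  def run(): (Map[String, String], List[String], List[String]) = {
    implicit val system: ActorSystem = ActorSystem("routing-demo")
    implicit val ec: ExecutionContext = system.dispatcher

    val messages = List(
      Message("users", "alice"),
      Message("orders", "order-1"),
      Message("users", "bob")
    )

    try {
      // Approach 1: per-message dispatch with Sink.foreachAsync.
      // Hypothetical stand-in for a single-document insert: records content -> collection name.
      val inserted = TrieMap.empty[String, String]
      def insertInto(collectionName: String, msg: Message): Future[Unit] =
        Future { inserted.put(msg.content, collectionName); () }

      val dynamicDone = Source(messages)
        .runWith(Sink.foreachAsync[Message](parallelism = 4)(msg => insertInto(msg.id, msg)))
      Await.result(dynamicDone, 5.seconds)

      // Approach 2: Partition over a fixed set of prebuilt sinks.
      // Sink.seq replaces MongoSink.insertOne(collectionN); each one collects what it receives.
      val portForId = Map("users" -> 0, "orders" -> 1) // hypothetical routing table
      val usersSink = Sink.seq[Message]
      val ordersSink = Sink.seq[Message]

      val routed = Sink.fromGraph(GraphDSL.create(usersSink, ordersSink)(Keep.both) {
        implicit builder => (usersIn, ordersIn) =>
          import GraphDSL.Implicits._
          // An ID missing from portForId throws inside the partitioner and fails the stream.
          val partition = builder.add(Partition[Message](portForId.size, msg => portForId(msg.id)))
          partition.out(0) ~> usersIn
          partition.out(1) ~> ordersIn
          SinkShape.of(partition.in)
      })

      val (usersF, ordersF) = Source(messages).runWith(routed)
      val users = Await.result(usersF, 5.seconds).map(_.content).toList
      val orders = Await.result(ordersF, 5.seconds).map(_.content).toList

      (inserted.toMap, users, orders)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (dynamic, users, orders) = run()
    println(s"foreachAsync routing: $dynamic")
    println(s"Partition routing: users=$users orders=$orders")
  }
}
```

The trade-off shows up in the routing table: the `Partition` version needs every ID mapped to a port up front (an unknown ID throws in the partitioner and fails the stream), while the `foreachAsync` version accepts whatever ID arrives.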