I have a Kafka consumer that reads Message objects. Each Message has an ID and content:
case class Message(id: String, content: String)
Depending on the ID, I want to write each Message to a different sink, specifically a different MongoDB collection. The Mongo connector provides a Sink that writes to a specified collection:
val sink: Sink[Document, Future[Done]] = MongoSink.insertOne(collection(id))
The problem is that I need to specify the sink when connecting the Kafka consumer source, but each element determines which sink it should go to. Is there a way to dynamically choose a specific sink when an element arrives? Or is this not possible, and should I instead, for example, use a different Kafka topic for each ID and connect each source to a separate sink?
It's not totally clear how the types line up in your example (e.g. the relationship between Document and Message), but there are a couple of approaches you can take. One is to run a small single-element stream per message inside Sink.foreachAsync:
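// Needs an implicit Materializer (or ActorSystem) and ExecutionContext in scope
Sink.foreachAsync[Message](parallelism) { msg =>
  val document = documentFromMessage(msg)
  // Named differently from the collection(id) lookup so the call is not shadowed
  val targetCollection = collection(msg.id)
  Source.single(document)
    .runWith(MongoSink.insertOne(targetCollection))
    .map(_ => ()) // foreachAsync expects a Future[Unit], not a Future[Done]
}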
Note that this materializes a new stream with a new Mongo sink for every message, which may have efficiency concerns. If there's a lighter-weight way (e.g. in the reactivemongo driver?) that returns a Future after inserting a single document, but uses something like a connection pool to reduce the overhead of single-document inserts, that will probably be preferable.
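To make the lighter-weight idea a bit more concrete, here is a minimal sketch, assuming Akka 2.6+ (where an implicit ActorSystem supplies the materializer). Everything Mongo-related is a stand-in: `InMemoryStore.insertOneAsync` is a hypothetical placeholder for whatever Future-returning single-document insert your driver offers, and `collectionNameFor` is a hypothetical routing rule in place of collection(id). The store just writes to an in-memory map so the example runs on its own.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PerMessageInsertSketch {
  final case class Message(id: String, content: String)

  // Hypothetical routing rule standing in for collection(id) from the question.
  def collectionNameFor(id: String): String = s"messages_$id"

  // Hypothetical in-memory store standing in for MongoDB.
  final class InMemoryStore {
    private val data = mutable.Map.empty[String, Set[String]]

    // Stand-in for a driver call that inserts one document and returns a Future.
    def insertOneAsync(collection: String, content: String)(
        implicit ec: ExecutionContext): Future[Unit] =
      Future {
        data.synchronized {
          data.update(collection, data.getOrElse(collection, Set.empty[String]) + content)
        }
      }

    def snapshot: Map[String, Set[String]] = data.synchronized(data.toMap)
  }

  def run(messages: List[Message]): Map[String, Set[String]] = {
    implicit val system: ActorSystem = ActorSystem("per-message-insert")
    import system.dispatcher
    val store = new InMemoryStore
    try {
      // The target is chosen per element inside the function,
      // so a single sink serves every id without building a stream per message.
      val done = Source(messages).runWith(
        Sink.foreachAsync[Message](parallelism = 4) { msg =>
          store.insertOneAsync(collectionNameFor(msg.id), msg.content)
        }
      )
      Await.result(done, 10.seconds)
      store.snapshot
    } finally {
      system.terminate()
    }
  }
}
```

With parallelism greater than 1, inserts may complete out of order, which is why the sketch collects each collection's contents as a Set rather than a sequence.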
Another is to use Partition and the GraphDSL to define a sink which incorporates the prebuilt sinks:
// collection0, etc. are predefined and encompass all of the collections which might be returned by collection(id)
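// Akka imports; MongoSink, MongoCollection and Document come from the Mongo connector and driver in use
import akka.Done
import akka.stream.SinkShape
import akka.stream.scaladsl.{Flow, GraphDSL, Partition, Sink}
import scala.concurrent.Future

// Each collection maps to its Partition output port and its prebuilt sink
val collections: Map[MongoCollection[Document], (Int, Sink[Document, Future[Done]])] = Map(
  collection0 -> (0 -> MongoSink.insertOne(collection0)),
  collection1 -> (1 -> MongoSink.insertOne(collection1)),
  collection2 -> (2 -> MongoSink.insertOne(collection2)),
  collection3 -> (3 -> MongoSink.insertOne(collection3))
)

// Built with GraphDSL.create() and no arguments, so this sink materializes NotUsed
val combinedSink = Sink.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  // Route each message to the output port registered for its collection
  val partition = builder.add(
    Partition[Message](
      collections.size,
      { msg => collections(collection(msg.id))._1 }
    )
  )
  val toDocument = Flow[Message].map(documentFromMessage)
  collections.foreach {
    case (_, (n, sink)) =>
      partition.out(n) ~> toDocument ~> sink
  }
  SinkShape.of(partition.in)
})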
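One caveat: a sink built with GraphDSL.create() and no arguments materializes NotUsed, so the caller cannot tell when the inserts have finished. A sketch of one way around that, assuming Akka 2.6+, is to pass the sinks into GraphDSL.create so their materialized values are exposed. In this sketch `Sink.seq` stands in for the Mongo sinks, and `branchFor` is a hypothetical routing rule (numeric ids taken modulo the number of branches) in place of the collection lookup.

```scala
import akka.actor.ActorSystem
import akka.stream.SinkShape
import akka.stream.scaladsl.{GraphDSL, Partition, Sink, Source}
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object PartitionRoutingSketch {
  final case class Message(id: String, content: String)

  // Hypothetical routing rule: assumes ids are numeric strings.
  def branchFor(msg: Message, branches: Int): Int =
    Math.floorMod(msg.id.toInt, branches)

  def run(messages: List[Message], branches: Int): Vector[Vector[String]] = {
    implicit val system: ActorSystem = ActorSystem("partition-routing")
    import system.dispatcher
    try {
      // One collecting sink per branch, standing in for MongoSink.insertOne(collectionN).
      val sinks: immutable.Seq[Sink[Message, Future[immutable.Seq[Message]]]] =
        Vector.fill(branches)(Sink.seq[Message])

      // Passing the sinks to create() exposes their materialized values as a Seq.
      val combinedSink = Sink.fromGraph(GraphDSL.create(sinks) { implicit builder => sinkShapes =>
        import GraphDSL.Implicits._
        val partition =
          builder.add(Partition[Message](branches, msg => branchFor(msg, branches)))
        sinkShapes.zipWithIndex.foreach { case (sink, n) =>
          partition.out(n) ~> sink
        }
        SinkShape(partition.in)
      })

      // One Future per branch; all of them complete once the source is drained.
      val perBranch = Source(messages).runWith(combinedSink)
      Await.result(Future.sequence(perBranch), 10.seconds)
        .map(_.map(_.content).toVector)
        .toVector
    } finally {
      system.terminate()
    }
  }
}
```

With the real Mongo sinks, the same shape would give you one Future[Done] per collection, which you could combine with Future.sequence to learn when every branch has completed or whether any failed.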