I am trying to insert records into MongoDB, choosing the target collection based on each record's data type, using the Alpakka MongoDB API (https://doc.akka.io/docs/alpakka/current/mongodb.html).
Let's first take a look at the data type:
sealed trait MsgDoc {
}
final case class MsgPreFailure(raw: String, reasons: Chain[String]) extends MsgDoc
final case class MsgProceed(raw: String, status: MsgStatus) extends MsgDoc
sealed trait MsgStatus {
}
case object MsgSuccess extends MsgStatus
final case class MsgFailure(reasons: Chain[String]) extends MsgStatus
final case class MsgUnknown(reason: String) extends MsgStatus
For every inhabitant of the MsgDoc sum type, I create its own codec registry, because I would like to store each one in a different collection:
val preFailureRegistry = fromRegistries(fromProviders(classOf[MsgPreFailure]), DEFAULT_CODEC_REGISTRY)
val proceedRegistry = fromRegistries(fromProviders(classOf[MsgProceed]), DEFAULT_CODEC_REGISTRY)
private val client = MongoClients.create("mongodb://localhost:27017")
private val db = client.getDatabase("Message")
private val preFailureColl = db
.getCollection("Failure", classOf[MsgPreFailure])
.withCodecRegistry(preFailureRegistry)
private val proceedColl = db
.getCollection("Proceed", classOf[MsgProceed])
.withCodecRegistry(proceedRegistry)
Then let's try to insert one record into MongoDB:
Source
.single[MsgDoc](MsgPreFailure("Test", Chain("Foo", "Foo", "Foo")))
.runWith(MongoSink.insertOne(???))
As the argument to the insertOne method, I would like to pass either preFailureColl or proceedColl, depending on the data type (either MsgPreFailure or MsgProceed).
In the example above the element is a MsgPreFailure, so it should result in the following call:
Source
.single[MsgDoc](MsgPreFailure("Test", Chain("Foo", "Foo", "Foo")))
.runWith(MongoSink.insertOne(preFailureColl))
The question is: how do I pattern match within the insertOne method to pick the right collection?
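MongoSink.insertOne is given a single collection when the stream is built, so the choice cannot be made per element inside that call. Instead, you need to create a stream that has two branches, each terminated with a different sink. One way to achieve that is with the divertTo operator.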
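As a rough sketch of that idea, assuming the MsgDoc types and the two collections from the question are in scope along with an implicit ActorSystem or Materializer: the helper name msgDocSink is made up for illustration and is not part of Alpakka. Elements matching the predicate are diverted to the first sink, and everything else continues to the second one.

```scala
import akka.NotUsed
import akka.stream.alpakka.mongodb.scaladsl.MongoSink
import akka.stream.scaladsl.{Flow, Sink, Source}

// Hypothetical helper, not an Alpakka API. It assumes MsgDoc, MsgPreFailure
// and MsgProceed are defined as in the question.
def msgDocSink(
    preFailureSink: Sink[MsgPreFailure, _],
    proceedSink: Sink[MsgProceed, _]
): Sink[MsgDoc, NotUsed] =
  Flow[MsgDoc]
    // Elements for which the predicate returns true leave the main stream
    // here; collect narrows MsgDoc to MsgPreFailure for the typed sink.
    .divertTo(
      Flow[MsgDoc].collect { case m: MsgPreFailure => m }.to(preFailureSink),
      _.isInstanceOf[MsgPreFailure]
    )
    // MsgDoc is sealed with two cases, so only MsgProceed remains here.
    .collect { case m: MsgProceed => m }
    .to(proceedSink)

// Usage with the collections from the question.
Source
  .single[MsgDoc](MsgPreFailure("Test", Chain("Foo", "Foo", "Foo")))
  .runWith(
    msgDocSink(
      MongoSink.insertOne(preFailureColl),
      MongoSink.insertOne(proceedColl)
    )
  )
```

Note that this sketch materializes NotUsed, so the Future[Done] values of the two insert sinks are not exposed. If you need to wait for completion, you would have to combine the materialized values explicitly, for example with toMat and alsoToMat-style wiring or the GraphDSL.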