scala akka akka-stream alpakka

MongoDB: select the right collection reference with pattern matching


I am trying to insert records into MongoDB, choosing the collection by the record's datatype, using the Alpakka MongoDB connector (https://doc.akka.io/docs/alpakka/current/mongodb.html).
Let's first take a look at the datatype:

    sealed trait MsgDoc

    final case class MsgPreFailure(raw: String, reasons: Chain[String]) extends MsgDoc

    final case class MsgProceed(raw: String, status: MsgStatus) extends MsgDoc


    sealed trait MsgStatus {

    }

    case object MsgSuccess extends MsgStatus

    final case class MsgFailure(reasons: Chain[String]) extends MsgStatus

    final case class MsgUnknown(reason: String) extends MsgStatus

For every inhabitant of the MsgDoc sum type, I create its own codec registry, because I would like to store each one in a different collection:

    val preFailureRegistry = fromRegistries(fromProviders(classOf[MsgPreFailure]), DEFAULT_CODEC_REGISTRY)
    val proceedRegistry = fromRegistries(fromProviders(classOf[MsgProceed]), DEFAULT_CODEC_REGISTRY)

    private val client = MongoClients.create("mongodb://localhost:27017")
    private val db = client.getDatabase("Message")

    private val preFailureColl = db
      .getCollection("Failure", classOf[MsgPreFailure])
      .withCodecRegistry(preFailureRegistry)


    private val proceedColl = db
      .getCollection("Proceed", classOf[MsgProceed])
      .withCodecRegistry(proceedRegistry)  

Then let's try to insert one record into MongoDB:

    Source
      .single[MsgDoc](MsgPreFailure("Test", Chain("Foo", "Foo", "Foo")))
      .runWith(MongoSink.insertOne(???))

As the argument to the insertOne method, I would like to pass either preFailureColl or proceedColl, depending on the datatype (either MsgPreFailure or MsgProceed). In the example above it is MsgPreFailure, so it should make the following call:

    Source
      .single[MsgDoc](MsgPreFailure("Test", Chain("Foo", "Foo", "Foo")))
      .runWith(MongoSink.insertOne(preFailureColl))  

The question is: how can I pattern match on the element so that the right collection reference is passed to insertOne?


Solution

  • You need to create a stream with two branches, each terminated by a different sink. One way to achieve that is the divertTo operator, which sends every element matching a predicate to a side sink and passes the rest downstream.
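
A minimal sketch of that idea follows. It assumes Akka Streams 2.6 or later and uses simplified stand-ins for the question's types (List instead of cats' Chain, MsgStatus left out) so that it compiles with only akka-stream on the classpath. The names MsgRouting and routingSink are hypothetical helpers, not part of Akka or Alpakka. Because each typed sink accepts only its own case class, collect is used to narrow MsgDoc before the elements reach it.

```scala
import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Flow, Keep, Sink}
import scala.concurrent.Future

// Simplified stand-ins for the question's types: List replaces cats' Chain
// and MsgStatus is omitted, so the sketch needs only akka-stream.
sealed trait MsgDoc
final case class MsgPreFailure(raw: String, reasons: List[String]) extends MsgDoc
final case class MsgProceed(raw: String) extends MsgDoc

object MsgRouting {
  // Hypothetical helper: builds one Sink[MsgDoc, _] out of two typed sinks.
  def routingSink(
      preFailureSink: Sink[MsgPreFailure, Future[Done]],
      proceedSink: Sink[MsgProceed, Future[Done]]
  ): Sink[MsgDoc, Future[Done]] = {
    // Side branch: narrow MsgDoc to MsgPreFailure so the typed sink accepts it.
    val preFailureBranch: Sink[MsgDoc, NotUsed] =
      Flow[MsgDoc].collect { case m: MsgPreFailure => m }.to(preFailureSink)

    Flow[MsgDoc]
      // Elements for which the predicate holds leave the main stream here.
      .divertTo(preFailureBranch, _.isInstanceOf[MsgPreFailure])
      // Only MsgProceed remains; collect narrows the static type.
      .collect { case m: MsgProceed => m }
      // Keep.right exposes the materialized value of proceedSink only.
      .toMat(proceedSink)(Keep.right)
  }
}
```

With the collections from the question, the sinks passed in would presumably be MongoSink.insertOne(preFailureColl) and MongoSink.insertOne(proceedColl), and the stream would end in .runWith(MsgRouting.routingSink(...)). Note that in this sketch the returned Future tracks only the MsgProceed branch; completion of the diverted insert is not observed.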