Search code examples
mongodbscalaakkaakka-streamalpakka

Alpakka MongoDB - override MongoSource implementation


I have similar problem as in Alpakka MongoDB - specify type in MongoSource

So I implemented my own MongoSource as was in the solution and used it:

object MyMongoSource {
  def apply[T](query: Observable[T]): Source[T, NotUsed] =
    Source.fromPublisher(ObservableToPublisher(query))
}

val source: Source[Book, NotUsed] = MyMongoSource[Book](mongoDb.books.find()) 

But ObservableToPublisher is private class so I get the following error:

object ObservableToPublisher in package mongodb cannot be accessed in package akka.stream.alpakka.mongodb

How to resolve this?


Solution

  • Define MyMongoSource to be in the akka.stream.alpakka.mongodb.scaladsl package:

    package akka.stream.alpakka.mongodb.scaladsl
    
    import akka.NotUsed
    import akka.stream.alpakka.mongodb.ObservableToPublisher
    import akka.stream.scaladsl.Source
    import org.mongodb.scala.Observable
    
    object MyMongoSource {
      def apply[T](query: Observable[T]): Source[T, NotUsed] =
        Source.fromPublisher(ObservableToPublisher(query))
    }