Search code examples
mongodbapache-sparkpysparkapache-spark-sqlstratio

MongoDB query filters using Stratio's Spark-MongoDB library


I'm trying to query a MongoDB collection using Stratio's Spark-MongoDB library. I followed this thread to get started with and I'm currently running the following piece of code:

reader = sqlContext.read.format("com.stratio.datasource.mongodb")
data = reader.options(host='<ip>:27017', database='<db>', collection='<col>').load()

This will load the whole collection into Spark dataframe and as the collection is large, it's a taking a lot of time. Is there any way to specify query filters and load only selected data into Spark?


Solution

  • Spark dataframe processing requires schema knowledge. When working with data sources with flexible and/or unknown schema, before Spark can do anything with the data, it has to discover its schema. This is what load() does. It looks at the data only for the purpose of discovering the schema of data. When you perform an action on data, e.g., collect(), Spark will actually read the data for processing purposes.

    There is only one way to radically speed up load() and that's by providing the schema yourself and thus obviating the need for schema discovery. Here is an example taken from the library documentation:

    import org.apache.spark.sql.types._
    val schemaMongo = StructType(StructField("name", StringType, true) :: StructField("age", IntegerType, true ) :: Nil)
    val df = sqlContext.read.schema(schemaMongo).format("com.stratio.datasource.mongodb").options(Map("host" -> "localhost:27017", "database" -> "highschool", "collection" -> "students")).load
    

    You can get a slight gain by sampling only a fraction of the documents in the collection by setting the schema_samplingRatio configuration parameter to a value less than the 1.0 default. However, since Mongo doesn't have sampling built in, you'll still be accessing potentially a lot of data.