
Spark: querying Mongodb with Stratio and RDD


I am querying MongoDB from Spark using Stratio (0.11.). I am interested in using RDDs (not DataFrames).

What I am doing right now is:

val mongoRDD = new MongodbRDD(sqlContext, readConfig, new MongodbPartitioner(readConfig))
mongoRDD.foreach(println)

and it displays the collection contents correctly.

Is there a way to apply a query (either as a String or built via QueryBuilder) to a MongodbRDD with Stratio? In my case, the query is of type $near.


Solution

  • As @zero323 hinted, the way to do that is to use the filters parameter. The library checks these filters and translates each one it supports into the corresponding MongoDB QueryBuilder operation.

    From the Spark-MongoDB source code:

    sFilters.foreach {
        case EqualTo(attribute, value) =>
          queryBuilder.put(attribute).is(checkObjectID(attribute, value))
        case GreaterThan(attribute, value) =>
          queryBuilder.put(attribute).greaterThan(checkObjectID(attribute, value))
        case GreaterThanOrEqual(attribute, value) =>
          queryBuilder.put(attribute).greaterThanEquals(checkObjectID(attribute, value))
        case In(attribute, values) =>
          queryBuilder.put(attribute).in(values.map(value => checkObjectID(attribute, value)))
        case LessThan(attribute, value) =>
          queryBuilder.put(attribute).lessThan(checkObjectID(attribute, value))
        case LessThanOrEqual(attribute, value) =>
          queryBuilder.put(attribute).lessThanEquals(checkObjectID(attribute, value))
        case IsNull(attribute) =>
          queryBuilder.put(attribute).is(null)
        case IsNotNull(attribute) =>
          queryBuilder.put(attribute).notEquals(null)
        case And(leftFilter, rightFilter) if !parentFilterIsNot =>
          queryBuilder.and(filtersToDBObject(Array(leftFilter)), filtersToDBObject(Array(rightFilter)))
        case Or(leftFilter, rightFilter)  if !parentFilterIsNot =>
          queryBuilder.or(filtersToDBObject(Array(leftFilter)), filtersToDBObject(Array(rightFilter)))
        case StringStartsWith(attribute, value) if !parentFilterIsNot =>
          queryBuilder.put(attribute).regex(Pattern.compile("^" + value + ".*$"))
        case StringEndsWith(attribute, value) if !parentFilterIsNot =>
          queryBuilder.put(attribute).regex(Pattern.compile("^.*" + value + "$"))
        case StringContains(attribute, value) if !parentFilterIsNot =>
          queryBuilder.put(attribute).regex(Pattern.compile(".*" + value + ".*"))
        case Not(filter) =>
          filtersToDBObject(Array(filter), true)
      }
    

    As you can see, $near is not handled, but it looks like it could easily be added to the connector, since QueryBuilder already offers methods for that MongoDB operator.

    You could try modifying the connector yourself. In any case, I'll try to implement it and open a PR in the next few days.

    EDIT:

    A PR has been opened that adds a source filter type describing $near, so you can use MongodbRDD as follows:

    val mongoRDD = new MongodbRDD(
        sqlContext,
        readConfig,
        new MongodbPartitioner(readConfig),
        filters = FilterSection(Array(Near("x", 3.0, 4.0)))
    )
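To make the translation idea concrete, here is a minimal, dependency-free Scala sketch of the same pattern-matching approach, extended with a $near case. It is not the connector's code: the case classes below are hypothetical local stand-ins for Spark's source filters (they are not the real org.apache.spark.sql.sources classes), Near only mirrors the filter described in the PR by assumption, and the result is plain JSON text rather than a DBObject built with QueryBuilder. The $near case uses MongoDB's legacy coordinate-pair form, { field: { $near: [x, y] } }, which assumes a 2d index on that field.

```scala
// Hypothetical stand-ins for a few Spark source filters (NOT Spark's real
// org.apache.spark.sql.sources classes), plus an assumed Near filter.
sealed trait Filter
final case class EqualTo(attribute: String, value: Any) extends Filter
final case class GreaterThan(attribute: String, value: Any) extends Filter
final case class LessThan(attribute: String, value: Any) extends Filter
final case class And(left: Filter, right: Filter) extends Filter
// Assumed shape: field name plus an (x, y) legacy coordinate pair.
final case class Near(attribute: String, x: Double, y: Double) extends Filter

object FilterTranslation {
  // Render a value as JSON: strings are quoted and escaped, other values use toString.
  private def json(v: Any): String = v match {
    case s: String => "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""
    case other     => other.toString
  }

  // Translate one filter into a MongoDB query document (as JSON text),
  // mirroring the one-case-per-filter structure of the connector's match.
  def toQuery(f: Filter): String = f match {
    case EqualTo(a, v)     => s"""{${json(a)}: ${json(v)}}"""
    case GreaterThan(a, v) => s"""{${json(a)}: {"$$gt": ${json(v)}}}"""
    case LessThan(a, v)    => s"""{${json(a)}: {"$$lt": ${json(v)}}}"""
    case And(l, r)         => s"""{"$$and": [${toQuery(l)}, ${toQuery(r)}]}"""
    // Legacy form { field: { $near: [x, y] } }; requires a 2d index on the field.
    case Near(a, x, y)     => s"""{${json(a)}: {"$$near": [$x, $y]}}"""
  }
}

object FilterTranslationDemo {
  def main(args: Array[String]): Unit = {
    val filters = Seq(
      Near("x", 3.0, 4.0),
      And(EqualTo("type", "cafe"), GreaterThan("rating", 4))
    )
    filters.foreach(f => println(FilterTranslation.toQuery(f)))
  }
}
```

In the connector itself, the corresponding change would presumably be one more case in the sFilters match that calls QueryBuilder's near(x, y) on the attribute; the PR's actual implementation may differ.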