Tags: java, google-cloud-datastore, kotlin-coroutines

Can the Google Cloud Datastore query results iterator be made to run faster (preferably, parallelized with chunked queries)?


I'm using the Java API to run queries, and I understand that the QueryResults object returned by datastore.run() uses a lazy-loading iterator, so iterating through a large result set takes quite a long time.

I'm already using a Cursor for most operations where paging is possible, and that works around the issue in those cases. I'm also using datastore.get() instead of queries whenever I know the entity keys in advance; with that method, I can manually split the request into smaller chunks and run them in parallel using Kotlin coroutines.
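
For reference, here's roughly what the cursor-based paging looks like (a minimal sketch; fetchPage, the kind name, and the surrounding paging logic are placeholders of mine, not anything specific to the API):

    import com.google.cloud.datastore.Cursor
    import com.google.cloud.datastore.Datastore
    import com.google.cloud.datastore.Query
    
    // Sketch: fetch one page of results and return the cursor where the
    // next page should start. fetchPage and "my_item_kind" are placeholders.
    fun fetchPage(store: Datastore, pageSize: Int, startCursor: Cursor?): Cursor {
        val builder = Query.newEntityQueryBuilder()
            .setKind("my_item_kind")
            .setLimit(pageSize)
        if (startCursor != null) builder.setStartCursor(startCursor)
        val results = store.run(builder.build())
        results.forEach { entity ->
            // hand the entity to whatever is rendering this page
        }
        // cursorAfter is only meaningful once the iterator has been consumed
        return results.cursorAfter
    }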

However, there are several cases where I have to use a query, and I also need all of the results at once because some back-end processing is involved. In those cases, iterating through all the results becomes pretty time-intensive. The dataset is relatively small for now (around 5,000 entities), but it will keep growing, so I'd like to set up a better solution than brute-force iterating through all the results and waiting for it to finish.

Ideally, I'd like to be able to chunk the query into smaller sets of results (maybe 500-1,000 each), and then iterate through all the chunks in parallel (again, using coroutines). But I don't see anything that allows me to do that. Cursors force serial processing, because you can't get the cursor for the next chunk without first iterating through all the results of the current one.

Is there any way to run a chunked/split query? Or any other way to improve the performance when I absolutely have to retrieve all the results for a query?


Solution

  • OK, I found a way to make it happen, and I have to give full credit to this blog post for the idea of how to do it...

    The basic premise is that running a keys-only query is much faster than running a full entity query, since it only has to read the index rather than fetch full entities. So, instead of running a normal query, we first run the keys-only query, which gives us the keys for all of the results:

    val keyQuery = Query.newKeyQueryBuilder()
        .setKind("my_item_kind")
        .setFilter(myFilter)
        // set limit, etc.
        .build()
    
    // QueryResults is an Iterator, so we can drain it into a Set of Keys
    val queryResults = store.run(keyQuery)
    val keys = queryResults.asSequence().toSet()
    

    and now that we have a set of Keys, we can split it into chunks and fetch those chunks in parallel:

    runBlocking {
        keys.chunked(500).map { chunk ->
            launch(Dispatchers.IO) {
                // batch-get the full entities for this chunk of keys
                val results = store.get(chunk)
                while (results.hasNext()) {
                    val entity = results.next()
                    // process the entity
                }
            }
        }.joinAll()
    }
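
    For convenience, the whole pattern can be folded into one suspend helper. This is just a sketch under my own assumptions: runChunkedQuery and process are names I made up, and the default chunk size of 500 is arbitrary rather than any documented limit. Note that process is called concurrently from multiple coroutines, so it has to be thread-safe:

    import com.google.cloud.datastore.Datastore
    import com.google.cloud.datastore.Entity
    import com.google.cloud.datastore.Key
    import com.google.cloud.datastore.Query
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.coroutineScope
    import kotlinx.coroutines.joinAll
    import kotlinx.coroutines.launch
    
    // Hypothetical helper: run a keys-only query, then batch-get the full
    // entities in parallel chunks. The names and the default chunk size
    // are my own assumptions, not part of the Datastore API.
    suspend fun runChunkedQuery(
        store: Datastore,
        keyQuery: Query<Key>,
        chunkSize: Int = 500,
        process: (Entity) -> Unit   // must be thread-safe; called concurrently
    ) = coroutineScope {
        val keys = store.run(keyQuery).asSequence().toList()
        keys.chunked(chunkSize).map { chunk ->
            launch(Dispatchers.IO) {
                store.get(chunk).forEach(process)
            }
        }.joinAll()
    }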