apache-spark apache-spark-sql hbase batch-processing amazon-emr

Batch processing job (Spark) with lookup table that's too big to fit into memory


I'm trying to write a batch job to process a couple of hundred terabytes that currently sit in an HBase database (in an EMR cluster in AWS), all in a single large table. For every row I'm processing, I need to get additional data from a lookup table (a simple integer-to-string mapping) that lives in a second HBase table. We'd be doing 5-10 lookups per row.

My current implementation uses a Spark job that distributes partitions of the input table to its workers, in the following shape:

Configuration hBaseConfig = newHBaseConfig();
hBaseConfig.set(TableInputFormat.SCAN, convertScanToString(scan));
hBaseConfig.set(TableInputFormat.INPUT_TABLE, tableName);

JavaPairRDD<ImmutableBytesWritable, Result> table = sparkContext.newAPIHadoopRDD(
    hBaseConfig, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
table.map(val -> {
    // some preprocessing
}).foreachPartition(p -> {
    p.forEachRemaining(row -> {
        // code that does the lookup
    });
});

The problem is that the lookup table is too big to fit in the workers' memory. They all need access to all parts of the lookup table, but their access pattern would significantly benefit from a cache.

Am I right in thinking that I cannot use a simple map as a broadcast variable because it'd need to fit into memory?

Spark uses a shared-nothing architecture, so I imagine there won't be an easy way to share a cache across all workers, but can we build a simple LRU cache for every individual worker?

How would I implement such a local worker cache that gets the data from the lookup table in HBase on a cache miss? Can I somehow distribute a reference to the second table to all workers?

I'm not set on my choice of technology, apart from HBase as the data source. Is there a framework other than Spark which could be a better fit for my use case?


Solution

  • You have a few options for dealing with this requirement:

    1- Use RDD or Dataset joins

    You can load both of your HBase tables as Spark RDDs or Datasets and then join them on your lookup key. Spark will split both RDDs into partitions and shuffle the content around so that rows with the same key end up on the same executor. By managing the number of partitions within Spark, you should be able to join two tables of arbitrary size.
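    To make the idea behind option 1 concrete, here is a minimal plain-Java sketch of a partitioned join, written without Spark so it runs on its own. It is only an illustration of the principle, not what Spark does internally: the class name PartitionedJoinSketch, the in-memory maps and the row ids are all hypothetical. Because each row needs several lookups, the rows are first exploded into (lookupKey, rowId) pairs, both sides are bucketed by lookup key, and each bucket is then joined against only its own slice of the lookup table.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionedJoinSketch {

    // Assigns a lookup key to one of numPartitions buckets (a simple hash partitioner).
    static int partitionOf(int key, int numPartitions) {
        return Math.floorMod(Integer.hashCode(key), numPartitions);
    }

    // rows:   rowId -> lookup keys needed by that row (hypothetical input shape)
    // lookup: the integer-to-string mapping
    // Returns rowId -> resolved strings (inner join: unknown keys are dropped).
    static Map<String, List<String>> join(Map<String, List<Integer>> rows,
                                          Map<Integer, String> lookup,
                                          int numPartitions) {
        List<Map<Integer, String>> lookupParts = new ArrayList<>();
        List<List<Map.Entry<Integer, String>>> rowParts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            lookupParts.add(new HashMap<>());
            rowParts.add(new ArrayList<>());
        }

        // "Shuffle": route every lookup entry and every (lookupKey, rowId) pair
        // to the bucket that owns the lookup key.
        lookup.forEach((k, v) -> lookupParts.get(partitionOf(k, numPartitions)).put(k, v));
        rows.forEach((rowId, keys) -> {
            for (int k : keys) {
                rowParts.get(partitionOf(k, numPartitions))
                        .add(new AbstractMap.SimpleEntry<>(k, rowId));
            }
        });

        // Join: each bucket only ever touches its own slice of the lookup table,
        // so more buckets means a smaller slice held at any one time.
        Map<String, List<String>> resolved = new TreeMap<>();
        for (int p = 0; p < numPartitions; p++) {
            Map<Integer, String> slice = lookupParts.get(p);
            for (Map.Entry<Integer, String> pair : rowParts.get(p)) {
                String value = slice.get(pair.getKey());
                if (value != null) {
                    resolved.computeIfAbsent(pair.getValue(), id -> new ArrayList<>())
                            .add(value);
                }
            }
        }
        return resolved;
    }
}
```

    Note that the resolved values for a row come back grouped by bucket rather than in the row's original key order, and that the exploded pairs have to be regrouped by row id afterwards. In Spark that regrouping is a second shuffle, which is part of the cost of this approach.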

    2- Broadcast a resolver instance

    Instead of broadcasting a map, you can broadcast a resolver instance that does the HBase lookup and keeps a temporary LRU cache. Each executor gets its own copy of this instance and manages its own cache, and you can invoke it from within your foreachPartition() code.

    Beware: the resolver instance needs to implement Serializable, so you will have to declare the cache, the HBase connection and the HBase Configuration as transient fields and initialize them on each executor.

    I run such a setup in Scala on one of the projects I maintain: it works, and it can be more efficient than a straight Spark join if you know your access patterns and manage your cache efficiently.
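    A minimal Java sketch of such a resolver might look like the following. It is not the Scala code mentioned above: the names CachingResolver and Loader are hypothetical, and the HBase read is replaced by a pluggable loader function so the example stays self-contained. In a real job the loader would wrap a Get against the lookup table, with the connection itself held in another transient, lazily created field. The cache is a LinkedHashMap in access order, which gives LRU eviction through removeEldestEntry.

```java
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class CachingResolver implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Serializable function type, so the loader can travel with the resolver. */
    public interface Loader extends Function<Integer, String>, Serializable {}

    private final int maxEntries;
    private final Loader loader;                   // stand-in for the HBase lookup (assumption)
    private transient Map<Integer, String> cache;  // never serialized; rebuilt lazily after deserialization
    private transient long misses;

    public CachingResolver(int maxEntries, Loader loader) {
        this.maxEntries = maxEntries;
        this.loader = loader;
    }

    // Creates the LRU cache on first use, i.e. on whichever JVM ends up calling resolve().
    private Map<Integer, String> cache() {
        if (cache == null) {
            cache = new LinkedHashMap<Integer, String>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {
                    return size() > maxEntries;    // evict the least recently used entry
                }
            };
        }
        return cache;
    }

    // synchronized because several task threads in one executor JVM may share
    // the same instance, and an access-ordered LinkedHashMap is not thread-safe.
    public synchronized String resolve(int key) {
        Map<Integer, String> c = cache();
        String value = c.get(key);
        if (value == null) {
            misses++;
            value = loader.apply(key);             // cache miss: go to the backing store
            if (value != null) {
                c.put(key, value);
            }
        }
        return value;
    }

    public synchronized long misses() {
        return misses;
    }
}
```

    On the driver you would create one instance and pass it to sparkContext.broadcast(...), then call value().resolve(key) on the broadcast handle inside foreachPartition(). A single lock is the simplest choice here; if contention between task threads becomes a problem, a concurrent cache implementation would be the next thing to try.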

    3- Use the HBase Spark connector to implement your lookup logic

    Apache HBase has recently incorporated improved HBase Spark connectors. The documentation is pretty sparse right now: you need to look at the JIRA tickets and at the documentation of the previous incarnation of these tools, Cloudera's SparkOnHBase. The last unit test in the test suite looks pretty much like what you want.

    I have no experience with this API though.