scala, google-cloud-bigtable, spotify-scio

Transform HBase Scan to RowFilter


I'm using Spotify's scio for my Dataflow jobs. The latest scio version uses the new Bigtable Java API (com.google.bigtable.v2).

The scio Bigtable entry point now requires a "RowFilter" for filtering instead of an HBase "Scan". Is there a simple way to transform a "Scan" into a "RowFilter"? I looked at the adapters in the source code, but I'm not sure how to use them. I can't find any documentation on migrating from the HBase API to the "new" API.

Here is a simple scan from my code that I need to transform:

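import org.apache.hadoop.hbase.client.Scan

val scan = new Scan()
scan.setRowPrefixFilter("helloworld".getBytes)            // only rows whose key starts with "helloworld"
scan.addColumn("family".getBytes, "qualifier".getBytes)   // only the column family:qualifier
scan.setMaxVersions()                                     // all versions of each cell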

Solution

  • In theory, you can add the bigtable-hbase dependency to the project and call com.google.cloud.bigtable.hbase.adapters.Adapters.SCAN_ADAPTER.adapt(scan) to convert the Scan to a RowFilter, or more specifically to a ReadRowsRequest, which contains a RowFilter. (The protobuf definitions of those objects list their fields and carry extensive comments.)

    That said, the bigtable-hbase dependency adds quite a few transitive dependencies. I would use the bigtable-hbase SCAN_ADAPTER in a standalone project, and then print the RowFilter to see how it's constructed.

    In the specific case that you mention, the RowFilter is quite simple, but there may be additional complications. You have three parts to your scan, so I'll give a breakdown of how to achieve them:

    1. scan.setRowPrefixFilter("helloworld".getBytes). This translates to a start key and an end key on BigtableIO. "helloworld" is the start key, and you can calculate the end key with RowKeyUtil.calculateTheClosestNextRowKeyForPrefix. The default BigtableIO does not expose setters for the start key and end key, so scio will have to change to make those setters public.

    2. scan.addColumn("family".getBytes, "qualifier".getBytes) translates to two RowFilters added to a RowFilter with a Chain (mostly analogous to an AND). The first RowFilter will have familyNameRegexFilter set, and the second RowFilter will have columnQualifierRegexFilter set.

    3. scan.setMaxVersions() converts to a RowFilter with cellsPerColumnLimitFilter set. It would need to be added to the chain from #2. Warning: if you use a timestampRangeFilter or a value filter of a RowFilter to limit the range of the columns, make sure to put the cellsPerColumnLimitFilter at the end of the chain.
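
The three steps above can be sketched roughly as follows. This is a minimal sketch, not the adapter's exact output. It assumes the com.google.bigtable.v2 protobuf classes and protobuf-java are on the classpath. The object name ScanToRowFilter and the helper prefixEndKey are made up for illustration; prefixEndKey is a hand-written stand-in for what RowKeyUtil.calculateTheClosestNextRowKeyForPrefix is described as doing. Using Int.MaxValue for "all versions" is also an assumption.

```scala
import com.google.bigtable.v2.RowFilter
import com.google.protobuf.ByteString

// Hypothetical helper object, named for this example only.
object ScanToRowFilter {

  // Step 1: exclusive end key for a row-key prefix.
  // Drop trailing 0xFF bytes, then increment the last remaining byte.
  // An empty result means the range has no upper bound.
  def prefixEndKey(prefix: Array[Byte]): Array[Byte] = {
    val trimmed = prefix.reverse.dropWhile(_ == 0xFF.toByte).reverse
    if (trimmed.isEmpty) Array.emptyByteArray
    else trimmed.init :+ (trimmed.last + 1).toByte
  }

  val startKey: ByteString = ByteString.copyFromUtf8("helloworld")
  val endKey: ByteString =
    ByteString.copyFrom(prefixEndKey("helloworld".getBytes("UTF-8")))

  // Step 2: family and qualifier filters. Both values are regular
  // expressions, so literal names containing regex metacharacters
  // would need escaping.
  val familyFilter: RowFilter =
    RowFilter.newBuilder().setFamilyNameRegexFilter("family").build()

  val qualifierFilter: RowFilter =
    RowFilter.newBuilder()
      .setColumnQualifierRegexFilter(ByteString.copyFromUtf8("qualifier"))
      .build()

  // Step 3: version limit. Int.MaxValue stands in for "all versions"
  // (an assumption; omitting the limit filter may be equivalent).
  val versionsFilter: RowFilter =
    RowFilter.newBuilder().setCellsPerColumnLimitFilter(Int.MaxValue).build()

  // Chain the filters (roughly an AND), keeping the cell limit last.
  val rowFilter: RowFilter =
    RowFilter.newBuilder()
      .setChain(
        RowFilter.Chain.newBuilder()
          .addFilters(familyFilter)
          .addFilters(qualifierFilter)
          .addFilters(versionsFilter)
          .build())
      .build()
}
```

Printing ScanToRowFilter.rowFilter shows the protobuf text form, which can be compared with what SCAN_ADAPTER produces in a standalone project. The start and end keys only become useful once the key range can be set, as noted in step 1.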