I would like to add a prefix/suffix to HBase keys during an Apache Beam Import job. Here is the code that creates the PCollection that holds the HBase data:
Pipeline pipeline = Pipeline.create(Utils.tweakOptions(opts));
HBaseSnapshotInputConfigBuilder configurationBuilder =
    new HBaseSnapshotInputConfigBuilder() /* ... builder settings omitted ... */;
PCollection<KV<ImmutableBytesWritable, Result>> readResult = pipeline.apply(
    "Read from HBase Snapshot",
    HadoopFormatIO.<ImmutableBytesWritable, Result>read() /* ... configuration omitted ... */);
// Somehow create a new PCollection with the transformed key, derived from the PCollection above.
PCollection<Mutation> copyResult = readResult.apply("Prefixes", ParDo.of(new TransformFn()));
Inside the DoFn (TransformFn), I process each element like this:
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
  KV<ImmutableBytesWritable, Result> kv = context.element();
  List<Cell> cells = checkEmptyRow(kv);
  if (cells.isEmpty()) {
    return;
  }
  // Preprocess delete markers
  if (hasDeleteMarkers(cells)) {
    cells = preprocessDeleteMarkers(cells);
  }
  // Split the row into multiple puts if it exceeds the maximum mutation limit
  Iterator<Cell> cellIt = cells.iterator();
  while (cellIt.hasNext()) {
    Put put = new Put(kv.getKey().get());
    for (int i = 0; i < MAX_CELLS && cellIt.hasNext(); i++) {
      put.add(cellIt.next());
    }
    context.output(put);
  }
}
Where can I change the HBase row key? The HBase Put must have the same row key as context.element(); otherwise the transform fails with:
InvalidArgumentException: Didn't receive a result for this mutation entry
It seems that the row key has to stay the same all the way through the import (PCollection -> ProcessContext -> HBase Put -> HBase Cell), so I cannot modify the key once the PCollection has been created.
How can I create a new PCollection that holds the same HBase data but with a different row key?
If I'm not mistaken, you can't just copy a cell into a Put with a new key, since that cell still contains the old row key.
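To illustrate the failure mode, here is a plain-Java model with no HBase dependency. As far as I know, HBase's Put.add(Cell) compares the cell's row with the Put's row and rejects a mismatch. ToyCell and ToyPut are hypothetical stand-ins written for this sketch, not HBase classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowCheckDemo {
    // Hypothetical stand-in for an HBase Cell; only the row matters for this check.
    record ToyCell(byte[] row, byte[] family, byte[] qualifier, long timestamp, byte[] value) {}

    // Hypothetical stand-in for an HBase Put that enforces "all cells share the Put's row".
    static final class ToyPut {
        private final byte[] row;
        private final List<ToyCell> cells = new ArrayList<>();

        ToyPut(byte[] row) {
            this.row = row.clone();
        }

        ToyPut add(ToyCell cell) {
            // Reject any cell whose row key differs from this Put's row key.
            if (!Arrays.equals(row, cell.row())) {
                throw new IllegalArgumentException("cell row does not match put row");
            }
            cells.add(cell);
            return this;
        }
    }

    // Returns true if a cell stored under cellRow can be added to a Put for putRow.
    static boolean accepts(String putRow, String cellRow) {
        ToyCell cell = new ToyCell(bytes(cellRow), bytes("cf"), bytes("q"), 1L, bytes("v"));
        try {
            new ToyPut(bytes(putRow)).add(cell);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(accepts("row1", "row1"));        // true
        System.out.println(accepts("prefix-row1", "row1")); // false: the cell still carries "row1"
    }
}
```

Changing only the key passed to new Put(...) is therefore not enough; the cells themselves must carry the new row.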
From the Cell javadoc:
Uniqueness is determined by the combination of row, column family, column qualifier, timestamp, and type.
You need to create a new Cell with the new row key, copy the column family, qualifier, timestamp, type, value, etc. into it, and add this new cell to the new Put.
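A minimal plain-Java sketch of that copy step, again without HBase on the classpath: ToyCell, rekey and withRow are hypothetical names, and the prefix/suffix layout (prefix + old row + suffix) is an assumption. In real HBase code the equivalent would be, for example, reading CellUtil.cloneFamily, CellUtil.cloneQualifier and CellUtil.cloneValue from each old cell and calling Put.addColumn(family, qualifier, timestamp, value) on a Put created with the new key; that path only covers regular put cells, so delete markers would need separate handling.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class RekeyDemo {
    // Hypothetical stand-in for an HBase Cell, with the fields that make a cell unique plus its value.
    record ToyCell(byte[] row, byte[] family, byte[] qualifier, long timestamp, String type, byte[] value) {}

    // Builds the new row key as prefix + oldRow + suffix (assumed layout).
    static byte[] rekey(byte[] prefix, byte[] oldRow, byte[] suffix) {
        byte[] out = new byte[prefix.length + oldRow.length + suffix.length];
        System.arraycopy(prefix, 0, out, 0, prefix.length);
        System.arraycopy(oldRow, 0, out, prefix.length, oldRow.length);
        System.arraycopy(suffix, 0, out, prefix.length + oldRow.length, suffix.length);
        return out;
    }

    // Creates a new cell carrying newRow and copies family, qualifier, timestamp, type and value.
    static ToyCell withRow(ToyCell cell, byte[] newRow) {
        return new ToyCell(newRow.clone(), cell.family().clone(), cell.qualifier().clone(),
                cell.timestamp(), cell.type(), cell.value().clone());
    }

    // Rewrites every cell of one row so that all of them share the same new row key.
    static List<ToyCell> rekeyRow(List<ToyCell> cells, byte[] prefix, byte[] suffix) {
        List<ToyCell> out = new ArrayList<>(cells.size());
        for (ToyCell c : cells) {
            out.add(withRow(c, rekey(prefix, c.row(), suffix)));
        }
        return out;
    }

    static byte[] b(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ToyCell original = new ToyCell(b("row1"), b("cf"), b("q"), 42L, "Put", b("v"));
        ToyCell copy = rekeyRow(List.of(original), b("p-"), b("-s")).get(0);
        System.out.println(new String(copy.row(), StandardCharsets.UTF_8)); // p-row1-s
        System.out.println(copy.timestamp());                                // 42
    }
}
```

Because every cell of a row goes through the same rekey function, all rewritten cells share one new row key and can be added to a single Put built with that key.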
PS: There is a dedicated HBaseIO in Beam that may be easier to use than HadoopFormatIO.