I would like to add a prefix/suffix to HBase keys during an Apache Beam Import job. Here is the code that creates the PCollection that holds the HBase data:
    Pipeline pipeline = Pipeline.create(Utils.tweakOptions(opts));
    HBaseSnapshotInputConfigBuilder configurationBuilder =
        new HBaseSnapshotInputConfigBuilder()
            .setProjectId(opts.getProject())
            .setHbaseSnapshotSourceDir(opts.getHbaseSnapshotSourceDir())
            .setSnapshotName(opts.getSnapshotName())
            .setRestoreDirSuffix(opts.getJobName());
    PCollection<KV<ImmutableBytesWritable, Result>> readResult =
        pipeline.apply(
            "Read from HBase Snapshot",
            HadoopFormatIO.<ImmutableBytesWritable, Result>read()
                .withConfiguration(configurationBuilder.build()));
    // Somehow create a new PCollection with the transformed key, derived from the PCollection above.
    PCollection<Mutation> copyResult = readResult.apply("Prefixes", ParDo.of(new TransformFn()));
Inside TransformFn, I receive the data like this:
    public void processElement(ProcessContext context) throws Exception {
        KV<ImmutableBytesWritable, Result> kv = context.element();
        List<Cell> cells = checkEmptyRow(kv);
        if (cells.isEmpty()) {
            return;
        }
        // Preprocess delete markers
        if (hasDeleteMarkers(cells)) {
            cells = preprocessDeleteMarkers(cells);
        }
        // Split the row into multiple puts if it exceeds the maximum mutation limit
        Iterator<Cell> cellIt = cells.iterator();
        while (cellIt.hasNext()) {
            Put put = new Put(kv.getKey().get());
            for (int i = 0; i < MAX_CELLS && cellIt.hasNext(); i++) {
                put.add(cellIt.next());
            }
            context.output(put);
        }
    }
Where can I update the HBase key? The HBase Put object seems to need the same key as context.element(); otherwise the transform fails with:
InvalidArgumentException: Didn't receive a result for this mutation entry
It seems like PCollection -> ProcessContext -> HBase Put -> HBase Cell all need to keep the same key during the import, so I cannot modify the key once the PCollection has been created.
How can I create a new PCollection that holds the same HBase data but with a different HBase key?
If I'm not mistaken, you can't just copy a cell into a Put with a new key, since that cell already contains the old row key.
From the Cell Javadoc:
Uniqueness is determined by the combination of row, column family, column qualifier, timestamp, and type.
You need to create a new Cell with the new key, copy the column family, qualifier, timestamp, type, value, etc. into it, and add this new cell to the new Put object.
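To make the idea concrete, here is a minimal, library-free sketch of the re-keying logic. Everything in it is hypothetical: SimpleCell is a stand-in for HBase's Cell (it is not the real class), and RowKeyRewrite, withPrefixAndSuffix, rekey and rekeyAll are names I made up for illustration. In a real pipeline you would do the same copy with the HBase client's own cell-building helpers (I believe HBase 2.x offers CellBuilderFactory for this, but check it against your version) and construct the Put with the same new row key.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class RowKeyRewrite {

    /** Hypothetical stand-in for an HBase Cell: the row key is stored in the cell itself. */
    static final class SimpleCell {
        final byte[] row;
        final byte[] family;
        final byte[] qualifier;
        final long timestamp;
        final byte[] value;

        SimpleCell(byte[] row, byte[] family, byte[] qualifier, long timestamp, byte[] value) {
            this.row = row;
            this.family = family;
            this.qualifier = qualifier;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Builds prefix + rowKey + suffix as a new byte array; the inputs are not modified. */
    static byte[] withPrefixAndSuffix(byte[] rowKey, byte[] prefix, byte[] suffix) {
        byte[] out = new byte[prefix.length + rowKey.length + suffix.length];
        System.arraycopy(prefix, 0, out, 0, prefix.length);
        System.arraycopy(rowKey, 0, out, prefix.length, rowKey.length);
        System.arraycopy(suffix, 0, out, prefix.length + rowKey.length, suffix.length);
        return out;
    }

    /** Creates a new cell under newRow, copying every other field from the old cell. */
    static SimpleCell rekey(SimpleCell old, byte[] newRow) {
        return new SimpleCell(
            newRow,
            old.family.clone(),
            old.qualifier.clone(),
            old.timestamp,
            old.value.clone());
    }

    /** Re-keys all cells of one row so that they agree on the same new row key. */
    static List<SimpleCell> rekeyAll(List<SimpleCell> cells, byte[] prefix, byte[] suffix) {
        List<SimpleCell> out = new ArrayList<>(cells.size());
        for (SimpleCell cell : cells) {
            out.add(rekey(cell, withPrefixAndSuffix(cell.row, prefix, suffix)));
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] prefix = "tenant1#".getBytes(StandardCharsets.UTF_8);
        byte[] suffix = new byte[0];
        SimpleCell original = new SimpleCell(
            "row-42".getBytes(StandardCharsets.UTF_8),
            "cf".getBytes(StandardCharsets.UTF_8),
            "q".getBytes(StandardCharsets.UTF_8),
            1000L,
            "v".getBytes(StandardCharsets.UTF_8));
        SimpleCell moved = rekey(original, withPrefixAndSuffix(original.row, prefix, suffix));
        // prints tenant1#row-42
        System.out.println(new String(moved.row, StandardCharsets.UTF_8));
    }
}
```

In your processElement, the equivalent would be to compute the new row key once per element, build the Put from that key instead of kv.getKey().get(), and add only the re-keyed cells to it, so the Put and all of its cells carry the same row.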
PS: By the way, there is a dedicated HBaseIO in Beam that can be easier to use than HadoopFormatIO.