Tags: java, hbase, apache-beam, apache-beam-io

Is there a way to modify or create new row keys in a PCollection during an HBase import using Beam?


I would like to add a prefix or suffix to HBase row keys during an Apache Beam import job. Here is the code that creates the PCollection holding the HBase data:

    Pipeline pipeline = Pipeline.create(Utils.tweakOptions(opts));
    HBaseSnapshotInputConfigBuilder configurationBuilder =
        new HBaseSnapshotInputConfigBuilder()
            .setProjectId(opts.getProject())
            .setHbaseSnapshotSourceDir(opts.getHbaseSnapshotSourceDir())
            .setSnapshotName(opts.getSnapshotName())
            .setRestoreDirSuffix(opts.getJobName());

    PCollection<KV<ImmutableBytesWritable, Result>> readResult =
        pipeline.apply(
            "Read from HBase Snapshot",
            HadoopFormatIO.<ImmutableBytesWritable, Result>read()
                .withConfiguration(configurationBuilder.build()));

    // Somehow create a new PCollection with the transformed Key - derived from the PCollection above.
    PCollection<Mutation> copyResult = readResult.apply("Prefixes", ParDo.of(new TransformFn()));

Inside the DoFn (`TransformFn`), I process each element like this:

  @ProcessElement
  public void processElement(ProcessContext context) throws Exception {
    KV<ImmutableBytesWritable, Result> kv = context.element();
    List<Cell> cells = checkEmptyRow(kv);
    if (cells.isEmpty()) {
      return;
    }

    // Preprocess delete markers
    if (hasDeleteMarkers(cells)) {
      cells = preprocessDeleteMarkers(cells);
    }

    // Split the row into multiple puts if it exceeds the maximum mutation limit
    Iterator<Cell> cellIt = cells.iterator();

    while (cellIt.hasNext()) {
      Put put = new Put(kv.getKey().get());

      for (int i = 0; i < MAX_CELLS && cellIt.hasNext(); i++) {
        put.add(cellIt.next());
      }

      context.output(put);
    }
  }

Where can I change the HBase row key? The HBase Put must use the same row key as context.element(); otherwise the transform fails with:

InvalidArgumentException: Didn't receive a result for this mutation entry

It seems that the PCollection element, the ProcessContext, the HBase Put, and every HBase Cell must all carry the same row key during the import, so I cannot modify the key once the PCollection has been created.

How can I create a new PCollection that holds the same HBase data but with a different HBase row key?


Solution

  • If I'm not mistaken, you can't simply copy a cell into a Put with a new row key, because that cell already contains the old row key.

    From the Cell Javadoc:

    Uniqueness is determined by the combination of row, column family, column qualifier, timestamp, and type.

    You need to create a new Cell with the new row key, copy the column family, qualifier, timestamp, value, etc. into it, and add that new cell to the new Put object.

    PS: Beam also has a dedicated HBaseIO, which can be easier to use than HadoopFormatIO.
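A minimal sketch of the approach described in the answer, assuming the HBase 1.x/2.x client classes (`Cell`, `CellUtil`, `KeyValue`, `Put`) are on the classpath. `RowKeyRewriter` and its methods are hypothetical names for illustration, not part of HBase or Beam. It builds the new row key by concatenating a prefix and suffix around the old key, then copies each cell's family, qualifier, timestamp, type, and value into a fresh `KeyValue` under the new key, so the resulting Put and all of its cells agree on the row.

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;

/** Hypothetical helper that re-keys HBase cells; not an HBase or Beam API. */
final class RowKeyRewriter {

  private RowKeyRewriter() {}

  /** Returns prefix + originalKey + suffix as a new byte array. */
  static byte[] newRowKey(byte[] prefix, byte[] originalKey, byte[] suffix) {
    byte[] result = new byte[prefix.length + originalKey.length + suffix.length];
    System.arraycopy(prefix, 0, result, 0, prefix.length);
    System.arraycopy(originalKey, 0, result, prefix.length, originalKey.length);
    System.arraycopy(suffix, 0, result, prefix.length + originalKey.length, suffix.length);
    return result;
  }

  /**
   * Creates a new cell with the same family, qualifier, timestamp, type and
   * value as {@code cell}, but with {@code newRow} as its row key.
   * Cell tags (e.g. visibility labels) are NOT copied by this constructor.
   * Note: getTypeByte() is deprecated in HBase 2.x but still available.
   */
  static Cell rekeyCell(Cell cell, byte[] newRow) {
    return new KeyValue(
        newRow,
        CellUtil.cloneFamily(cell),
        CellUtil.cloneQualifier(cell),
        cell.getTimestamp(),
        KeyValue.Type.codeToType(cell.getTypeByte()),
        CellUtil.cloneValue(cell));
  }

  /** Copies the given cells into a new Put whose row key is newRow. */
  static Put rekeyedPut(byte[] newRow, List<Cell> cells) throws IOException {
    Put put = new Put(newRow);
    for (Cell cell : cells) {
      // Put.add(Cell) expects the cell's row to match the Put's row,
      // which holds here because every copied cell uses newRow.
      put.add(rekeyCell(cell, newRow));
    }
    return put;
  }
}
```

In the question's `processElement`, one way to use this would be to compute `byte[] newKey = RowKeyRewriter.newRowKey(prefix, kv.getKey().copyBytes(), suffix)` once per row, create `new Put(newKey)`, and add `RowKeyRewriter.rekeyCell(cellIt.next(), newKey)` instead of the original cell. `copyBytes()` is used rather than `get()` because `ImmutableBytesWritable.get()` returns the backing array, which may extend beyond the key's offset and length.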