java  google-cloud-dataflow  apache-beam  dataflow

Debug Apache Beam / Dataflow in a debugger?


This is closely related to an earlier post: How to do this type of testing in Dataflow (called feature testing at Twitter)?

We have some code like this in production:

@Override
public PDone expand(PCollectionTuple in) {
    final RosterPipelineOptions options = (RosterPipelineOptions) in.getPipeline().getOptions();
    try {
        final Schema schema = new Schema.Parser().parse(PractitionerStandardAvroWriter.class.getResourceAsStream("/standardFormat.avsc"));
        final String schemaStr = new BufferedReader(new InputStreamReader(PractitionerStandardAvroWriter.class.getResourceAsStream("/standardFormat.avsc")))
            .lines().collect(Collectors.joining("\n"));

        final PCollection<Validated<PractitionerStandardOutputDto>> validOutputs = in.get(PractitionerStandardOutputConverter.VALID_OUTPUTS_TAG);
        final PCollection<Validated<PractitionerStandardOutputDto>> invalidOutputs = in.get(PractitionerStandardOutputConverter.INVALID_OUTPUTS_TAG);

        final PCollection<GenericRecord> validRecords = validOutputs.apply(
            "Transform Valid Standard Output into Generic Rows", ParDo.of(new GenericRecordConverter(schemaStr)));
        final PCollection<GenericRecord> invalidRecords = invalidOutputs.apply(
            "Transform Invalid Standard Output into Generic Rows", ParDo.of(new GenericRecordConverter(schemaStr)));

        validRecords
            .setCoder(AvroCoder.of(GenericRecord.class, schema))
            .apply("Write Records", AvroIO.writeGenericRecords(schema)
                .to(options.getValidOutput())
                .withoutSharding());

        final PCollection<String> invalidRows = invalidRecords
            .setCoder(AvroCoder.of(GenericRecord.class, schema))
            .apply("Convert Error Avro to Csv", ParDo.of(new AvroToCsvConvertFn(schemaStr, ",")));
        invalidRows.apply("Write Error Records to Csv",
            TextIO.write().to(options.getInvalidOutput()).withoutSharding());

        return PDone.in(in.getPipeline());
    }
    catch (IOException e) {
        SneakyThrow.sneak(e); return null;
    }
}

But when we debug it in a test, we have no visibility into the bugs. It's insanely hard to step through this code to see what is going on. I think that is because this method is only the definition of how to deal with incoming data, and the code that actually morphs the data runs AFTER it. Correct me if I am wrong here.

Two questions:

  • Is this the best way to write debuggable Apache Beam/Dataflow code, so that we can step through it and easily see where our bug is?
  • Is there some other way to debug it easily, since I suspect the 'real execution' happens after that method returns, when the applied transforms actually run?

thanks, Dean


Solution

  • In general, like in your other question, my advice would be the following:

    1. To step through your pipeline, write a unit test and run it from your IDE. It will run on the DirectRunner, which lets you step through the pipeline easily. This runs locally rather than on Dataflow, but it is still valuable.

    You can set breakpoints in expand; these will be hit at pipeline construction time. You can also set breakpoints in the process method of a DoFn, or in split and read for Sources; these will be hit at pipeline execution time.

    2. As for writing pipelines that are debuggable: in this case, my advice would be to write composable transforms that can be tested individually. Beam has various test utilities you can use to write tests for your pipelines. See: https://beam.apache.org/documentation/pipelines/test-your-pipeline/

    Some of the most valuable test utilities are PAssert, TestStream, and TestPipeline. I recommend you review the page I shared and see whether these utilities help.
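
As a rough illustration of the two points above, here is a minimal sketch of a JUnit 4 test that runs a DoFn locally with TestPipeline and checks the result with PAssert. It assumes beam-sdks-java-core, beam-runners-direct-java, and JUnit 4 are on the test classpath. TrimAndUppercaseFn is a hypothetical stand-in for a real DoFn such as GenericRecordConverter; it is not code from the question.

```java
import java.util.Locale;

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class TrimAndUppercaseFnTest {

    // Hypothetical DoFn used only for this sketch.
    static class TrimAndUppercaseFn extends DoFn<String, String> {
        @ProcessElement
        public void processElement(@Element String input, OutputReceiver<String> out) {
            // A breakpoint here is hit at execution time, once per element.
            out.output(input.trim().toUpperCase(Locale.ROOT));
        }
    }

    // TestPipeline runs on the DirectRunner by default when it is on the classpath.
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void trimsAndUppercasesEachElement() {
        // Breakpoints in this method (or in a PTransform's expand) are hit at
        // construction time: nothing has been processed yet.
        PCollection<String> output = pipeline
            .apply("Create input", Create.of(" alice ", "bob"))
            .apply("Trim and uppercase", ParDo.of(new TrimAndUppercaseFn()));

        // PAssert adds a verification step to the pipeline graph.
        PAssert.that(output).containsInAnyOrder("ALICE", "BOB");

        // Only now do the DoFn and the PAssert check actually execute.
        pipeline.run().waitUntilFinish();
    }
}
```

Running this test in debug mode from the IDE should stop at the breakpoint inside processElement only after pipeline.run() is called, which matches the construction-time versus execution-time split described above.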


    For your particular pipeline, I think you could split the PTransform into smaller sections and write simple unit tests for each section.
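
One way to take that splitting a step further, sketched here under assumptions: move the per-element logic of a DoFn into a plain Java method with no Beam types, so it can be unit tested and stepped through without running a pipeline at all. CsvRowFormatter and its quoting rules are hypothetical and only illustrate the idea; they are not the actual logic of AvroToCsvConvertFn.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper holding per-element logic, free of any Beam types. */
public final class CsvRowFormatter {

    private CsvRowFormatter() {}

    /**
     * Joins field values into one CSV row. A value is quoted when it contains
     * the delimiter, a double quote, or a line break. Null values become empty fields.
     */
    public static String toCsvRow(List<String> fields, String delimiter) {
        return fields.stream()
            .map(field -> quoteIfNeeded(field == null ? "" : field, delimiter))
            .collect(Collectors.joining(delimiter));
    }

    private static String quoteIfNeeded(String value, String delimiter) {
        boolean needsQuoting = value.contains(delimiter)
            || value.contains("\"")
            || value.contains("\n")
            || value.contains("\r");
        if (!needsQuoting) {
            return value;
        }
        // Escape embedded quotes by doubling them, then wrap the value in quotes.
        return "\"" + value.replace("\"", "\"\"") + "\"";
    }

    public static void main(String[] args) {
        // Prints: "Smith, Jane",MD,12345
        System.out.println(toCsvRow(List.of("Smith, Jane", "MD", "12345"), ","));
    }
}
```

A DoFn would then only extract the field values from each element and delegate to toCsvRow, leaving very little logic that can only be debugged inside a running pipeline.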