This is closely related to this earlier post: How to do this type of testing in Dataflow (called feature testing at Twitter)?
We have some code like this in production:

@Override
public PDone expand(PCollectionTuple in) {
  final RosterPipelineOptions options = (RosterPipelineOptions) in.getPipeline().getOptions();
  try {
    final Schema schema = new Schema.Parser().parse(
        PractitionerStandardAvroWriter.class.getResourceAsStream("/standardFormat.avsc"));
    final String schemaStr = new BufferedReader(new InputStreamReader(
        PractitionerStandardAvroWriter.class.getResourceAsStream("/standardFormat.avsc")))
        .lines().collect(Collectors.joining("\n"));
    final PCollection<Validated<PractitionerStandardOutputDto>> validOutputs =
        in.get(PractitionerStandardOutputConverter.VALID_OUTPUTS_TAG);
    final PCollection<Validated<PractitionerStandardOutputDto>> invalidOutputs =
        in.get(PractitionerStandardOutputConverter.INVALID_OUTPUTS_TAG);
    final PCollection<GenericRecord> validRecords = validOutputs.apply(
        "Transform Valid Standard Output into Generic Rows",
        ParDo.of(new GenericRecordConverter(schemaStr)));
    final PCollection<GenericRecord> invalidRecords = invalidOutputs.apply(
        "Transform Invalid Standard Output into Generic Rows",
        ParDo.of(new GenericRecordConverter(schemaStr)));
    validRecords
        .setCoder(AvroCoder.of(GenericRecord.class, schema))
        .apply("Write Records", AvroIO.writeGenericRecords(schema)
            .to(options.getValidOutput())
            .withoutSharding());
    final PCollection<String> invalidRows = invalidRecords
        .setCoder(AvroCoder.of(GenericRecord.class, schema))
        .apply("Convert Error Avro to Csv", ParDo.of(new AvroToCsvConvertFn(schemaStr, ",")));
    invalidRows.apply("Write Error Records to Csv",
        TextIO.write().to(options.getInvalidOutput()).withoutSharding());
    return PDone.in(in.getPipeline());
  } catch (IOException e) {
    SneakyThrow.sneak(e);
    return null;
  }
}
but when we debug it in a test, we have no visibility into the bugs. It is extremely hard to step through this code, because I believe expand only defines how incoming data will be handled; the actual transformation of the data happens later, when the pipeline runs. Correct me if I am wrong here?
Two questions
thanks, Dean
In general, as in your other question, my advice would be the following:

You can set breakpoints in expand; these will be hit at pipeline construction time. You can also set breakpoints in the @ProcessElement method of a DoFn, or in split and read for Sources; these will be hit at pipeline execution time.

Some of the most valuable test utilities are PAssert, TestStream, and TestPipeline. I recommend you review the page I shared and see whether these utilities will help.
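As a rough illustration of how TestPipeline and PAssert fit together, here is a minimal sketch of a unit test for one transform in isolation. This assumes Beam's Java SDK test artifacts and JUnit 4 are on the test classpath; SomeDoFnUnderTest and the literal input/expected values are hypothetical placeholders, not your actual classes.

```java
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class SomeDoFnUnderTestTest {
  // TestPipeline runs the pipeline in-process so assertions can be checked.
  @Rule public final transient TestPipeline p = TestPipeline.create();

  @Test
  public void transformsOneElement() {
    PCollection<String> out = p
        .apply(Create.of("some-input"))             // seed the pipeline with in-memory data
        .apply(ParDo.of(new SomeDoFnUnderTest()));  // the transform under test (placeholder)

    // PAssert registers an assertion that is evaluated when the pipeline runs.
    PAssert.that(out).containsInAnyOrder("expected-output");
    p.run().waitUntilFinish();
  }
}
```

Because PAssert assertions are only checked when p.run() executes, forgetting the run() call is a common way to get a test that silently passes.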
For your particular pipeline, I would suggest splitting the PTransform into smaller sections and writing a simple unit test for each section.
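One way to make those smaller sections easy to debug is to pull the per-element logic out of each DoFn into a plain static method, which you can step through and unit-test without any runner. This is only a sketch: CsvLines, csvLine, and its quoting rule are hypothetical stand-ins, not the real AvroToCsvConvertFn logic.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper holding the element-level logic that a DoFn's
// @ProcessElement method would delegate to.
public final class CsvLines {
  private CsvLines() {}

  // Join field values with the given delimiter, wrapping any field that
  // itself contains the delimiter in double quotes.
  public static String csvLine(List<String> fields, String delimiter) {
    return fields.stream()
        .map(f -> f.contains(delimiter) ? "\"" + f + "\"" : f)
        .collect(Collectors.joining(delimiter));
  }
}
```

With this split, the DoFn body shrinks to a one-line call to CsvLines.csvLine(...), and an ordinary unit test (or debugger session) can exercise the conversion directly on in-memory data, with no pipeline involved.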