I've got a block of pipeline steps that read data from specific files in a GCS bucket. The code looks similar to below:
List<String> filepaths = Arrays.asList("filepath1", "filepath2", "filepathMissing");
return pipeline
    .apply("GatherFiles", Create.of(filepaths)).setCoder(StringUtf8Coder.of())
    .apply("GatherFileData", TextIO.readAll())
    .apply("ApplyCustomDoFn", ParDo.of(new CustomDoFn()))
    .apply("Group", GroupByKey.<String, String>create())
    .apply("AnotherCustomDoFn", ParDo.of(new AnotherCustomDoFn()));
If one of the filepaths is missing from GCS (say "filepathMissing" does not exist), the whole pipeline crashes. I'm considering wrapping this set of pipeline steps in a try / catch, but I'm unsure of the consequences.
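My questions:
Is wrapping these steps in a try / catch a reasonable approach, and is there a way to ignore missing files when using TextIO.readAll()?
If there is any specific documentation on how to do this, please link it with your answer :)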
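On the try / catch idea: with TextIO.readAll(), the files are matched and read only when the pipeline executes, so a try / catch around the .apply(...) calls guards only graph construction. A missing file surfaces later, from pipeline.run() / waitUntilFinish(). A minimal sketch, assuming you control where the pipeline is run; RunGuard and runAndReport are hypothetical names. Note that the job still fails as a whole, so this only lets you react to the failure, it does not skip the missing file:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;

public class RunGuard {
  /**
   * Hypothetical helper: runs the pipeline and reports whether it finished successfully.
   * File matching for TextIO.readAll() happens at execution time, so this is where a
   * missing file shows up, not at the .apply(...) calls.
   */
  public static boolean runAndReport(Pipeline pipeline) {
    try {
      PipelineResult.State state = pipeline.run().waitUntilFinish();
      // Some runners report a failed job through the returned state instead of throwing.
      return state == PipelineResult.State.DONE;
    } catch (Pipeline.PipelineExecutionException e) {
      // The DirectRunner wraps exceptions thrown inside transforms in this exception type.
      System.err.println("Pipeline failed: " + e.getCause());
      return false;
    }
  }
}
```

As far as I know, on Dataflow run() returns as soon as the job is submitted, so the terminal state returned by waitUntilFinish() is the more reliable signal there than a caught exception.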
I ended up finding an answer to the problem above.
The fix was to add .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW) after TextIO.readAll(), as shown below.
List<String> filepaths = Arrays.asList("filepath1", "filepath2", "filepathMissing");
return pipeline
    .apply("GatherFiles", Create.of(filepaths)).setCoder(StringUtf8Coder.of())
    // ALLOW: a path that matches no file produces no output instead of failing the step
    .apply("GatherFileData", TextIO.readAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
    .apply("ApplyCustomDoFn", ParDo.of(new CustomDoFn()))
    .apply("Group", GroupByKey.<String, String>create())
    .apply("AnotherCustomDoFn", ParDo.of(new AnotherCustomDoFn()));
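For context: as I understand the Beam Javadoc, the default for readAll() is EmptyMatchTreatment.ALLOW_IF_WILDCARD, which is why a plain path like "filepathMissing" fails while a glob that matches nothing would not. In more recent Beam releases TextIO.readAll() is also marked deprecated in favor of composing FileIO.matchAll(), FileIO.readMatches() and TextIO.readFiles(), where the same setting lives on the matching step. A sketch of an equivalent composite transform, assuming a Beam version that provides these transforms; ReadTextAllowMissing is a hypothetical name:

```java
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;

/** Hypothetical composite: reads lines from each path/pattern, skipping ones that match nothing. */
public class ReadTextAllowMissing extends PTransform<PCollection<String>, PCollection<String>> {
  @Override
  public PCollection<String> expand(PCollection<String> filepatterns) {
    return filepatterns
        // Expand each path or glob; ALLOW turns "no match" into zero files instead of an error.
        .apply("MatchFiles", FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
        // Convert each match into a readable file handle.
        .apply("ReadMatches", FileIO.readMatches())
        // Emit the contents line by line, as TextIO.readAll() does.
        .apply("ReadLines", TextIO.readFiles());
  }
}
```

It would slot in where TextIO.readAll() was: .apply("GatherFileData", new ReadTextAllowMissing()).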
The downside of this fix is that a missing file is skipped silently: the pipeline step logs an INFO message, but no error is thrown outside the pipeline. If you expect the file to be there, you may not notice it was left out unless you explicitly check that step's logs in Dataflow.
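One way to get a loud failure back while keeping ALLOW in the pipeline is to check the paths yourself before launching it. A minimal sketch using Beam's FileSystems.match(List<String>), assuming all paths share one scheme (for example, all gs://) and that FileSystems.setDefaultPipelineOptions(options) has been called so the GCS filesystem is registered. MissingFileCheck and findMissing are hypothetical names:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;

public class MissingFileCheck {
  /** Hypothetical helper: returns the specs that did not match cleanly (missing or unreadable). */
  public static List<String> findMissing(List<String> specs) throws IOException {
    List<String> missing = new ArrayList<>();
    if (specs.isEmpty()) {
      return missing;
    }
    // One MatchResult per spec, in the same order as the input list.
    List<MatchResult> results = FileSystems.match(specs);
    for (int i = 0; i < specs.size(); i++) {
      // NOT_FOUND for a missing file; ERROR or UNKNOWN are also treated as a problem here.
      if (results.get(i).status() != MatchResult.Status.OK) {
        missing.add(specs.get(i));
      }
    }
    return missing;
  }
}
```

You could then throw, or log at WARNING level, if findMissing(filepaths) returns anything before calling pipeline.run(). The check runs at launch time, so a file removed after it would still be skipped silently by the ALLOW setting.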