Beam - Error while branching PCollections


I have a pipeline that reads data from Kafka and splits the incoming records into a processing output and a rejected output. Records are read from Kafka into a custom class, MyData, and the final output is produced as KV<byte[], byte[]>.

I define two TupleTags typed with MyData:

private static final TupleTag<MyData> rejectedTag = new TupleTag<MyData>(){};
private static final TupleTag<MyData> processingTag = new TupleTag<MyData>(){};

InvalidDataDoFn contains the application logic that splits MyData elements into the processing and rejected outputs:

InvalidDataDoFn invalidDataDoFn = new InvalidDataDoFn(processingTag, rejectedTag);
PCollectionTuple mixedCollection = myCollection
    .apply(ParDo.of(invalidDataDoFn).withOutputTags(processingTag, TupleTagList.of(rejectedTag)));


OutputDoFn outputDoFn = new OutputDoFn();

PCollection<MyData> processingCollection = mixedCollection.get(processingTag);

PCollection<KV<byte[], byte[]>> outputCollection = processingCollection
  .apply("ProcessElements", ParDo.of(outputDoFn));

OutputDoFn converts MyData into KV<byte[], byte[]>. While OutputDoFn is running, I get an error stating "Tag passed to output cannot be null". The message comes from https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L559

My OutputDoFn has the following logic.

@ProcessElement
public void processElement(@Element MyData mydata,
    OutputReceiver<KV<byte[], byte[]>> output, ProcessContext c) {

  c.output(KV.of(mydata.getMessageKey(), mydata.getSomething().getBytes()));
}

Solution

  • Correct me if I'm wrong, but you'd like to use this c.output overload:

    public void output(OutputT output)
    

    and you're surprised that this one is used instead:

    public <T> void output(TupleTag<T> tag, T output)
    

    For Beam to use the first one, the argument you pass must have the OutputT type that was declared when your DoFn was created:

    private class DoFnProcessContext extends DoFn<InputT, OutputT>.ProcessContext
    

    My guess is that the value you pass to c.output() is not exactly the type you specified when creating your DoFn. In that case the second output function is chosen, and it is missing the tag.

    Could you post the full DoFn declaration for OutputDoFn to confirm?

    All code references from here.
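
To make the two overloads above concrete, here is a minimal, self-contained Java sketch. It does not use Beam. Tag and Context are hypothetical stand-ins for TupleTag and DoFn.ProcessContext. It assumes the one-argument output forwards to the tagged overload with the main output tag. That is my reading of the linked SimpleDoFnRunner and is not confirmed in this thread. The sketch only shows that a tagged output call given a null tag fails with the same message as in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Simplified model of a process context with two output overloads; not Beam's real classes. */
public class OutputOverloadSketch {

  /** Hypothetical stand-in for Beam's TupleTag<T>. */
  static final class Tag<T> {
    final String id;

    Tag(String id) {
      this.id = id;
    }
  }

  /** Hypothetical stand-in for DoFn<InputT, OutputT>.ProcessContext. */
  static final class Context<OutputT> {
    private final Tag<OutputT> mainOutputTag;
    final List<String> emitted = new ArrayList<>();

    Context(Tag<OutputT> mainOutputTag) {
      this.mainOutputTag = mainOutputTag;
    }

    // Main-output overload: accepts only the declared OutputT.
    // Assumption: it forwards to the tagged overload using the main output tag.
    void output(OutputT value) {
      output(mainOutputTag, value);
    }

    // Tagged overload: accepts any T, but rejects a null tag.
    <T> void output(Tag<T> tag, T value) {
      Objects.requireNonNull(tag, "Tag passed to output cannot be null");
      emitted.add(tag.id + "=" + value);
    }
  }

  /** Emits one value through the main-output overload and returns what was recorded. */
  static List<String> emitToMain() {
    Context<String> c = new Context<>(new Tag<>("main"));
    c.output("value");
    return c.emitted;
  }

  /** Calls the tagged overload with a null tag and returns the resulting error message. */
  static String messageForNullTag() {
    Context<String> c = new Context<>(new Tag<>("main"));
    Tag<String> missing = null;
    try {
      c.output(missing, "value");
      return "no error";
    } catch (NullPointerException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(emitToMain());
    System.out.println(messageForNullTag());
  }
}
```

In the real pipeline, the matching checks would be that OutputDoFn is declared as DoFn<MyData, KV<byte[], byte[]>> and that no tag reaching an output call is null. The full class declaration is still needed to tell which of the two applies.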