Search code examples
google-cloud-dataflowapache-beamgoogle-cloud-pubsubapache-beam-iobeam

How to use PCollection<PubsubMessage> as a sideinput in Beam?


I am working on a Beam (Dataflow) pipeline, where the task is to read the messages from pubsub and then perform some transformations. In case there are some failures in any of these transformations I want to send message to the dead letter queue.

Flow looks something like this

// read from pubsub
PCollection<PubsubMessage> message =  p.apply("Read", PubsubIO.readMessagesWithAttributes().fromSubscription('subscription_name'));

// Transformation1

// Transformation2

// Save

The message is the input to transformation1 and the output of Transformation1 is input to Transformation2. I am able to send the original message to dead letter queue if there is any exception in Transformation1. The problem comes when I try to do the same in case Transformation2 fails.

My requirement is that I have to send the original message back even when Transformation2 fails. Is there a way to do it?

in Transformation2 I tried using sideInput, like this:

PCollectionView<List<PubsubMessage>> messageView = message.apply((View.asList())); <-- error
PCollection<TableRow> valid_table_rows = transformation1_Result.apply("Transformation2", ParDo.of(new transformation2()).withSideInputs(messageView));

But at runtime this gives me an error:

[2023-05-18, 16:17:52 EDT] {beam.py:127} WARNING - Caused by: java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.

The idea is that how can I send the original message to dead letter queue, regardless of which transformation fails.

Any help will be much appreciated.


Solution

  • It's not perfect but I think you have to transfer your initial message in each step and transformations.

    You have to be carreful with Serialisation if you are using SerializableCoder, if you can't pass the PubSubMessage in a Serialisable object.

    You can create an intermediate object containing the same fields as PubSubMessage :

    public class InputMessage implements Serializable {
        private ByteString key;
        private ByteString data;
    }
    

    Then transfer this message in each transformation

    Example :

    class YourObject {
       private InputMessage message;
    
       // other fields.
    }
    
    pipeline
       .apply("Map", MapElements.into(of(YourObject.class)).via(Class::yourTransform))
    
    
    public YourObject yourTransform(final PubSubMessage message) {
            // Apply your different transformations.
             
            InputMessage message = new InputMessage(message.getKey(), message.getData())
            yourObject.setMessage(message);
    
            return yourObject;
        }
    

    Then in the catch bloc, side output and TupleTag, you can use the input object.