streaming, google-cloud-dataflow

How do I track state across runners in a Dataflow job?


I'm currently creating a streaming Dataflow job that should carry out a computation if and only if there is an increment in the "Ring" column of my data.

My Dataflow code:

import json
import apache_beam as beam

job = (p | "Read" >> beam.io.ReadFromPubSub(topic=topic)
         | "Parse Json" >> beam.Map(json.loads)
         | "ParDo Divisors" >> beam.ParDo(UpdateDelayTable()))

Data flowing in from Pub/Sub:

Ring: [
  {..., "Ring": 1},
  {..., "Ring": 1},
  {..., "Ring": 1},
  {..., "Ring": 2},
  ...
]

I want my Dataflow job to track the current ring number and trigger a function if and only if the ring number has incremented. How should I go about doing this?


Solution

  • Pub/Sub

    There is no guarantee that {"Ring": 2} will be delivered by Pub/Sub after {"Ring": 1}.

    It seems you first have to enable ordered message delivery in Pub/Sub, and you also need to make sure the publisher actually sends the Ring data in increasing order.
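
    A minimal sketch of ordered publishing with the google-cloud-pubsub client (the project, topic, and the 'ring' ordering key here are hypothetical; the subscription must also be created with message ordering enabled):

    import json

    from google.cloud import pubsub_v1

    # The publisher must opt in to message ordering.
    publisher = pubsub_v1.PublisherClient(
        publisher_options=pubsub_v1.types.PublisherOptions(
            enable_message_ordering=True))
    topic_path = publisher.topic_path('my-project', 'my-topic')  # hypothetical names

    # Messages sharing an ordering_key are delivered in publish order,
    # provided the subscription has message ordering enabled.
    for ring in (1, 1, 1, 2):
      data = json.dumps({'Ring': ring}).encode('utf-8')
      publisher.publish(topic_path, data, ordering_key='ring')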

  • Dataflow

    Then to achieve it with Dataflow, you can use stateful processing.

    But be mindful that the "state" of "Ring" is per key (and per window). To do what you want, all the elements need to share the same key and fall into the same window (the global window in this case), which makes it a very "hot" key; see the keying sketch below.
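
    For example, you can assign every element the same constant key before the stateful ParDo, as in this sketch (the key 0 is arbitrary; beam.WithKeys is one of Beam's utility transforms):

    # Give every element the same key so they all share one state cell
    # in the global window.
    keyed = pcoll | "Single Key" >> beam.WithKeys(0)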

    An example code:

    import apache_beam as beam
    from apache_beam.coders import coders
    from apache_beam.transforms.userstate import ReadModifyWriteStateSpec


    class RingFn(beam.DoFn):
      RING_STATE = ReadModifyWriteStateSpec(
          name='Ring', coder=coders.VarIntCoder())

      def process(self, element, ring=beam.DoFn.StateParam(RING_STATE)):
        # Stateful DoFns operate on keyed input: each element arrives as a
        # (key, value) tuple, and state is tracked per key.
        _, value = element
        current_ring = ring.read() or 0
        if value['Ring'] > current_ring:
          print('Carry out your computation here!')
          ring.write(value['Ring'])


    # Usage
    pcoll | beam.ParDo(RingFn())

    # Check your keys if you are not sure what they are.
    pcoll | beam.Keys() | beam.Map(print)
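
    To wire this into your pipeline, a sketch (this swaps your UpdateDelayTable step for the RingFn above; the constant key is arbitrary):

    job = (p
           | "Read" >> beam.io.ReadFromPubSub(topic=topic)
           | "Parse Json" >> beam.Map(json.loads)
           | "Single Key" >> beam.WithKeys(0)
           | "Track Ring" >> beam.ParDo(RingFn()))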