Tags: apache-beam, sharding, dataflow

Dataflow GroupIntoBatches WithShardedKey error (Python)


We have a pipeline that needs to group messages by a dynamic key (based on the message) and make an external call per group. The number of keys is unknown (we know the most frequent keys, but new keys can appear as well). Currently I use a fixed shard count per destination, as follows:

SHARDS_PER_DESTINATION = {
    "key_1": 60,
    "key_2": 40,
    "key_3": 40,
    "key_4": 40
    ...
}

def make_keys(elem):
    key = elem[0][ATTRIBUTES_FIELD][CODE_ATRIBUTE]
    num_shards = SHARDS_PER_DESTINATION.get(key, 10)  # Default to 10 shards
    t = ((key, random.randint(0, num_shards)), elem)
    return t
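The sharded-key logic above can be exercised on its own. A minimal standalone sketch (the element shape and the field names `ATTRIBUTES_FIELD`/`CODE_ATTRIBUTE` are assumptions for illustration):

```python
import random

# Hypothetical field names standing in for the question's ATTRIBUTES_FIELD / CODE_ATRIBUTE.
ATTRIBUTES_FIELD = "attributes"
CODE_ATTRIBUTE = "code"

SHARDS_PER_DESTINATION = {"key_1": 60, "key_2": 40}

def make_keys(elem):
    key = elem[0][ATTRIBUTES_FIELD][CODE_ATTRIBUTE]
    num_shards = SHARDS_PER_DESTINATION.get(key, 10)  # default to 10 shards
    # random.randint is inclusive on both ends, so shard values span 0..num_shards.
    return ((key, random.randint(0, num_shards)), elem)

elem = ({ATTRIBUTES_FIELD: {CODE_ATTRIBUTE: "key_1"}}, b"payload")
(key, shard), _ = make_keys(elem)
```

Messages for the same destination thus spread across a bounded set of composite keys, limiting per-key fan-in.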

and this is my grouping transform:

def expand(self, pcoll):
    return (pcoll
            | "Window" >> beam.WindowInto(beam.window.FixedWindows(self.max_waiting_time))
            | "Add timestamp to windowed elements" >> beam.ParDo(AddTimestamp())
            | "Add Key" >> beam.Map(make_keys)
            | "Groupby" >> beam.GroupByKey()
            )

and it works fine. Recently I found out about GroupIntoBatches.WithShardedKey, so I am trying to use that instead:

def make_keys(elem):
    key = elem[0][ATTRIBUTES_FIELD][CODE_ATRIBUTE]
    t = (key, elem)
    return t

def expand(self, pcoll):
    return (pcoll
            | "Add timestamp" >> beam.ParDo(AddTimestamp())
            | "Add Key" >> beam.Map(make_keys)
            | "Shard key" >> beam.GroupIntoBatches.WithShardedKey(self.max_messages, self.max_waiting_time)
            )

but no matter what I try, I keep getting the following error:

ValueError: ShardedKeyCoder[TupleCoder[FastPrimitivesCoder, FastPrimitivesCoder]] cannot be made deterministic for 'Window .... messages/Groupby/GroupIntoBatches/ParDo(_GroupIntoBatchesDoFn)'.

What am I missing here?


Solution

  • The problem is the key's coder, not the tuple itself: Beam encodes a plain Python tuple key with FastPrimitivesCoder, which cannot be made deterministic, and GroupIntoBatches (like any transform that groups by key) requires a deterministic key coder.

    Try defining your key as a NamedTuple and registering the RowCoder for it:

    import apache_beam as beam
    from typing import NamedTuple, Tuple

    class MyKey(NamedTuple):
        key: str
        shard: int

        def __hash__(self):
            return hash((self.key, self.shard))

    # RowCoder gives the key a deterministic encoding.
    beam.coders.registry.register_coder(MyKey, beam.coders.RowCoder)

    # MyObject stands for your element type.
    pcoll2 = pcoll | beam.Map(make_keys).with_output_types(Tuple[MyKey, MyObject])
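With the NamedTuple key in place, the question's make_keys would emit MyKey instances instead of plain tuples. A minimal sketch of that change (field names and shard selection mirror the question; the apache_beam coder registration is omitted so this runs standalone):

```python
import random
from typing import NamedTuple

class MyKey(NamedTuple):
    key: str
    shard: int

    def __hash__(self):
        return hash((self.key, self.shard))

# Hypothetical field names standing in for ATTRIBUTES_FIELD / CODE_ATRIBUTE.
ATTRIBUTES_FIELD = "attributes"
CODE_ATTRIBUTE = "code"
SHARDS_PER_DESTINATION = {"key_1": 60}

def make_keys(elem):
    key = elem[0][ATTRIBUTES_FIELD][CODE_ATTRIBUTE]
    num_shards = SHARDS_PER_DESTINATION.get(key, 10)
    # The key is now a typed NamedTuple, which RowCoder can encode deterministically.
    return (MyKey(key=key, shard=random.randint(0, num_shards)), elem)

result = make_keys(({ATTRIBUTES_FIELD: {CODE_ATTRIBUTE: "key_1"}}, b"payload"))
```

Because MyKey hashes the same as the equivalent tuple, downstream code that treated the key as `(key, shard)` keeps working.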