google-cloud-dataflow, apache-beam

What should I use as the Key for GroupIntoBatches.withShardedKey


I want to batch the calls to an external service in my streaming Dataflow job (unbounded source). My first approach was windowing + attaching a dummy key + GroupByKey, as below:

messages
    // 1. Windowing
    .apply("window-5-seconds",
        Window.<Message>into(FixedWindows.of(Duration.standardSeconds(5)))
                .triggering(
                    Repeatedly.forever(AfterPane.elementCountAtLeast(1000)
                        .orFinally(AfterWatermark.pastEndOfWindow())))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes()
    )
    // 2. attach arbitrary key
    .apply("attach-arbitrary-key", ParDo.of(new MySink.AttachDummyKey()))
    // 3. group by key
    .apply(GroupByKey.create())
    // 4. call my service
    .apply("call-my-service",
        ParDo.of(new MySink(myClient)));
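
For context, AttachDummyKey is not shown above; a minimal sketch of what such a DoFn could look like (a hypothetical reconstruction, nested inside MySink as the pipeline's MySink.AttachDummyKey() suggests):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical sketch of AttachDummyKey: every element gets the same
// constant key, so the subsequent GroupByKey collapses all traffic onto a
// single key and the downstream work cannot run in parallel.
static class AttachDummyKey extends DoFn<Message, KV<String, Message>> {
    @ProcessElement
    public void processElement(@Element Message message,
                               OutputReceiver<KV<String, Message>> out) {
        out.output(KV.of("dummy", message));
    }
}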

This implementation caused performance issues: because every message got the same dummy key, the grouping collapsed onto a single key and the transform did not execute in parallel at all. After reading this answer, I switched to the GroupIntoBatches transform, as below.

messages
    // 1. Windowing
    .apply("window-5-seconds",
        Window.<Message>into(FixedWindows.of(Duration.standardSeconds(5)))
                .triggering(
                    Repeatedly.forever(AfterPane.elementCountAtLeast(1000)
                        .orFinally(AfterWatermark.pastEndOfWindow())))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes()
    )
    // 2. attach sharding key
    .apply("attach-sharding-key", ParDo.of(new MySink.AttachShardingKey()))
    // 3. group by key into batches
    .apply("group-into-batches",
        GroupIntoBatches.<String, MessageWrapper>ofSize(1000)
            .withMaxBufferingDuration(Duration.standardSeconds(5))
            .withShardedKey())
    // 4. call my service
    .apply("call-my-service",
        ParDo.of(new MySink(myClient)));
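
AttachShardingKey is likewise not shown; here is a hypothetical sketch that hashes a message field into a small key space (getUserId(), NUM_BUCKETS, and the MessageWrapper(Message) constructor are invented names for illustration):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical sketch of AttachShardingKey: bucket messages by hashing a
// field into a small fixed key space. Math.floorMod keeps the bucket index
// non-negative even when hashCode() is negative.
static class AttachShardingKey extends DoFn<Message, KV<String, MessageWrapper>> {
    private static final int NUM_BUCKETS = 16; // assumed value; tune for your traffic

    @ProcessElement
    public void processElement(@Element Message message,
                               OutputReceiver<KV<String, MessageWrapper>> out) {
        int bucket = Math.floorMod(message.getUserId().hashCode(), NUM_BUCKETS);
        out.output(KV.of("shard-" + bucket, new MessageWrapper(message)));
    }
}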

The documentation states that withShardedKey increases parallelism by spreading one key over multiple threads, but the question remains: what would be a good key to use with withShardedKey?

If this truly is runner-determined sharding, would it make sense to use a single dummy key, or would the same problem occur as with GroupByKey? I currently do not have a good natural key to use, so I was thinking of creating a hash based on some fields of the message. But if I do pick a key that evenly distributes the traffic, would it still make sense to use withShardedKey? Or might each shard then receive too little data for GroupIntoBatches to actually be useful?


Solution

  • Usually the key would be a natural key, but since you mentioned that there's no such key, I feel there are a few trade-offs to consider.

    You can apply a static key, but then the parallelism will depend only on the number of threads executing the transform (the GroupIntoBatches semantics), which is runner-specific:

    Outputs batched elements associated with sharded input keys. By default, keys are sharded such that the input elements with the same key are spread to all available threads executing the transform. Runners may override the default sharding to do a better load balancing during the execution time.

    If your pipeline can afford more calls (with possibly non-full batches, depending on the distribution), applying a random key from a small range instead of a static key may provide better guarantees; you would have to experiment to find the ideal range. A minimal sketch follows at the end of this answer.

    I recommend watching this session, which provides some relevant information: Beam Summit 2021 - Autoscaling your transforms with auto-sharded GroupIntoBatches.
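
    A minimal sketch of the random-key idea above, assuming a small fixed range (AttachRandomKey and NUM_KEYS are illustrative names, not part of the question's code or the Beam API):

        import java.util.concurrent.ThreadLocalRandom;
        import org.apache.beam.sdk.transforms.DoFn;
        import org.apache.beam.sdk.values.KV;

        // Sketch: assign each element a key drawn uniformly from a small range.
        // Larger NUM_KEYS means more parallel batches but smaller (possibly
        // non-full) batches per key; experiment to find the right balance.
        class AttachRandomKey extends DoFn<MessageWrapper, KV<Integer, MessageWrapper>> {
            private static final int NUM_KEYS = 8; // assumed starting point; tune empirically

            @ProcessElement
            public void processElement(@Element MessageWrapper element,
                                       OutputReceiver<KV<Integer, MessageWrapper>> out) {
                out.output(KV.of(ThreadLocalRandom.current().nextInt(NUM_KEYS), element));
            }
        }

    Combined with GroupIntoBatches.withShardedKey(), the runner can still shard each of these keys further across threads, so a modest NUM_KEYS may already be enough.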