Tags: apache-kafka, apache-kafka-streams, spring-kafka

Kafka Streams - Consumer memory overload


I am planning a Spring + Kafka Streams application that handles incoming messages and stores updated internal state as a result of those messages. This state is predicted to reach ~500 MB per unique key (there are likely to be ~10k unique keys distributed across 2k partitions).

This state must generally be held in memory for my application to operate effectively, but even on disk I would face a similar problem (albeit at a later point of scaling).

I am planning to deploy this application into a dynamically scaling environment such as AWS, and will set a minimum number of instances, but I am wary of two situations:

  • On first startup (where perhaps just one consumer starts first), it will not be able to handle taking assignment of all the partitions, because the in-memory state will overflow the instance's available memory.
  • After a major outage (e.g. an AWS availability-zone outage), 33% of the consumers could be taken out of the group, and the additional memory load on the remaining instances could take out everyone who remains.

How do people protect their consumers from taking on more partitions than they can handle such that they do not overflow available memory/disk?
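For a rough sense of scale, here is a back-of-envelope calculation of the numbers above (the 8 GB per-instance heap is an illustrative assumption, not part of the original question):

```java
public class CapacityEstimate {

    public static void main(String[] args) {
        long keys = 10_000;               // unique keys
        long stateMbPerKey = 500;         // ~500 MB of state per key
        long partitions = 2_000;          // topic partitions

        long totalStateMb = keys * stateMbPerKey;             // state across the whole cluster
        long stateMbPerPartition = totalStateMb / partitions; // average state hosted per partition

        // Illustrative assumption: each instance can dedicate ~8 GB of heap to state
        long heapMbPerInstance = 8 * 1024;
        long maxPartitionsPerInstance = heapMbPerInstance / stateMbPerPartition;

        System.out.println("Total state: " + totalStateMb / 1024 + " GB");         // → 4882 GB
        System.out.println("State per partition: " + stateMbPerPartition + " MB"); // → 2500 MB
        System.out.println("Max partitions per instance: " + maxPartitionsPerInstance); // → 3
    }
}
```

So under these assumptions an instance can only safely host a handful of partitions, which is why an unbounded first-consumer assignment (or a post-outage rebalance) is dangerous.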


Solution

  • See the Kafka documentation.

    Since 0.11...

    [screenshot of the Kafka documentation excerpt; image not reproduced]

    EDIT

    For your second use case (and it also works for the first), perhaps you could implement a custom PartitionAssignor that limits the number of partitions assigned to each instance.

    I haven't tried it; I don't know how the broker will react to the presence of unassigned partitions.

    EDIT2

    This seems to work ok; but YMMV...

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    
    import org.apache.kafka.clients.consumer.RoundRobinAssignor;
    import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
    import org.apache.kafka.common.TopicPartition;
    
    public class NoMoreThanFiveAssignor extends RoundRobinAssignor {
    
        @Override
        public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                Map<String, Subscription> subscriptions) {
    
            // let the round-robin algorithm do the initial assignment...
            Map<String, List<TopicPartition>> assignments = super.assign(partitionsPerTopic, subscriptions);
            // ...then cap each member at 5 partitions, leaving the excess unassigned
            assignments.forEach((memberId, assigned) -> {
                if (assigned.size() > 5) {
                    System.out.println("Reducing assignments from " + assigned.size() + " to 5 for " + memberId);
                    assignments.put(memberId, 
                            assigned.stream()
                                .limit(5)
                                .collect(Collectors.toList()));
                }
            });
            return assignments;
        }
    
    }
    

    and

    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    
    @SpringBootApplication
    public class So54072362Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So54072362Application.class, args);
        }
    
        @Bean
        public NewTopic topic() {
            // 15 partitions, so a single consumer would exceed the 5-partition cap
            return new NewTopic("so54072362", 15, (short) 1);
        }
    
        @KafkaListener(id = "so54072362", topics = "so54072362")
        public void listen(ConsumerRecord<?, ?> record) {
            System.out.println(record);
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> {
                // send one record to each of the 15 partitions (topic, partition, key, value)
                for (int i = 0; i < 15; i++) {
                    template.send("so54072362", i, "foo", "bar");
                }
            };
        }
    
    }
    

    and

    spring.kafka.consumer.properties.partition.assignment.strategy=com.example.NoMoreThanFiveAssignor
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.auto-offset-reset=earliest
    

    and

    Reducing assignments from 15 to 5 for consumer-2-f37221f8-70bb-421d-9faf-6591cc26a76a
    2019-01-07 15:24:28.288  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Successfully joined group with generation 7
    2019-01-07 15:24:28.289  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Setting newly assigned partitions [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
    2019-01-07 15:24:28.296  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
    2019-01-07 15:24:46.303  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Attempt to heartbeat failed since group is rebalancing
    2019-01-07 15:24:46.303  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Revoking previously assigned partitions [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
    2019-01-07 15:24:46.303  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
    2019-01-07 15:24:46.304  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] (Re-)joining group
    Reducing assignments from 8 to 5 for consumer-2-c9a6928a-520c-4646-9dd9-4da14636744b
    Reducing assignments from 7 to 5 for consumer-2-f37221f8-70bb-421d-9faf-6591cc26a76a
    2019-01-07 15:24:46.310  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Successfully joined group with generation 8
    2019-01-07 15:24:46.311  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Setting newly assigned partitions [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
    2019-01-07 15:24:46.315  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
    2019-01-07 15:24:58.324  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Attempt to heartbeat failed since group is rebalancing
    2019-01-07 15:24:58.324  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Revoking previously assigned partitions [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
    2019-01-07 15:24:58.324  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
    2019-01-07 15:24:58.324  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] (Re-)joining group
    2019-01-07 15:24:58.330  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Successfully joined group with generation 9
    2019-01-07 15:24:58.332  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Setting newly assigned partitions [so54072362-14, so54072362-11, so54072362-5, so54072362-8, so54072362-2]
    2019-01-07 15:24:58.336  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so54072362-14, so54072362-11, so54072362-5, so54072362-8, so54072362-2]
    

    Of course, this leaves the unassigned partitions dangling, but it sounds like that's what you want, until the region comes back online.
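If the hard-coded limit of 5 becomes awkward, the capping logic could be factored into a plain helper that takes the limit as a parameter. A minimal sketch (the `AssignmentLimiter` name and its API are illustrative, not part of the answer above); it works on plain collections, so it can be unit-tested without a broker:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class AssignmentLimiter {

    /**
     * Cap every member's assignment list at maxPerMember entries,
     * leaving any excess partitions unassigned.
     */
    public static <T> Map<String, List<T>> limit(Map<String, List<T>> assignments, int maxPerMember) {
        return assignments.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> e.getValue().stream()
                                .limit(maxPerMember)
                                .collect(Collectors.toList()),
                        (a, b) -> a,            // no duplicate keys expected
                        LinkedHashMap::new));   // preserve member ordering
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> assignments = Map.of(
                "consumer-1", List.of(0, 1, 2, 3, 4, 5, 6, 7));
        System.out.println(limit(assignments, 5).get("consumer-1")); // → [0, 1, 2, 3, 4]
    }
}
```

A custom assignor could then call `limit(super.assign(...), maxPerMember)`, with the limit read from configuration instead of being baked into the class.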