Search code examples
javaapache-kafkakafka-producer-api

Update TTL for a particular topic in kafka using Java


Update TTL for a topic so records stay in the topic for 10 days. I have to do this for a particular topic only by Leaving all other topics TTL's the same, current configuration, I have to do this using java because I am pushing a topic to kafka through Java. I am setting following properties for pushing a topic to kafka

Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_SERVERS);
props.put("acks", ACKS);
props.put("retries", RETRIES);
props.put("linger.ms", new Integer(LINGER_MS));
props.put("buffer.memory", new Integer(BUFFER_MEMORY));
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Solution

  • You can do that using the AdminClient, following a snippet of code that get the current configuration (just for testing) and then update the "retention.ms" config on the topic named "test".

    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    
    AdminClient adminClient = AdminClient.create(props);
    
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "test");
    
    // get the current topic configuration
    DescribeConfigsResult describeConfigsResult  =
            adminClient.describeConfigs(Collections.singleton(resource));
    
    Map<ConfigResource, Config> config = describeConfigsResult.all().get();
    
    System.out.println(config);
    
    // create a new entry for updating the retention.ms value on the same topic
    ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "50000");
    Map<ConfigResource, Config> updateConfig = new HashMap<ConfigResource, Config>();
    updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));
    
    AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(updateConfig);
    alterConfigsResult.all();
    
    describeConfigsResult  = adminClient.describeConfigs(Collections.singleton(resource));
    
    config = describeConfigsResult.all().get();
    
    System.out.println(config);
    
    adminClient.close();