
How to create ProducerSettings from configuration (application.conf) in akka-stream-kafka?


I am trying to learn akka-stream-kafka and have been going through its documentation. The Producer Settings section says that ProducerSettings can be created either programmatically or from configuration. There is an example of programmatic construction, but none of construction from configuration. Programmatic construction is simple; an example is below. However, I want to use config-based construction, with the configuration coming from application.conf, as that gives me more control. I cannot find an example of this anywhere.

val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
  .withBootstrapServers("localhost:9092")

Solution

  • The docs just forward you to the Apache Kafka Javadoc for ProducerConfig, because it contains the constants whose string values you can use as keys inside your akka.kafka.producer.kafka-clients config section.

    Extending the reference config from the docs, an example would be:

    # Properties for akka.kafka.ProducerSettings can be
    # defined in this section or a configuration section with
    # the same layout. 
    akka.kafka.producer {
      # Tuning parameter of how many sends that can run in parallel.
      parallelism = 100
    
      # How long to wait for `KafkaProducer.close`
      close-timeout = 60s
    
      # Fully qualified config path which holds the dispatcher configuration
      # to be used by the producer stages. Some blocking may occur.
      # When this value is empty, the dispatcher configured for the stream
      # will be used.
      use-dispatcher = "akka.kafka.default-dispatcher"
    
      # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
      # can be defined in this configuration section.
      kafka-clients {
        bootstrap.servers = "localhost:9092"
        acks = "all"
        retries = 0
        batch.size = 16384
      }
    }
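
    The keys inside kafka-clients are the string values of the ProducerConfig constants (for example, ProducerConfig.ACKS_CONFIG is "acks"), so the same properties can also be set or overridden in code. A minimal sketch, assuming akka-stream-kafka and kafka-clients are on the classpath; the object name and ActorSystem name are illustrative only:

    ```scala
    import akka.actor.ActorSystem
    import akka.kafka.ProducerSettings
    import org.apache.kafka.clients.producer.ProducerConfig
    import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

    object ProducerConfigKeysExample extends App {
      // Hypothetical system name; the ActorSystem loads application.conf on startup.
      implicit val system: ActorSystem = ActorSystem("example")

      // Start from akka.kafka.producer, then override single kafka-clients
      // properties using the ProducerConfig constants as keys.
      val producerSettings =
        ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
          .withProperty(ProducerConfig.ACKS_CONFIG, "all")        // same as acks = "all"
          .withProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384") // same as batch.size = 16384

      system.terminate()
    }
    ```

    Values set with withProperty are applied on top of whatever the kafka-clients section already contains.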
    

    Your ActorSystem loads the content of application.conf by default, so whenever you create a ProducerSettings object as per below, it takes its configuration from akka.kafka.producer. You don't need to pass the config to the constructor explicitly.

    val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
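
    If you would rather keep the producer settings in a section of your own, ProducerSettings can also be built from a Config object. The sketch below assumes a hypothetical my-app.producer section in application.conf with the same layout as akka.kafka.producer; it falls back to the default section for any keys the custom section leaves out:

    ```scala
    import akka.actor.ActorSystem
    import akka.kafka.ProducerSettings
    import com.typesafe.config.Config
    import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

    object CustomProducerConfigExample extends App {
      // Hypothetical system name, for illustration only.
      implicit val system: ActorSystem = ActorSystem("example")

      // Defaults shipped with the library, merged with anything in application.conf.
      val defaults: Config = system.settings.config.getConfig("akka.kafka.producer")

      // "my-app.producer" is an assumed section name; getConfig throws if it is missing.
      val custom: Config =
        system.settings.config.getConfig("my-app.producer").withFallback(defaults)

      val producerSettings =
        ProducerSettings(custom, new ByteArraySerializer, new StringSerializer)

      system.terminate()
    }
    ```

    This can be handy when one application runs several producers with different settings, since each can read its own section.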