I'm trying to create a simple producer that creates a topic with a number of partitions provided by configuration.
According to the Alpakka Producer Settings docs, any property from org.apache.kafka.clients.producer.ProducerConfig can be set in the kafka-clients section. And there is a num.partitions property, as commented in the Producer API docs. Thus, I added that property to my application.conf file as given below:
topic = "topic"
topic = ${?TOPIC}

# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.producer {
  # Tuning parameter of how many sends that can run in parallel.
  parallelism = 100
  parallelism = ${?PARALLELISM}

  # Duration to wait for `KafkaProducer.close` to finish.
  close-timeout = 20s

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the producer stages. Some blocking may occur.
  # When this value is empty, the dispatcher configured for the stream
  # will be used.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`
  eos-commit-interval = 100ms

  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
  # can be defined in this configuration section.
  kafka-clients {
    bootstrap.servers = "my-kafka:9092"
    bootstrap.servers = ${?BOOTSTRAPSERVERS}
    num.partitions = "3"
    num.partitions = ${?NUM_PARTITIONS}
  }
}
The producer application code is also given below:
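import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

object Main extends App {
  val config = ConfigFactory.load()
  implicit val system: ActorSystem = ActorSystem("producer")
  implicit val materializer: Materializer = ActorMaterializer()

  // Reads the whole akka.kafka.producer section, including kafka-clients
  val producerConfigs = config.getConfig("akka.kafka.producer")
  val producerSettings = ProducerSettings(producerConfigs, new StringSerializer, new StringSerializer)
  val topic = config.getString("topic")

  val done: Future[Done] =
    Source(1 to 100000)
      .map(_.toString)
      .map(value => new ProducerRecord[String, String](topic, value))
      .runWith(Producer.plainSink(producerSettings))

  implicit val ec: ExecutionContextExecutor = system.dispatcher
  done onComplete {
    case Success(_) => println("Done"); system.terminate()
    case Failure(err) => println(err.toString); system.terminate()
  }
}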
But this doesn't work. The producer creates the topic with a single partition instead of the 3 partitions I set in the configuration (`num.partitions = "3"`).
Finally, here is the kafkacat output:
~$ kafkacat -b my-kafka:9092 -L
Metadata for all topics (from broker -1: my-kafka:9092/bootstrap):
 3 brokers:
  broker 2 at my-kafka-2.my-kafka-headless.default:9092
  broker 1 at my-kafka-1.my-kafka-headless.default:9092
  broker 0 at my-kafka-0.my-kafka-headless.default:9092
 1 topics:
  topic "topic" with 1 partitions:
    partition 0, leader 2, replicas: 2, isrs: 2
What is wrong? Is it possible to set properties from the Kafka Producer API in the kafka-clients section using Alpakka?
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
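As this comment says, ProducerConfig is for producer settings, not broker settings, and num.partitions is a broker setting. (I think you got lost in which table the property was listed in on the Apache Kafka docs; scroll to the top of that table to see its proper header.) The producer does not use num.partitions at all. When your producer first asked the brokers about a topic that did not exist yet, the broker auto-created it (auto.create.topics.enable) using the broker's own num.partitions, which defaults to 1.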
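One way to check which keys the producer actually understands is the static `ProducerConfig.configNames()` method in kafka-clients. A small sketch, assuming kafka-clients is on the classpath (the object name `ProducerConfigCheck` is only for illustration): it reports that `num.partitions` is not a producer key, while `bootstrap.servers` is.

```scala
import org.apache.kafka.clients.producer.ProducerConfig
import scala.collection.JavaConverters._

object ProducerConfigCheck {
  // Every configuration key the Kafka producer itself defines
  private val producerKeys: Set[String] = ProducerConfig.configNames().asScala.toSet

  def isProducerConfig(key: String): Boolean = producerKeys.contains(key)

  def main(args: Array[String]): Unit = {
    // num.partitions belongs to the broker, so it is absent from this set
    println(s"num.partitions    -> ${isProducerConfig("num.partitions")}")
    println(s"bootstrap.servers -> ${isProducerConfig("bootstrap.servers")}")
  }
}
```

Anything you put under kafka-clients that is not in this set is simply not acted on by the producer.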
There is no way to set the number of partitions of a topic from the producer. You need to use the AdminClient class to create the topic; the number of partitions is a parameter there, not a configuration property.
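Sample code:
import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import org.apache.kafka.common.config.TopicConfig
import scala.collection.JavaConverters._

val props = new Properties()
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val adminClient = AdminClient.create(props)
val numPartitions = 3
val replicationFactor = 3.toShort
val newTopic = new NewTopic("new-topic-name", numPartitions, replicationFactor)
// set some topic-level configs (optional)
val configs = Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> "gzip")
newTopic.configs(configs.asJava)
// createTopics is asynchronous: block until the brokers confirm the topic exists
adminClient.createTopics(List(newTopic).asJavaCollection).all().get()
adminClient.close()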
Once the topic has been created, you can start the producer.
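If you still want the partition count to come from application.conf, as in the question, one option is to keep it under your own key outside kafka-clients and pass it to `NewTopic` yourself. A minimal sketch, assuming the hypothetical keys `topic-partitions` and `topic-replication-factor` (they are not Alpakka or Kafka settings) next to the existing `topic` key. In application.conf you could override them from the environment the same way as the other keys, e.g. `topic-partitions = ${?NUM_PARTITIONS}`, and `ConfigFactory.load()` resolves that substitution.

```scala
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.clients.admin.NewTopic

object TopicFromConfig {
  // Hypothetical application-level keys, read by our own code rather than
  // by the producer, so they live outside the kafka-clients section
  def newTopic(config: Config): NewTopic =
    new NewTopic(
      config.getString("topic"),
      config.getInt("topic-partitions"),
      config.getInt("topic-replication-factor").toShort
    )

  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.parseString(
      """topic = "topic"
        |topic-partitions = 3
        |topic-replication-factor = 3
        |""".stripMargin)
    val t = newTopic(config)
    println(s"${t.name()} partitions=${t.numPartitions()} rf=${t.replicationFactor()}")
  }
}
```

The resulting `NewTopic` can be passed to `adminClient.createTopics(...)`; wait for the returned future before running the stream so that the broker does not auto-create the topic with its default partition count first.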