
Kafka large message configuration


I have read many threads about this configuration and I still don't understand it.

When I run:

../kafka_2.11-1.1.0/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config max.message.bytes=10000000

it works.

But in my application I have tried everything:

def producerSettings(system: ActorSystem): ProducerSettings[String, KafkaMessage] =
    ProducerSettings(system, new StringSerializer, new GenericSerde[KafkaMessage].serializer())
      .withBootstrapServers("localhost:9092")
      .withProperty("auto.create.topics.enable", "true")
      .withProperty("replica.fetch.max.bytes", maxSize)
      .withProperty("message.max.bytes", maxSize)

val producer: KafkaProducer[String, KafkaMessage] = configuration.producerSettings(context.system)
    .withBootstrapServers("localhost:" + config.getInt("kafka.port"))
    .withProperty("message.max.bytes", maxSize)
    .withProperty("max.request.size", maxSize)
    .createKafkaProducer()

When I start a clean Kafka, I see this in the logs:

INFO Created log for partition my_topic-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 1.1-IV0, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)

As you can see, max.message.bytes -> 1000012 is still at its default value, and I get the record too large exception again.

What am I doing wrong? I want to do this configuration from my application, not with a console command. Is it possible?


Solution

  • This is probably because your topic is auto-created by the broker. When the broker auto-creates a topic, it uses the broker's default config. Note that auto.create.topics.enable, replica.fetch.max.bytes and message.max.bytes are broker settings, so passing them to the producer has no effect; max.request.size is the only producer-side setting in your code.

    You can either change the broker default, which is message.max.bytes, but I would not recommend that, because the broker config would also apply to every other auto-created topic.

    Or you can create the topic explicitly in your application with the desired config (the topic-level setting is max.message.bytes), so that the broker won't create the topic automatically with its default config.
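
A minimal sketch of the second option, assuming kafka-clients 0.11 or later (which provides AdminClient) and a broker reachable at localhost:9092. The object name, the helper largeMessageTopic, and the partition count and replication factor of 1 are illustrative choices, not taken from the question.

```scala
import java.util.{Collections, Properties}
import java.util.concurrent.ExecutionException

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import org.apache.kafka.common.errors.TopicExistsException

object CreateLargeMessageTopic {

  // Hypothetical helper: describes a topic whose per-topic limit
  // (max.message.bytes) overrides the broker default.
  // 1 partition and replication factor 1 are assumptions for a local setup.
  def largeMessageTopic(name: String, maxBytes: Int): NewTopic =
    new NewTopic(name, 1, 1.toShort)
      .configs(Collections.singletonMap("max.message.bytes", maxBytes.toString))

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Assumed broker address; adjust to your environment.
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

    val admin = AdminClient.create(props)
    try {
      // Blocks until the broker has confirmed the topic creation.
      admin
        .createTopics(Collections.singletonList(largeMessageTopic("my_topic", 10000000)))
        .all()
        .get()
    } catch {
      // The topic already exists, so its current config is left unchanged.
      case e: ExecutionException if e.getCause.isInstanceOf[TopicExistsException] => ()
    } finally {
      admin.close()
    }
  }
}
```

Run this once at application startup, before the first message is produced, so the broker never gets the chance to auto-create the topic. The producer still needs max.request.size raised, as in the question, because that limit is checked on the client before the record is sent.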