I am using the Apache Camel Kafka component as a client for producing messages. What I observed is that the Kafka producer takes 1 ms to push a single message, but if I merge messages into a batch using Camel aggregation, it takes 100 ms to push a single message.
Brief description of the installation: a 3-node Kafka cluster, 16 cores and 32 GB RAM per node.
Sample Code
String endpoint = "kafka:test?topic=test&brokers=nodekfa:9092,nodekfb:9092,nodekfc:9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536";

Message message = new Message();
String payload = new ObjectMapper().writeValueAsString(message);

StopWatch stopWatch = new StopWatch();
stopWatch.watch();
for (int i = 0; i < size; i++) {
    producerTemplate.sendBody(endpoint, ExchangePattern.InOnly, payload);
}
logger.info("Time taken to push {} message is {}", size, stopWatch.getElapsedTime());
Camel producer endpoint
kafka:[topic]?topic=[topic]&brokers=[brokers]&maxInFlightRequest=1
I am getting a throughput of about 1,000 messages/s, although the Kafka documentation claims producer throughput of around 100,000 messages/s.
Let me know if there is a bug in camel-kafka or in Kafka itself.
Producer config
acks = 1
batch.size = 65536
bootstrap.servers = [nodekfa:9092, nodekfb:9092, nodekfc:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
Test Logs
DEBUG [2019-06-02 17:30:46,781] c.g.p.f.u.AuditEventNotifier: >>> Took 3 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,781] c.g.p.f.u.AuditEventNotifier: >>> Took 3 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,783] c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,783] c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,784] c.g.p.f.u.AuditEventNotifier: >>> Took 1 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,785] c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,786] c.g.p.f.u.AuditEventNotifier: >>> Took 1 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,786] c.g.p.f.u.AuditEventNotifier: >>> Took 1 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,788] c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,788] c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
INFO [2019-06-02 17:30:46,788] c.g.p.f.a.MessageApiController: Time taken to push 5 message is 10ms
It is clearly taking a minimum of 1 ms per message. The default worker pool max size is 20, and if I set the compression codec to snappy, performance gets even worse.
Let me know what I am missing!
I am facing the same issue. Following this mailing-list thread https://camel.465427.n5.nabble.com/Kafka-Producer-Performance-tp5785767p5785860.html, I used the Aggregate EIP (https://camel.apache.org/manual/latest/aggregate-eip.html) to create batches and got better performance:
from("direct:dp.events")
.aggregate(constant(true), new ArrayListAggregationStrategy())
.completionSize(3)
.to(kafkaUri)
.to("log:out?groupInterval=1000&groupDelay=500")
.end();
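ArrayListAggregationStrategy is not shown in the thread; a minimal sketch of such a strategy, assuming Camel 2.x where the interface lives in org.apache.camel.processor.aggregate (it moved to org.apache.camel.AggregationStrategy in Camel 3.x), could look like this:
import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

// Collects the bodies of the incoming exchanges into a single ArrayList.
public class ArrayListAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Object newBody = newExchange.getIn().getBody();
        if (oldExchange == null) {
            // first message of the batch: start a new list
            List<Object> list = new ArrayList<>();
            list.add(newBody);
            newExchange.getIn().setBody(list);
            return newExchange;
        }
        // subsequent messages: append to the existing list
        @SuppressWarnings("unchecked")
        List<Object> list = oldExchange.getIn().getBody(List.class);
        list.add(newBody);
        return oldExchange;
    }
}
With this strategy, each completed group reaches the Kafka endpoint as a single exchange whose body is the list of aggregated payloads, rather than one exchange per message.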
I get:
INFO Received: 1670 new messages, with total 13949 so far. Last group took: 998 millis which is: 1,673.347 messages per second. average: 1,262.696
This is using one Azure Event Hub over the Kafka protocol with a single partition. The weird thing is that when I use another Event Hub with 5 partitions, I get worse performance compared to the 1-partition example...
I was able to get 3K messages per second by increasing workerPoolCoreSize and workerPoolMaxSize, in addition to adding partition keys to the messages and adding aggregation before sending to the Kafka endpoint, roughly as in the sketch below.
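A minimal sketch of that combination, not the exact route used: it reuses the ArrayListAggregationStrategy from above, sets the camel-kafka workerPoolCoreSize/workerPoolMaxSize endpoint options, and uses the exchange id as an illustrative partition key. The pool sizes, completion settings, topic and broker names are placeholder values, not the ones from the test.
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;

public class BatchedKafkaRoute extends RouteBuilder {

    @Override
    public void configure() {
        from("direct:dp.events")
            // batch incoming messages before they hit the Kafka producer
            .aggregate(constant(true), new ArrayListAggregationStrategy())
                .completionSize(100)
                .completionTimeout(50)
            // give each aggregated message a key so records spread across partitions
            .setHeader(KafkaConstants.KEY, simple("${exchangeId}"))
            // enlarge the Camel worker pool that continues routing after the producer callback
            .to("kafka:test?brokers=nodekfa:9092,nodekfb:9092,nodekfc:9092"
                + "&workerPoolCoreSize=10&workerPoolMaxSize=40");
    }
}
The worker pool is what resumes the exchange once the Kafka producer callback fires, so enlarging it lets more in-flight sends complete concurrently, while the key keeps records distributed across partitions.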