Currently only about 70 records/s are processed on a single node with a single Kafka broker. Throughput is low, and so are CPU and memory utilisation. My topology:
projekte
    .leftJoin(wirtschaftseinheiten)
    .leftJoin(mietobjekte)
    // copy the joined project fields onto the running aggregate
    .cogroup { _, current, previous: ProjektAggregat ->
        previous.copy(
            projekt = current.projekt,
            wirtschaftseinheit = current.wirtschaftseinheit,
            mietobjekt = current.mietobjekt,
            projektErstelltAm = current.projektErstelltAm
        )
    }
    // merge the remaining event streams into the same aggregate
    .cogroup(projektstatus.groupByKey()) { _, projektstatusEvent, aggregat -> aggregat + projektstatusEvent }
    .cogroup(befunde.groupByKey()) { _, befundAggregat, aggregat -> aggregat + befundAggregat }
    .cogroup(aufgaben.groupByKey()) { _, aufgabeAggregat, aggregat -> aggregat + aufgabeAggregat }
    .cogroup(durchfuehrungen.groupByKey()) { _, durchfuehrungAggregat, aggregat -> aggregat + durchfuehrungAggregat }
    .cogroup(gruppen.groupByKey()) { _, gruppeAggregat, aggregat -> aggregat + gruppeAggregat }
    // materialise the result in the projekt state store
    .aggregate({ ProjektAggregat() }, Materialized.`as`(projektStoreSupplier))
I've already tried increasing various size-related settings to feed more data into the stream.
How can I increase throughput to achieve optimal system utilisation?
With the following properties I was able to increase I/O throughput and speed up processing by a factor of 4:
# consumer
max.partition.fetch.bytes: 52428800
max.poll.records: 5000
fetch.min.bytes: 5242880
# producer
max.request.size: 52428800
batch.size: 200000
linger.ms: 100
# common
cache.max.bytes.buffering: 52428800
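
For reference, here is a minimal sketch of how these settings can be passed programmatically to a Kafka Streams application via the consumer/producer prefixes from StreamsConfig (the application id and bootstrap address are placeholders, not part of my actual setup):

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.streams.StreamsConfig
import java.util.Properties

val props = Properties().apply {
    // placeholders, replace with the real application id and broker address
    put(StreamsConfig.APPLICATION_ID_CONFIG, "projekt-aggregat-app")
    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

    // consumer: fetch larger batches per poll
    put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 52428800)
    put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 5000)
    put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), 5242880)

    // producer: allow larger, better-batched requests
    put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), 52428800)
    put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 200000)
    put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 100)

    // streams record cache shared across all stream threads
    put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 52428800L)
}

The un-prefixed keys from the listing above are forwarded to the embedded clients as well; the prefixes only make explicit which client a setting targets.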
By increasing the parallelism to 20 stream threads on a 24-core system I additionally reached a CPU utilisation of about 80% and sped up processing by a further factor of 9:
num.stream.threads: 20
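
For completeness, a sketch of how the thread count is applied, assuming the props object from the snippet above and a topology built from the cogroup DSL shown earlier:

import org.apache.kafka.streams.KafkaStreams

props[StreamsConfig.NUM_STREAM_THREADS_CONFIG] = 20

// 'topology' is assumed to be the Topology built from the DSL above
val streams = KafkaStreams(topology, props)
streams.start()

Keep in mind that stream threads beyond the total number of tasks (i.e. input partitions) stay idle, so 20 threads only pay off when the input topics provide at least that many partitions.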