I have a scenario where multiple threads write to the same queue.
Appender threads receive updates from different markets (one thread per market) and push that data into the same queue:
ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path + "/market").build();
final ExcerptAppender appender = queue.acquireAppender();
appender.writeDocument(
    wire -> {
        wire
            .getValueOut().text("buy")
            .getValueOut().text(exchange.name())
            .getValueOut().text(currencyPair.toString())
            .getValueOut().dateTime(LocalDateTime.now(Clock.systemUTC()))
            .getValueOut().text(price);
    });
Then I have a completely separate process (a different JVM) that continuously reads from the queue by doing:
while (true){
tailer.readDocument(........
But while I generate about 10 updates to the queue per second, the tailer processes only about one record every 3 seconds. I think I am missing something fundamental here :-)
Or what is the correct way to continuously listen for updates on the queue? I wasn't able to find any solution other than while (true) then do...
I am developing on an 18-core machine (36 threads) and use Java Affinity to assign each task to its own CPU.
Thanks for any hints.
Creating a queue is very expensive; try to do this only once per process if you can.
Creating a Tailer is also expensive; you should create it once and keep polling it for updates.
Creating objects can be expensive, so I would avoid creating any objects on the hot path, e.g. avoid calling toString or LocalDateTime.now.
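To illustrate the "create once, keep polling" pattern, here is a minimal sketch that uses only the JDK. It is not Chronicle Queue code: a ConcurrentLinkedQueue stands in for the tailer, and the names PollingSketch and pollUntil are hypothetical. The null check plays the role that !dc.isPresent() plays with a real tailer, and the short park is just one possible way to back off when nothing is available.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;

// Hypothetical stand-in: the ConcurrentLinkedQueue plays the role of a tailer
// that was created once, outside the loop.
final class PollingSketch {

    /**
     * Polls the source until `expected` messages have been handled.
     * Returns how many polls found nothing to read.
     */
    static long pollUntil(Queue<String> source, int expected, Consumer<String> handler) {
        long emptyPolls = 0;
        int handled = 0;
        while (handled < expected) {
            String msg = source.poll();        // analogous to tailer.readingDocument()
            if (msg == null) {                 // analogous to !dc.isPresent()
                emptyPolls++;
                LockSupport.parkNanos(1_000);  // brief back-off instead of a hot spin
                continue;
            }
            handler.accept(msg);
            handled++;
        }
        return emptyPolls;
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<String> source = new ConcurrentLinkedQueue<>();
        List<String> seen = new ArrayList<>();

        // A writer thread publishes ten updates while the main thread polls.
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                source.add("update-" + i);
            }
        });
        writer.start();

        pollUntil(source, 10, seen::add);
        writer.join();
        System.out.println("Handled " + seen.size() + " messages");
    }
}
```

The point is that nothing expensive is constructed inside the loop: the reader object exists before the loop starts, and an empty poll only costs a check and a short pause.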
Here is an example benchmark for writing:
String path = OS.getTarget();
// Build the queue and acquire the appender once, outside the timed loops.
ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path + "/market").build();
ExcerptAppender appender = queue.acquireAppender();
Exchange exchange = Exchange.EBS;
CurrencyPair currencyPair = CurrencyPair.EURUSD;
double price = 1.2345;
for (int t = 0; t < 5; t++) {
    long start = System.nanoTime();
    int messages = 100000;
    for (int i = 0; i < messages; i++) {
        try (DocumentContext dc = appender.writingDocument()) {
            ValueOut valueOut = dc.wire().getValueOut();
            // Enums, a long timestamp and a double avoid toString() and LocalDateTime allocations.
            valueOut.text("buy")
                .getValueOut().asEnum(exchange)
                .getValueOut().asEnum(currencyPair)
                .getValueOut().int64(System.currentTimeMillis())
                .getValueOut().float64(price);
        }
    }
    long time = System.nanoTime() - start;
    System.out.printf("Throughput was %,d messages per second%n", (long) (messages * 1e9 / time));
    Jvm.pause(100);
}
This prints:
Throughput was 962,942 messages per second
Throughput was 2,952,433 messages per second
Throughput was 4,776,337 messages per second
Throughput was 3,250,235 messages per second
Throughput was 3,514,863 messages per second
And for reading you can do:
// Create the tailer once and reuse it for every read.
final ExcerptTailer tailer = queue.createTailer();
for (int t = 0; t < 5; t++) {
    long start = System.nanoTime();
    int messages = 100000;
    for (int i = 0; i < messages; i++) {
        try (DocumentContext dc = tailer.readingDocument()) {
            // In this benchmark every message was already written, so a missing document is an error.
            if (!dc.isPresent())
                throw new AssertionError("Missing t: " + t + ", i: " + i);
            // Read the fields back in the same order they were written.
            ValueIn in = dc.wire().getValueIn();
            String buy = in.text();
            Exchange exchange2 = in.asEnum(Exchange.class);
            CurrencyPair currencyPair2 = in.asEnum(CurrencyPair.class);
            long time = in.int64();
            double price2 = in.float64();
        }
    }
    long time = System.nanoTime() - start;
    System.out.printf("Read Throughput was %,d messages per second%n", (long) (messages * 1e9 / time));
}
Note: it reads the same number of messages as were written.
This prints:
Read Throughput was 477,849 messages per second
Read Throughput was 3,083,642 messages per second
Read Throughput was 5,100,516 messages per second
Read Throughput was 6,342,525 messages per second
Read Throughput was 6,672,971 messages per second
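The benchmark above stores the timestamp as a plain long (System.currentTimeMillis() written with int64) instead of a LocalDateTime. If the reader still needs a date-time, it can be rebuilt from the long only where it is actually required, for example when logging or displaying. A minimal sketch using only java.time follows; the class and method names (TimestampSketch, toUtcDateTime) are hypothetical, and UTC is assumed because the question used Clock.systemUTC().

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

final class TimestampSketch {

    /** Converts epoch milliseconds (as read back with int64) to a UTC LocalDateTime. */
    static LocalDateTime toUtcDateTime(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // On the write side only the primitive long is produced.
        long written = System.currentTimeMillis();

        // On the read side the object is created only when it is needed.
        LocalDateTime decoded = toUtcDateTime(written);
        System.out.println(written + " -> " + decoded);
    }
}
```

This keeps the write path free of date-time allocations and leaves the conversion cost to the consumer, which can skip it entirely for messages it only filters or forwards.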