Chronicle Consumer not reading records correctly?


I am using chronicle-queue (5.16.13) to write JSON values to a Chronicle file and read them back. To write objects, I use the following in a loop:

try (final DocumentContext dc = appender.writingDocument()) {
        dc.wire().write(() -> "msg").text("Hallo asdf");
        System.out.println("your data was store to index="+ dc.index());
        return true;
    } catch (Exception e) {
        logger.warn("Unable to store value to chronicle", e);
        return false;
    }

To read items, I make the following call in a loop:

DocumentContext documentContext;
    do {
        documentContext = tailer.readingDocument();
        currentOffset = documentContext.index();
        System.out.println("Current offset: " + currentOffset);
    } while (!documentContext.isData());

What I observe is that the variable currentOffset does not change, and after some time (depending on the payload size, it seems) the loop never terminates and the current offset takes on crazy values. The (shortened) output of the first loop is

Writing 0
your data was store to index=76385993359360
Writing 1
your data was store to index=76385993359361
Writing 2
your data was store to index=76385993359362
Writing 3
your data was store to index=76385993359363
Writing 4
your data was store to index=76385993359364
Writing 5
your data was store to index=76385993359365
Writing 6
your data was store to index=76385993359366
Writing 7
your data was store to index=76385993359367
Writing 8
your data was store to index=76385993359368
Writing 9
your data was store to index=76385993359369
Writing 10
your data was store to index=76385993359370
Writing 11
your data was store to index=76385993359371
Writing 12
your data was store to index=76385993359372
Writing 13
your data was store to index=76385993359373
Writing 14
your data was store to index=76385993359374
Writing 15
your data was store to index=76385993359375
Writing 16
your data was store to index=76385993359376
Writing 17
your data was store to index=76385993359377
Writing 18
your data was store to index=76385993359378
Writing 19
your data was store to index=76385993359379
Writing 20
your data was store to index=76385993359380
Writing 21
your data was store to index=76385993359381
Writing 22
your data was store to index=76385993359382
Writing 23
your data was store to index=76385993359383
Writing 24
your data was store to index=76385993359384
Writing 25
your data was store to index=76385993359385
Writing 26
your data was store to index=76385993359386

And for the second loop

Reading 0
Current offset: 76385993359360
Reading 1
Current offset: 76385993359360
Reading 2
Current offset: 76385993359360
Reading 3
Current offset: 76385993359360
Reading 4
Current offset: 76385993359360
Reading 5
Current offset: 76385993359360
Reading 6
Current offset: 76385993359360
Reading 7
Current offset: 76385993359360
Reading 8
Current offset: 76385993359360
Reading 9
Current offset: 76385993359360
Reading 10
Current offset: 76385993359360
Reading 11
Current offset: 76385993359360
Reading 12
Current offset: 76385993359360
Reading 13
Current offset: 76385993359360
Reading 14
Current offset: 76385993359360
Reading 15
Current offset: 76385993359360
Reading 16
Current offset: 76385993359360
Reading 17
Current offset: 76385993359360
Reading 18
Current offset: 76385993359360
Reading 19
Current offset: 76385993359360
Reading 20
Current offset: 76385993359360
Reading 21
Current offset: 76385993359360
Reading 22
Current offset: 76385993359360
Reading 23
Current offset: 76385993359360
Reading 24
Current offset: 76385993359360
Reading 25
Current offset: -9223372036854775808

Am I doing something completely wrong? Can anybody point me to the correct usage?

Thanks a lot!

Edit: Added minimal working example

The following unit test fails for me.

@Test
public void fails() throws Exception {
    String basePath = System.getProperty("java.io.tmpdir");
    String path = Files.createTempDirectory(Paths.get(basePath), "chronicle-")
            .toAbsolutePath()
            .toString();
    logger.info("Using temp path '{}'", path);

    SingleChronicleQueue chronicleQueue = SingleChronicleQueueBuilder
            .single()
            .path(path)
            .build();

    // Create Appender
    ExcerptAppender appender = chronicleQueue.acquireAppender();

    // Create Tailer
    ExcerptTailer tailer = chronicleQueue.createTailer();
    tailer.toStart();

    int numberOfRecords = 10;

    // Write
    for (int i = 0; i <= numberOfRecords; i++) {
        System.out.println("Writing " + i);
        try (final DocumentContext dc = appender.writingDocument()) {
            dc.wire().write(() -> "msg").text("Hello World!");
            System.out.println("your data was store to index=" + dc.index());
        } catch (Exception e) {
            logger.warn("Unable to store value to chronicle", e);
        }
    }
    // Read
    for (int i = 0; i <= numberOfRecords; i++) {
        System.out.println("Reading " + i);
        DocumentContext documentContext = tailer.readingDocument();
        long currentOffset = documentContext.index();
        System.out.println("Current offset: " + currentOffset);

        Wire wire = documentContext.wire();

        if (wire != null) {
            String msg = wire
                    .read("msg")
                    .text();
        }
    }

    chronicleQueue.close();
} 

Output is

Writing 0
your data was store to index=76385993359360
Writing 1
your data was store to index=76385993359361
Writing 2
your data was store to index=76385993359362
Writing 3
your data was store to index=76385993359363
Writing 4
your data was store to index=76385993359364
Writing 5
your data was store to index=76385993359365
Writing 6
your data was store to index=76385993359366
Writing 7
your data was store to index=76385993359367
Writing 8
your data was store to index=76385993359368
Writing 9
your data was store to index=76385993359369
Writing 10
your data was store to index=76385993359370
Reading 0
Current offset: 76385993359360
Reading 1
Current offset: 76385993359360
Reading 2
Current offset: 76385993359360
Reading 3
Current offset: 76385993359360
Reading 4
Current offset: -9223372036854775808
Reading 5
Current offset: -9223372036854775808
Reading 6
Current offset: -9223372036854775808
Reading 7
Current offset: -9223372036854775808
Reading 8
Current offset: -9223372036854775808
Reading 9
Current offset: -9223372036854775808
Reading 10
Current offset: -9223372036854775808
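
A note on reading the numbers in the output above. The sketch below assumes the queue uses its default daily roll cycle and that an index packs the cycle (days since 1970-01-01) into the upper 32 bits and the position within that cycle into the lower 32 bits. The arithmetic is consistent with the values shown, but the layout is an assumption here, and IndexDecoder and its methods are hypothetical helper names, not part of Chronicle Queue. The value -9223372036854775808 is simply Long.MIN_VALUE, which suggests a "nothing was read" marker rather than a real position.

```java
import java.time.LocalDate;

// Hypothetical helper for illustration only; not a Chronicle Queue class.
class IndexDecoder {
    // Assumption: daily roll cycle, cycle in the upper 32 bits.
    static final int CYCLE_SHIFT = 32;

    // Upper bits: the roll cycle, read here as days since the epoch.
    static long cycleOf(long index) {
        return index >>> CYCLE_SHIFT;
    }

    // Lower bits: the position of the record within its cycle.
    static long sequenceOf(long index) {
        return index & ((1L << CYCLE_SHIFT) - 1);
    }

    // The cycle converted to a calendar date (valid only under the assumption above).
    static LocalDate cycleDate(long index) {
        return LocalDate.ofEpochDay(cycleOf(index));
    }

    public static void main(String[] args) {
        long firstWritten = 76385993359360L;  // "Writing 0" in the output above
        long fifthWritten = 76385993359364L;  // "Writing 4" in the output above

        System.out.println(cycleOf(firstWritten));     // 17785
        System.out.println(sequenceOf(firstWritten));  // 0
        System.out.println(sequenceOf(fifthWritten));  // 4
        System.out.println(cycleDate(firstWritten));   // 2018-09-11

        // The "crazy" value is exactly Long.MIN_VALUE.
        System.out.println(Long.MIN_VALUE == -9223372036854775808L);  // true
    }
}
```

Read this way, the write loop advances the sequence by one per record, while the read loop keeps reporting sequence 0 until it stops reporting a valid index at all.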

Solution

  • DocumentContext is intended to be one of the lower-level interfaces and is not to everyone's taste. I favour the MethodReader/MethodWriter approach unless you have a reason to work at the lower level.

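    // Assumed definition; the answer does not show it. A single method
    // matches both messager.msg(...) and the lambda passed to methodReader.
    interface Messager {
        void msg(String msg);
    }
    
    @Test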
    public void works() {
        String path = OS.TMP + "/chronicle-" + System.nanoTime();
        System.out.println("Using temp path " + path);
    
        try (SingleChronicleQueue queue = SingleChronicleQueueBuilder
                .single()
                .path(path)
                .build()) {
    
            ExcerptAppender appender = queue.acquireAppender();
            Messager messager = appender.methodWriter(Messager.class);
    
            int numberOfRecords = 10;
    
            // Write
            for (int i = 0; i <= numberOfRecords; i++) {
                System.out.print("Writing " + i);
                messager.msg("Hello World!");
                System.out.println(", your data was stored at index=" + appender.lastIndexAppended());
            }
    
            ExcerptTailer tailer = queue.createTailer();
            MethodReader reader = tailer.methodReader((Messager) msg -> {
                System.out.println("Current offset: " + tailer.index()
                        + " msg: " + msg);
            });
    
            // Read
            while (reader.readOne()) {
                // busy wait.
            }
        }
    }
    

    This prints

    Using temp path C:\Users\peter\AppData\Local\Temp\/chronicle-412979753710181
    [main] DEBUG net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in C:\Users\peter\AppData\Local\Temp\chronicle-412979753710181\metadata.cq4t took 15.418 ms.
    [main] DEBUG net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in C:\Users\peter\AppData\Local\Temp\chronicle-412979753710181\20181226.cq4 took 25.061 ms.
    Writing 0, your data was stored at index=76841259892736
    Writing 1, your data was stored at index=76841259892737
    Writing 2, your data was stored at index=76841259892738
    Writing 3, your data was stored at index=76841259892739
    Writing 4, your data was stored at index=76841259892740
    Writing 5, your data was stored at index=76841259892741
    Writing 6, your data was stored at index=76841259892742
    Writing 7, your data was stored at index=76841259892743
    Writing 8, your data was stored at index=76841259892744
    Writing 9, your data was stored at index=76841259892745
    Writing 10, your data was stored at index=76841259892746
    
    Current offset: 76841259892736 msg: Hello World!
    Current offset: 76841259892737 msg: Hello World!
    Current offset: 76841259892738 msg: Hello World!
    Current offset: 76841259892739 msg: Hello World!
    Current offset: 76841259892740 msg: Hello World!
    Current offset: 76841259892741 msg: Hello World!
    Current offset: 76841259892742 msg: Hello World!
    Current offset: 76841259892743 msg: Hello World!
    Current offset: 76841259892744 msg: Hello World!
    Current offset: 76841259892745 msg: Hello World!
    Current offset: 76841259892746 msg: Hello World!
    [main] DEBUG net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released C:\Users\peter\AppData\Local\Temp\chronicle-412979753710181\20181226.cq4
    [main] DEBUG net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released C:\Users\peter\AppData\Local\Temp\chronicle-412979753710181\20181226.cq4
    

    NOTE: This writes the same data as the original post.

    An advantage of this interface approach is that you can implement your business component entirely in terms of interfaces whose methods take DTOs, without using Chronicle (or any transport) at all. This simplifies testing the business logic, because the transport is removed from the tests.
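
As a minimal sketch of that last point, the example below tests a component with no queue involved. The Messager interface is assumed to have the single method implied by the code above. Shouter and ShouterDemo are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical test harness: drives the component with plain Java objects.
class ShouterDemo {
    // Feeds the inputs to a Shouter and returns everything it emitted.
    static List<String> run(String... inputs) {
        List<String> captured = new ArrayList<>();
        // The outbound Messager is just a lambda that records messages.
        Messager shouter = new Shouter(captured::add);
        for (String input : inputs) {
            shouter.msg(input);
        }
        return captured;
    }

    public static void main(String[] args) {
        System.out.println(run("Hello World!"));  // [HELLO WORLD!]
    }
}

// Assumed shape of the interface used in the answer above.
interface Messager {
    void msg(String msg);
}

// Hypothetical business component: receives messages through one Messager
// and publishes results through another, knowing nothing about the transport.
class Shouter implements Messager {
    private final Messager out;

    Shouter(Messager out) {
        this.out = out;
    }

    @Override
    public void msg(String msg) {
        out.msg(msg.toUpperCase(Locale.ROOT));
    }
}
```

With a queue in place, the same component could presumably be wired in by passing the result of appender.methodWriter(Messager.class) as its output and handing the component itself to tailer.methodReader(...), as in the test above.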