Search code examples
chroniclechronicle-queue

Chronicle Queue - Read from last read position and delete files if read by all consumers


I am using Chronicle 4.5.27 for writing and reading market data. I have got a single writer but multiple readers. Development OS is Windows followed by Linux for Prod deployment.

How to implement following uses cases?

  1. How to start reading queues form last read position ? e.g. if reader has read 15 records from a file which has 100 records and crashed/stopped how to start reading from 16th record from next restart? Is there an inbuilt durable support in CQ?
  2. Delete files which are read by all consumers to save disk space.

For this I have implemented but seems the files are not deleted on windows due to some open issue. Is there any built support in CQ where files can be deleted only if proceeded by all interested consumers?

public static long readMarketData(String pathForMarketDataFile, long indexFrom) {
SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(pathForMarketDataFile).rollCycle(RollCycles.MINUTELY).storeFileListener(new StoreFileListener() {
    @Override
    public void onReleased(int i, File file) {
        System.out.println("File is not in use and is ready for deletion: " + file.getName());
        try {
            file.delete();
        } catch (IOException e) {
            e.printStackTrace();
        }
        ;
        System.out.println("File deleted:  " + file.getName() );
    }

    @Override
    public void onAcquired(int cycle, File file) {
        System.out.println("File is use for reading: " + file.getName());
    }
}).build();

I have read few blogs and posts on this topic e.g.

https://vanilla-java.github.io/2016/03/29/Microservices-in-the-Chronicle-world-Part-4.html

https://groups.google.com/forum/#!topic/java-chronicle/0Nz5P-nvLgM

But still want to know if any one has implemented this use case.


Solution

  • Tracking of the consumer high-water mark is performed using MessageHistory, however, this requires that your consumers are also writing output to a chronicle queue (essentially storing the consumer read-sequence in the output queue).

    Alternatively, you would need to implement your own mechanism for recording the highest sequence (index) each consumer has seen.

    In terms of deleting files, there may be other processes holding open file handles to the queue files. If reader-A is no longer using queue file 15.cq4, then your code will attempt to call file.delete(), but reader-B may still have a reference to that file, stopping it from being deleted.

    A more robust policy would be to have some sort of event from each reader to another service/process that is responsible for deleting the files once all readers have finished processing them.