
How can I simulate real-time replay of data with Chronicle Queue and ExcerptTailer?


Currently I am able to read all the excerpts in a Chronicle Queue with an ExcerptTailer. However, I want the replay to reproduce the timing with which Chronicle Queue originally persisted the data. For example, if the ExcerptAppender appended an item at 3 pm and no data was received for the next 10 minutes, then when reading from the tailer I want to wait 10 minutes (or a dynamic amount of time) before reading the next excerpt.

Is this possible? Currently I just read everything that is available, and I think it is read back at a uniform rate.

public class Replayer implements Runnable {
    private static final Logger LOGGER = LogManager.getLogger(Replayer.class);
    private final ChronicleQueue QUEUE;
    private final ExcerptTailer TAILER;
    private final String CYCLE_MODE;
    private final String CYCLE_STR;
    private final String REPLAY;
    private final String NETWORK_INTERFACE;

    public Replayer(Properties prop) {
        this(prop, prop.getProperty("CHRONICLE_PATH"));
    }

    public Replayer(Properties properties, String path) {
        String PATH = path;
        CYCLE_MODE = properties.getProperty("CYCLE_MODE");
        CYCLE_STR = properties.getProperty("CYCLE");
        REPLAY = properties.getProperty("REPLAY");
        NETWORK_INTERFACE = properties.getProperty("NETWORK_INTERFACE");

        QUEUE = SingleChronicleQueueBuilder.single(PATH).rollCycle(RollCycles.FAST_DAILY).build();
        TAILER = QUEUE.createTailer();
    }

    private MoldUdpHeader moldUdpHeader = new MoldUdpHeader();
    private final ByteBuffer byteBuffer = ByteBuffer.allocate(1500);
    private final byte[] remoteAdd = new byte[4];
    private int remotePort;
    private int remaining;
    private final ReadBytesMarshallable marshallable = (b) -> {
        b.read(remoteAdd);
        remotePort = b.readInt();
        remaining = b.readInt();
        byteBuffer.put(b.bytesForRead().toByteArray(), 0, remaining);
    };

    private final HashMap<Pair<String, String>, UDPClient> mapping = new HashMap<>();
    private final Pair<String, String> pair = new Pair<>("", "");

    @Override
    public void run() {
        moveToCycle();
        System.out.println(currentCycle);  //if cycle is not available it prints Integer.MIN_VALUE -2147483648
        int counter = 0;

        while (TAILER.readBytes(marshallable)) {
            if (checkStop()) {
                break;
            }
            pair.setType1(IntStream.range(0, remoteAdd.length).mapToObj(i -> String.valueOf((remoteAdd[i] & 0xFF))).collect(Collectors.joining(".")));
            pair.setType2(String.valueOf(remotePort));
//            System.out.println(IntStream.range(0, remoteAdd.length).mapToObj(i -> String.valueOf((remoteAdd[i] & 0xFF))).collect(Collectors.joining(".")));
//            System.out.println(" " + remotePort+" "+remaining);

            if (!mapping.containsKey(pair)) {
                try {
                    mapping.put(pair.copy(), new UDPClient(remotePort-5000, 1500, remoteAdd, NETWORK_INTERFACE));
                } catch (IOException e) {
                    LOGGER.warn(mainMarker, e.getMessage());
                }
            }


            if (REPLAY.equals("TRUE")) {
                try {
                    byteBuffer.flip();
                    mapping.get(pair).sendMessage(byteBuffer);
                    if ((counter = counter % 3) == 0) Thread.sleep(1);  //no seq gap
                } catch (IOException | InterruptedException e) {
                    LOGGER.warn(mainMarker, e.getMessage());
                }
            }
            moldUdpHeader = (MoldUdpHeader) moldUdpHeader.decode(byteBuffer, 0);
            System.out.println(moldUdpHeader);
            byteBuffer.clear();
            counter++;
        }

        TAILER.close();
        QUEUE.close();
    }

    private int currentCycle;
    private int cycleCounter;

    public void moveToCycle() {
        long time;
        if (CYCLE_STR.equals("TODAY")) {
            time = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
        } else {
            time = LocalDate.parse(CYCLE_STR, DateTimeFormatter.ofPattern("yyyyMMdd", Locale.US)).toEpochDay();
        }

        if (TAILER.moveToCycle((int) time)) {
            currentCycle = (int) time;
            if (!CYCLE_MODE.equals("END")) {
                try {
                    cycleCounter = currentCycle + Integer.parseInt(CYCLE_MODE);
                } catch (NumberFormatException e) {
                    LOGGER.warn(mainMarker, e.getMessage());
                }
            }
        } else {
            LOGGER.info(mainMarker, "failed to move to specified cycle. Moving to most recent cycle instead");
            TAILER.toEnd();
            TAILER.moveToCycle(TAILER.cycle());
            currentCycle = TAILER.cycle();
        }
    }

    public boolean checkStop() {
        int cycle = TAILER.cycle();
        if (CYCLE_MODE.equals("END")) {
            return false;
        } else {
            return cycle == cycleCounter;
        }
    }
}

I have not tried anything so far, as I'm not sure which part of the library would let me simulate replaying/reading data in real time.


Solution

  • You need to add a timestamp to the data stored in each excerpt. When you read a message, wait an appropriate amount of time before you process it. If you are trying to simulate real time, you may find that delays greater than some threshold make no difference to the result; they just make the whole replay take longer.

    That is, you can simulate the delays between messages up to, but not more than, a chosen cap such as 1 ms, 10 ms, 100 ms, 1 s, 10 s, or 1 min.
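
The capped-delay idea can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `ReplayPacer` and its methods are hypothetical names, not part of Chronicle Queue, and it assumes the writer has been changed to store a timestamp in nanoseconds with each excerpt (for example as a leading `long`) that the reader extracts before calling the pacer.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/**
 * Hypothetical helper (not a Chronicle Queue class): paces a replay using the
 * timestamps recorded with each message, capping every simulated gap.
 */
final class ReplayPacer {
    private final long maxDelayNanos;      // upper bound on any single simulated gap
    private long previousTimestampNanos;   // timestamp of the last message seen
    private boolean first = true;          // the first message is never delayed

    ReplayPacer(long maxDelay, TimeUnit unit) {
        this.maxDelayNanos = unit.toNanos(maxDelay);
    }

    /** Returns how long to wait before processing a message recorded at the given time. */
    long delayNanos(long timestampNanos) {
        long delay = 0;
        if (!first) {
            long gap = timestampNanos - previousTimestampNanos;
            // Clamp to [0, maxDelayNanos]: out-of-order timestamps give no delay,
            // long quiet periods are shortened to the cap.
            delay = Math.max(0, Math.min(gap, maxDelayNanos));
        }
        first = false;
        previousTimestampNanos = timestampNanos;
        return delay;
    }

    /** Blocks the calling thread for the (capped) gap since the previous message. */
    void await(long timestampNanos) {
        long deadline = System.nanoTime() + delayNanos(timestampNanos);
        long remaining;
        // parkNanos may return early, so loop until the deadline has passed.
        while ((remaining = deadline - System.nanoTime()) > 0) {
            LockSupport.parkNanos(remaining);
        }
    }

    public static void main(String[] args) {
        // Assumed sample timestamps: 0 ms, 5 ms, and 10 minutes after the second message.
        long[] recorded = {
            0L,
            TimeUnit.MILLISECONDS.toNanos(5),
            TimeUnit.MILLISECONDS.toNanos(5) + TimeUnit.MINUTES.toNanos(10)
        };
        ReplayPacer pacer = new ReplayPacer(1, TimeUnit.SECONDS);
        for (long ts : recorded) {
            System.out.println("delay (ns) = " + pacer.delayNanos(ts));
        }
    }
}
```

In the `Replayer` from the question, one way to use this would be to read the stored timestamp at the start of the `ReadBytesMarshallable` (for instance with `b.readLong()`, assuming the appender wrote it first) and call `pacer.await(timestamp)` before `sendMessage`. The fixed `Thread.sleep(1)` would then no longer be the only source of pacing.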