Tags: java, chronicle, chronicle-queue

Chronicle Queue slows down and runs out of memory


I'm evaluating Chronicle Queue for use in our software, and I must be doing something wrong.
I have an appender that writes about 650k entries very quickly. After that it grinds to a halt, at which point memory has spiked to the maximum allowed, and it eventually hits an OutOfMemoryError.

Here is my code:

final class LogEntryOutput implements WriteBytesMarshallable
{
  private final int maxMessageSize;
  private TLogEntry logEntry;

  LogEntryOutput(final int maxMessageSize)
  {
    this.maxMessageSize = maxMessageSize;
  }

  public void setMarshallable(final TLogEntry logEntry)
  {
    this.logEntry = logEntry;
  }

  @Override
  // Raw BytesOut: there is no way to provide the generic type and still override WriteBytesMarshallable.
  @SuppressWarnings("rawtypes")
  public void writeMarshallable(final BytesOut bytes)
  {
    bytes.writeLong(this.logEntry.getSessionId());
    bytes.writeInt(this.logEntry.getLogLevel());
    bytes.writeInt(this.logEntry.getSecurityLevel());
    bytes.writeLong(this.logEntry.getPosixTimestamp());

    // Limit size of string messages.
    final int messageSize = Math.min(this.logEntry.getMessage().length(), this.maxMessageSize);

    // Write message length
    bytes.writeStopBit((long)messageSize);

    // Write message bytes.
    bytes.write(this.logEntry.getMessage(), 0, messageSize);
  }
}    

final TLogEntry entry = new TLogEntry();
entry.setSessionId(321234L);
entry.setLogLevel(77);
entry.setSecurityLevel(1234);
entry.setPosixTimestamp(6141234321L);
entry.setMessage("This is a test message for the system................................ A");

final LogEntryOutput output = new LogEntryOutput(1024);
output.setMarshallable(entry);

final ChronicleQueue queue = SingleChronicleQueueBuilder.binary(config.getQueueDirectory())
  .rollCycle(RollCycles.HOURLY)
  .build();
final ExcerptAppender appender = queue.acquireAppender();

for (int j = 0; j < 100; ++j)
{
  for (int i = 0; i < 10000; ++i)
  {
    appender.writeBytes(output);
  }

  System.out.println((j+1) * 10000);
  Jvm.pause(100L);
}

queue.close();

This is running on Windows 7 x64 with a 64-bit JVM using -Xmx1024m.
Any ideas what I might be doing wrong?

EDIT: I have additional information. I took a snapshot of object allocation just after the memory spike; it shows lots of object arrays and such. Here is the stack trace from when I get the OOM error:

java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.ConcurrentLinkedQueue.offer(ConcurrentLinkedQueue.java:328)
    at java.util.concurrent.ConcurrentLinkedQueue.add(ConcurrentLinkedQueue.java:297)
    at net.openhft.chronicle.core.ReferenceCounter.recordRelease(ReferenceCounter.java:88)
    at net.openhft.chronicle.core.ReferenceCounter.release(ReferenceCounter.java:79)
    at net.openhft.chronicle.bytes.NativeBytesStore.release(NativeBytesStore.java:267)
    at net.openhft.chronicle.bytes.MappedBytes.acquireNextByteStore(MappedBytes.java:186)
    at net.openhft.chronicle.bytes.MappedBytes.peekVolatileInt(MappedBytes.java:388)
    at net.openhft.chronicle.wire.AbstractWire.readMetaDataHeader(AbstractWire.java:222)
    at net.openhft.chronicle.queue.impl.single.SCQIndexing.arrayForAddress(SCQIndexing.java:190)
    at net.openhft.chronicle.queue.impl.single.SCQIndexing.sequenceForPosition(SCQIndexing.java:492)
    at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueStore.sequenceForPosition(SingleChronicleQueueStore.java:272)
    at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreAppender.checkWritePositionHeaderNumber(SingleChronicleQueueExcerpts.java:339)
    at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreAppender.writingDocument(SingleChronicleQueueExcerpts.java:267)
    at net.openhft.chronicle.wire.MarshallableOut.writingDocument(MarshallableOut.java:55)
    at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreAppender.writeBytes(SingleChronicleQueueExcerpts.java:117)
    at com.selinc.winchester.ledger.writer.harness.queue.LogEntryConsumers$LogEntryChronicle.accept(LogEntryConsumers.java:78)
    at com.selinc.winchester.ledger.writer.harness.queue.LogEntryConsumers$LogEntryChronicle.accept(LogEntryConsumers.java:45)
    at com.selinc.winchester.ledger.writer.harness.queue.LogEntryConsumersTest.test(LogEntryConsumersTest.java:56)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:86)
    at org.testng.internal.Invoker.invokeMethod(Invoker.java:643)
    at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:820)
    at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1128)
    at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
    at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
    at org.testng.TestRunner.privateRun(TestRunner.java:782)
    at org.testng.TestRunner.run(TestRunner.java:632)
    at org.testng.SuiteRunner.runTest(SuiteRunner.java:366)
    at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:361)

Solution

  • Chronicle Queue has some additional checks to detect memory leaks, which are turned on with -ea (assertions enabled). If you run with these extra checks on, the queue does slow down at around 90,000 messages in your case. If you turn assertions off, it runs for much longer.

    The test below runs to 10,000,000 entries in 5.5 seconds on a Windows laptop with 8 GB of memory, with assertions off.

    It also does 100 million records in 66 seconds.

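    // Imports assume Chronicle Queue 4.x package names and JUnit 4;
    // with TestNG, use org.testng.annotations.Test instead.
    import net.openhft.chronicle.bytes.BytesOut;
    import net.openhft.chronicle.bytes.WriteBytesMarshallable;
    import net.openhft.chronicle.core.Jvm;
    import net.openhft.chronicle.core.OS;
    import net.openhft.chronicle.core.onoes.Slf4jExceptionHandler;
    import net.openhft.chronicle.queue.ChronicleQueue;
    import net.openhft.chronicle.queue.ExcerptAppender;
    import net.openhft.chronicle.queue.RollCycles;
    import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
    import org.junit.Test;

    public class ATest {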
        static class TLogEntry {
    
            private long sessionId;
            private int logLevel;
            private int securityLevel;
            private long posixTimestamp;
            private CharSequence message;
    
            public long getSessionId() {
                return sessionId;
            }
    
            public void setSessionId(long sessionId) {
                this.sessionId = sessionId;
            }
    
            public int getLogLevel() {
                return logLevel;
            }
    
            public void setLogLevel(int logLevel) {
                this.logLevel = logLevel;
            }
    
            public int getSecurityLevel() {
                return securityLevel;
            }
    
            public void setSecurityLevel(int securityLevel) {
                this.securityLevel = securityLevel;
            }
    
            public long getPosixTimestamp() {
                return posixTimestamp;
            }
    
            public void setPosixTimestamp(long posixTimestamp) {
                this.posixTimestamp = posixTimestamp;
            }
    
            public CharSequence getMessage() {
                return message;
            }
    
            public void setMessage(CharSequence message) {
                this.message = message;
            }
        }
    
        static class LogEntryOutput implements WriteBytesMarshallable {
            private final int maxMessageSize;
            private TLogEntry logEntry;
    
            LogEntryOutput(final int maxMessageSize) {
                this.maxMessageSize = maxMessageSize;
            }
    
            public void setMarshallable(final TLogEntry logEntry) {
                this.logEntry = logEntry;
            }
    
            @Override
            // Raw BytesOut: there is no way to provide the generic type and still override WriteBytesMarshallable.
            @SuppressWarnings("rawtypes")
            public void writeMarshallable(final BytesOut bytes) {
                bytes.writeLong(this.logEntry.getSessionId());
                bytes.writeInt(this.logEntry.getLogLevel());
                bytes.writeInt(this.logEntry.getSecurityLevel());
                bytes.writeLong(this.logEntry.getPosixTimestamp());
    
                // Limit size of string messages.
                final int messageSize = Math.min(this.logEntry.getMessage().length(), this.maxMessageSize);
    
                // Write message length
                bytes.writeStopBit((long) messageSize);
    
                // Write message bytes.
                bytes.write(this.logEntry.getMessage(), 0, messageSize);
            }
        }
    
        @Test
        public void test() {
            final TLogEntry entry = new TLogEntry();
            entry.setSessionId(321234L);
            entry.setLogLevel(77);
            entry.setSecurityLevel(1234);
            entry.setPosixTimestamp(6141234321L);
            entry.setMessage("This is a test message for the system................................ A");
    
            final LogEntryOutput output = new LogEntryOutput(1024);
            output.setMarshallable(entry);
    
            final ChronicleQueue queue = SingleChronicleQueueBuilder.binary(
                    OS.TARGET + "/test-" + System.nanoTime())
                    .rollCycle(RollCycles.HOURLY)
                    .build();
            final ExcerptAppender appender = queue.acquireAppender();
            Jvm.setExceptionHandlers(Slf4jExceptionHandler.FATAL, Slf4jExceptionHandler.WARN, Slf4jExceptionHandler.WARN);
            for (int j = 0; j < 1000; ++j) {
                for (int i = 0; i < 10000; ++i) {
                    appender.writeBytes(output);
                }
    
                System.out.println((j + 1) * 10000);
                // Jvm.pause(100L);
            }
    
            queue.close();
        }
    }
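
Since the slowdown depends on whether assertions are enabled, it can help to confirm what the JVM running the test is actually doing; test runners and IDEs often pass -ea by default. The following is a minimal sketch using the standard assert-with-side-effect idiom. The class and method names are illustrative and not part of Chronicle Queue.

```java
final class AssertionCheck {

    // Returns true only when this class runs with assertions enabled (-ea).
    static boolean assertionsEnabled() {
        boolean enabled = false;
        // The assignment inside the assert is evaluated only when assertions are on.
        assert enabled = true;
        return enabled;
    }

    public static void main(String[] args) {
        System.out.println("assertions enabled: " + assertionsEnabled());
    }
}
```

Calling AssertionCheck.assertionsEnabled() at the start of the benchmark and printing the result makes it clear which mode a given timing was measured in. Note that assertions can be enabled per package, so this reports the setting for the sketch's own class rather than for the Chronicle packages specifically.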