I have implemented a simple Spring Boot application that receives a network message and queues it into a SingleChronicleQueue using appender.writeText(str); another thread polls for messages using tailer.readText(). After some processing, the processed message is placed in another SingleChronicleQueue to be sent away. I have three queues in the application.
The application rotates the files every night, and the first weird thing is that the file sizes for each queue are the same from day to day (though they differ between queues). The largest cq4 file is about 220 MB per day.
The problem I face is that in the three days since startup, memory use has grown from 480 MB to 1.6 GB, which seems unreasonable.
I suspect that I am missing something in the configuration, or that my implementation is naive or wrong. (I don't close the appender and tailer after every use; should I?)
Here is a stripped-down example; maybe someone can shed some light.
@Service
public class QueuesService {
    private static Logger LOG = LoggerFactory.getLogger(QueuesService.class);

    @Autowired
    AppConfiguration conf;

    private SingleChronicleQueue Q = null;
    private ExcerptAppender QAppender = null;
    private ExcerptTailer QTailer = null;

    public QueuesService() {
    }

    @PostConstruct
    private void init() {
        Q = SingleChronicleQueueBuilder.binary(conf.getQueuePath()).indexSpacing(1).build();
        QAppender = Q.acquireAppender();
        QTailer = Q.createTailer();
    }

    public ExcerptAppender getQAppender() {
        return QAppender;
    }

    public ExcerptTailer getQTailer() {
        return QTailer;
    }
}
@Service
public class ProcessingService {
    private static Logger LOG = LoggerFactory.getLogger(ProcessingService.class);

    @Autowired
    AppConfiguration conf;

    @Autowired
    private TaskExecutor taskExecutor;

    @Autowired
    private QueuesService queueService;

    private QueueProcessor processor = null;

    public ProcessingService() {
    }

    @PostConstruct
    private void init() {
        processor = new QueueProcessor();
        processor.start();
    }

    @Override
    public Message processMessage(Message msg, Map<String, Object> metadata) throws SomeException {
        String strMsg = msg.getMessage().toString();
        if (LOG.isInfoEnabled()) {
            LOG.info("\n" + strMsg);
        }
        try {
            // Append the incoming message to the queue
            queueService.getQAppender().writeText(strMsg);
            if (LOG.isInfoEnabled()) {
                LOG.info("Added new message to queue. index: " + queueService.getQAppender().lastIndexAppended());
            }
        }
        catch (Exception e) {
            LOG.error("Unknown error. reason: " + e.getMessage(), e);
        }
    }

    class QueueProcessor extends Thread {
        public void run() {
            while (!interrupted()) {
                try {
                    // Poll the queue; readText() returns null when there is nothing to read
                    String msg = queueService.getQTailer().readText();
                    if (msg != null) {
                        long index = queueService.getQTailer().index();
                        // process
                    }
                    else {
                        Thread.sleep(10);
                    }
                }
                catch (InterruptedException e) {
                    LOG.warn("Queue processor interrupted", e);
                    this.interrupt();
                    break;
                }
            }
            ThreadPoolTaskExecutor tp = (ThreadPoolTaskExecutor) taskExecutor;
            tp.shutdown();
        }
    }
}
Chronicle Queue is designed to use virtual memory which can be much larger than main memory (or the heap) without a significant impact on your system. This allows you to access the data at random quickly.
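To make the virtual-memory point concrete, here is a minimal sketch using the plain JDK memory-mapping API. It is not Chronicle Queue's own code (Chronicle manages its mappings internally); the class name MappedFileSketch, the method writeAndReadBack and the 256 MB size are made up for illustration. The idea is only that mapping a file reserves address space, while physical pages are brought in as they are touched and the data never lives on the Java heap.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MappedFileSketch {

    /** Maps `size` bytes of a temp file, writes one long into the last slot and reads it back. */
    static long writeAndReadBack(long size, long value) {
        try {
            Path file = Files.createTempFile("mapped-sketch", ".dat");
            file.toFile().deleteOnExit();
            try (FileChannel ch = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Mapping in READ_WRITE mode grows the file to `size` and reserves
                // that much address space; it does not allocate `size` bytes of heap.
                MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_WRITE, 0, size);
                int lastSlot = (int) (size - Long.BYTES);
                buf.putLong(lastSlot, value);   // touches only the page(s) holding this slot
                return buf.getLong(lastSlot);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        long heapBefore = rt.totalMemory() - rt.freeMemory();
        long readBack = writeAndReadBack(256L << 20, 42L);   // 256 MB mapping
        long heapAfter = rt.totalMemory() - rt.freeMemory();
        System.out.println("read back: " + readBack);
        System.out.println("heap delta (bytes): " + (heapAfter - heapBefore));
    }
}
```

If you run this while watching the process in top or a similar tool, the virtual size should jump by roughly the mapped size while the heap figure printed by the program stays small. That is the same kind of gap you are probably seeing between the process size and what the JVM reports as heap.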
Here is an example of a process writing 1 TB in 3 hours.
This shows how much slower it gets as the queue grows.
Even after the queue is 1 TB in size on a machine with 128 GB of memory, it writes 1 GB in under 2 seconds pretty consistently.
While this doesn't cause a technical problem, we are aware that it concerns people, who also find it "weird", and we plan to add a mode that reduces virtual memory use (even if it is a little slower for some use cases).