I'm trying out ActiveMQ with FlatBuffers. Everything seems to work fine on the producer side, but the consumer's memory usage keeps growing the longer the process runs.
The producer marks each message as NON_PERSISTENT and sends about 30 messages per second. Each message is a bytes message of around 3000 bytes.
Producer.cpp
void Producer::send_message(uint8_t* pointer, size_t size) {
    auto msg = std::unique_ptr<cms::BytesMessage>(session->createBytesMessage(pointer, size));
    producer->send(msg.get());
}

void Producer::run() {
    try {
        std::unique_ptr<activemq::core::ActiveMQConnectionFactory> connectionFactory(new activemq::core::ActiveMQConnectionFactory(brokerURI));
        connection.reset(connectionFactory->createConnection());
        session.reset(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
        destination.reset(session->createTopic(destURI));
        producer.reset(session->createProducer(destination.get()));
        producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
        connection->start();
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
    }
}
Consumer.cpp
void Consumer::onMessage(const cms::Message* message)
{
    try
    {
        const auto msg = dynamic_cast<const cms::BytesMessage*>(message);
        const auto data = msg->getBodyBytes();
        const auto size = msg->getBodyLength();
        flatbuffers::Verifier verifier((uint8_t*)(data), size);
        if (Ditto::VerifyDataBuffer(verifier)) {
            // Do something with the buffer
        }
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
    }
}

void Consumer::run()
{
    try {
        std::unique_ptr<activemq::core::ActiveMQConnectionFactory> connectionFactory(new activemq::core::ActiveMQConnectionFactory(brokerURI));
        connection.reset(connectionFactory->createConnection());
        std::shared_ptr<activemq::core::ActiveMQConnection> amqConnection = std::dynamic_pointer_cast<activemq::core::ActiveMQConnection>(connection);
        if (amqConnection != nullptr) {
            amqConnection->addTransportListener(this);
        }
        connection->start();
        connection->setExceptionListener(this);
        session.reset(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
        destination.reset(session->createTopic(destURI));
        consumer.reset(session->createConsumer(destination.get()));
        consumer->setMessageListener(this);
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
        activemq::library::ActiveMQCPP::shutdownLibrary();
    }
}
Then I run the Consumer with:
int main()
{
    activemq::library::ActiveMQCPP::initializeLibrary();
    Consumer consumer("failover:(tcp://127.0.0.1:61616)", "Test-Topic");
    consumer.run();
    while (1) {}
    consumer.close();
    activemq::library::ActiveMQCPP::shutdownLibrary();
}
The Consumer is able to receive and process messages, but its memory usage keeps growing: it reached about 200 MB after just 10 minutes of running. The CMS Overview says that the pointer passed to onMessage is owned by the caller, so I should not try to delete it. However, it seems that the caller never deletes the message, which makes the memory keep growing.
Is there any way to free the memory of the message after each onMessage call?
Thank you so much for your time and help.
I figured it out.
getBodyBytes() returns a pointer to a newly allocated array that the caller has to clean up after the call, so the leak was the copied body, not the message itself. I just need to wrap the returned pointer in a std::unique_ptr so that it is cleaned up properly.
onMessage() should look like this:
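void Consumer::onMessage(const cms::Message* message)
{
    try
    {
        const auto msg = dynamic_cast<const cms::BytesMessage*>(message);
        if (msg == nullptr) {
            return; // Not a BytesMessage, nothing to verify
        }
        // getBodyBytes() hands back a copy of the body that the caller owns.
        // The array form of unique_ptr frees it with delete[] on scope exit.
        std::unique_ptr<unsigned char[]> data(msg->getBodyBytes());
        const auto size = msg->getBodyLength();
        flatbuffers::Verifier verifier(data.get(), size);
        if (Ditto::VerifyDataBuffer(verifier)) {
            // Do something with the buffer
        }
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
    }
}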
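
To see the ownership pattern on its own, without a broker, here is a minimal sketch. fake_get_body_bytes and handle_one_message are hypothetical names that I made up for illustration; the first only stands in for getBodyBytes() under the assumption that the real call returns a buffer allocated with new[] that the caller must free.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <memory>

// Hypothetical stand-in for cms::BytesMessage::getBodyBytes(): returns a
// buffer allocated with new[] that the caller is responsible for freeing.
unsigned char* fake_get_body_bytes(std::size_t size) {
    unsigned char* copy = new unsigned char[size];
    std::memset(copy, 0xAB, size);  // Fill with a known byte value
    return copy;
}

// Handles one "message": takes ownership of the body copy, reads it, and
// returns the sum of its bytes. No manual delete[] is needed on any path.
std::size_t handle_one_message(std::size_t size) {
    // Array form: the destructor calls delete[], matching the new[] above.
    std::unique_ptr<unsigned char[]> data(fake_get_body_bytes(size));
    assert(data != nullptr);

    std::size_t sum = 0;
    for (std::size_t i = 0; i < size; ++i) {
        sum += data[i];
    }
    return sum;  // data goes out of scope here and the buffer is released
}
```

Calling handle_one_message(3000) in a loop mimics the consumer receiving 3000-byte messages; because each buffer is released when the function returns, the process should stay flat instead of growing with every message. Note that std::unique_ptr<unsigned char> (without the []) would call delete rather than delete[], which is undefined behavior for memory obtained from new[].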