Search code examples
c++activemq-artemisflatbuffersactivemq-cpp

ActiveMQ consumer memory usage keep increasing after onMessage call


I'm in the process of trying out ActiveMQ with Flatbuffers. Everything seems to be working fine on the producer but the consumer memory keeps going up the longer the process runs.

The producer marks the message to be NON_PERSISTENT and send about 30 times per second. Each message is a byte message that around 3000 bytes.

Producer.cpp

void Producer::send_message(uint8_t* pointer, size_t size) {
    auto msg = std::unique_ptr<cms::BytesMessage>(session->createBytesMessage(pointer, size));
    producer->send(msg.get());
}

void Producer::run() {
    try {
        std::unique_ptr <activemq::core::ActiveMQConnectionFactory> connectionFactory(new activemq::core::ActiveMQConnectionFactory(brokerURI));

        connection.reset(connectionFactory->createConnection());
        session.reset(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
        destination.reset(session->createTopic(destURI));
        producer.reset(session->createProducer(destination.get()));
        producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
        connection->start();
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
    }
}

Consumer.cpp

void Consumer::onMessage(const cms::Message * message)
{
    try
    {
        const auto msg = dynamic_cast<const cms::BytesMessage*>(message);

        const auto data = msg->getBodyBytes();
        const auto size = msg->getBodyLength();
        flatbuffers::Verifier verifier((uint8_t*)(data), size);

        if (Ditto::VerifyDataBuffer(verifier)) {
            // Do something with the buffer
        }
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
    }
}

void Consumer::run()
{
    try {
        std::unique_ptr<activemq::core::ActiveMQConnectionFactory> connectionFactory(new activemq::core::ActiveMQConnectionFactory(brokerURI));

        connection.reset(connectionFactory->createConnection());

        std::shared_ptr<activemq::core::ActiveMQConnection> amqConnection = std::dynamic_pointer_cast<activemq::core::ActiveMQConnection>(connection);
        if (amqConnection != nullptr) {
            amqConnection->addTransportListener(this);
        }

        connection->start();
        connection->setExceptionListener(this);
        session.reset(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
        destination.reset(session->createTopic(destURI));
        consumer.reset(session->createConsumer(destination.get()));
        consumer->setMessageListener(this);
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
        activemq::library::ActiveMQCPP::shutdownLibrary();
    }
}

Then, I called the Consumer with:

int main()
{
    activemq::library::ActiveMQCPP::initializeLibrary();
    Consumer consumer("failover:(tcp://127.0.0.1:61616)", "Test-Topic");
    consumer.run();

    while (1) {}

    consumer.close();
    activemq::library::ActiveMQCPP::shutdownLibrary();
}

The Consumer was able to receive and process message. However, the memory of the Consumer keep going up. The memory was about 200MB just after 10 minutes run. In the CMS Overview, they mentioned that the pointer passed to the onMessage is own by the call, so I should not try and delete it. However, it seems that the caller never delete the message, which make the memory keep going up.

Is there any way that I can free the memory of the message after each onMessage call?

Thank you so much for your time and help.


Solution

  • I figured out.

    The getBodyBytes() return a pointer to array that I should cleanup after the call. So I just need to wrap it in a std::unique_ptr for it to be properly cleanup.

    The onMessage() should look like this:

    void Consumer::onMessage(const cms::Message * message)
    {
        try
        {
            const auto msg = dynamic_cast<const cms::BytesMessage*>(message);
    
            std::unique_ptr<unsigned char> data(msg->getBodyBytes());
    
            auto size = msg->getBodyLength();
            flatbuffers::Verifier verifier((uint8_t*)(data), size);
    
            if (Ditto::VerifyDataBuffer(verifier)) {
                // Do something with the buffer
            }
        }
        catch (cms::CMSException& e) {
            e.printStackTrace();
        }
    }