
ActiveMQ C++ Synchronous Client

I have the following code from Apache's SVN repository. As you can see, it is an asynchronous client. What I want is a synchronous client, so that clients can ask my consumer "can I get a message, please?" and my consumer can answer "certainly, sir, here it is". So I don't need an asynchronous client, but I can't find a synchronous example, and whenever I write one myself I get a segmentation fault.

If you look at this code, there is a method called onMessage. It exists because the class is registered as a listener, and I can't return the message from it because I can't change its return type.

The question is: how can I make this example class synchronous without getting segmentation faults? My earlier question about the segmentation faults is here.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>

using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;

class SimpleAsyncConsumer : public ExceptionListener,
                            public MessageListener,
                            public DefaultTransportListener {

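private:

    Connection* connection;
    Session* session;
    Destination* destination;
    MessageConsumer* consumer;
    bool useTopic;
    std::string brokerURI;
    std::string destURI;
    bool clientAck;

private:

    // Non-copyable: declared but intentionally not defined.
    SimpleAsyncConsumer( const SimpleAsyncConsumer& );
    SimpleAsyncConsumer& operator= ( const SimpleAsyncConsumer& );

public:

    SimpleAsyncConsumer( const std::string& brokerURI,
                         const std::string& destURI,
                         bool useTopic = false,
                         bool clientAck = false ) :
        connection(NULL),
        session(NULL),
        destination(NULL),
        consumer(NULL),
        useTopic(useTopic),
        brokerURI(brokerURI),
        destURI(destURI),
        clientAck(clientAck) {
    }

    virtual ~SimpleAsyncConsumer() throw() {
        this->cleanup();
    }

    void close() {
        this->cleanup();
    }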

    void runConsumer() {

        try {

            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory =
                new ActiveMQConnectionFactory( brokerURI );

            // Create a Connection
            connection = connectionFactory->createConnection();
            delete connectionFactory;

            ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>( connection );
            if( amqConnection != NULL ) {
                amqConnection->addTransportListener( this );
            }

            // Start the connection so that messages are delivered.
            connection->start();

            // Register this object to receive connection-level exceptions.
            connection->setExceptionListener( this );

            // Create a Session
            if( clientAck ) {
                session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
            } else {
                session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            }

            // Create the destination (Topic or Queue)
            if( useTopic ) {
                destination = session->createTopic( destURI );
            } else {
                destination = session->createQueue( destURI );
            }

            // Create a MessageConsumer from the Session to the Topic or Queue
            consumer = session->createConsumer( destination );
            consumer->setMessageListener( this );

        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    // Called from the consumer since this class is a registered MessageListener.
    virtual void onMessage( const Message* message ) throw() {

        static int count = 0;

        try {
            count++;
            const TextMessage* textMessage =
                dynamic_cast< const TextMessage* >( message );
            string text = "";

            if( textMessage != NULL ) {
                text = textMessage->getText();
            } else {
                text = "NOT A TEXTMESSAGE!";
            }

            // In client ack mode the message must be acknowledged explicitly.
            if( clientAck ) {
                message->acknowledge();
            }

            printf( "Message #%d Received: %s\n", count, text.c_str() );
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    // If something bad happens you see it here as this class is also been
    // registered as an ExceptionListener with the connection.
    virtual void onException( const CMSException& ex AMQCPP_UNUSED ) {
        printf("CMS Exception occurred.  Shutting down client.\n");
        exit(1);
    }

    virtual void transportInterrupted() {
        std::cout << "The Connection's Transport has been Interrupted." << std::endl;
    }

    virtual void transportResumed() {
        std::cout << "The Connection's Transport has been Restored." << std::endl;
    }


private:

    void cleanup(){

        // Always close destination, consumers and producers before
        // you destroy their sessions and connection.

        // Destroy resources.
        try{
            if( destination != NULL ) delete destination;
        }catch (CMSException& e) {}
        destination = NULL;

        try{
            if( consumer != NULL ) delete consumer;
        }catch (CMSException& e) {}
        consumer = NULL;

        // Close open resources.
        try{
            if( session != NULL ) session->close();
            if( connection != NULL ) connection->close();
        }catch (CMSException& e) {}

        // Now Destroy them
        try{
            if( session != NULL ) delete session;
        }catch (CMSException& e) {}
        session = NULL;

        try{
            if( connection != NULL ) delete connection;
        }catch (CMSException& e) {}
        connection = NULL;
    }
};

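int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

    // The library must be initialized before any CMS objects are created.
    activemq::library::ActiveMQCPP::initializeLibrary();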


    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";

    // Set the URI to point to the IPAddress of your broker.
    // add any optional params to the url to enable things like
    // tightMarshalling or tcp logging etc.  See the CMS web site for
    // a full list of configuration options.
    // Wire Format Options:
    // =====================
    // Use either stomp or openwire, the default ports are different for each
    // Examples:
    //    tcp://                      default to openwire
    //    tcp://  same as above
    //    tcp://     use stomp instead
    std::string brokerURI =
//        "?wireFormat=openwire"
//        "&connection.useAsyncSend=true"
//        "&transport.commandTracingEnabled=true"
//        "&transport.tcpTracingEnabled=true"
//        "&wireFormat.tightEncodingEnabled=true"

    // This is the Destination Name and URI options.  Use this to
    // customize where the consumer listens, to have the consumer
    // use a topic or queue set the 'useTopics' flag.
    std::string destURI = "TEST.FOO"; //?consumer.prefetchSize=1";

    // set to true to use topics instead of queues
    // Note in the code above that this causes createTopic or
    // createQueue to be used in the consumer.
    bool useTopics = false;

    // set to true if you want the consumer to use client ack mode
    // instead of the default auto ack mode.
    bool clientAck = false;

    // Create the consumer
    SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );

    // Start it up and it will listen forever.
    consumer.runConsumer();

    // Wait to exit.
    std::cout << "Press 'q' to quit" << std::endl;
    while( std::cin.get() != 'q') {}

    // All CMS resources should be closed before the library is shutdown.
    consumer.close();

    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================\n";

    activemq::library::ActiveMQCPP::shutdownLibrary();
}



  • From your previous question I see that you found the receive() functionality of the MessageConsumer; that is the way to go for a synchronous receiver. I'm not sure why you are getting the segmentation fault, though. A couple of suggestions:

    Try setting a clientID. I'm not sure whether line 129 of the SessionExecutor is the culprit, but it tries to access the consumer ID there, and the Stomp client may not be setting one.

    std::string clientID = "someid";
    Connection* conn = connectionFactory->createConnection("", "", clientID);

    Try switching to the OpenWire protocol as opposed to Stomp.

    If you take the receive portion out of that code, does it still connect to the broker? Sorry if this doesn't exactly answer your question; I'm trying to narrow down the problem.
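
    To illustrate the receive() approach, here is a minimal sketch of a pull-style consumer. It is not the ActiveMQ-CPP API itself: `Message`, `TextMessage`, `StubConsumer` and `fetchText` are hypothetical stand-ins so that the example compiles without the library. With the real library you would call `cms::MessageConsumer::receive(int)` on your consumer instead, and you would not register a MessageListener on it. The point it tries to show is that a timed receive returns NULL when nothing arrives and that the caller owns the returned message; dereferencing that NULL is an easy way to get a segmentation fault when converting the listener example.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <string>

// Hypothetical stand-ins that mirror the shape of cms::Message and
// cms::TextMessage so this sketch compiles without ActiveMQ-CPP.
struct Message {
    virtual ~Message() {}
};

struct TextMessage : Message {
    std::string text;
    explicit TextMessage(const std::string& t) : text(t) {}
    std::string getText() const { return text; }
};

// Hypothetical stand-in for cms::MessageConsumer. Like the CMS call,
// receive(timeout) hands ownership of the message to the caller and
// returns a null pointer when nothing is available.
class StubConsumer {
    std::deque<std::unique_ptr<Message>> pending;
public:
    // Takes ownership of m and queues it for a later receive().
    void push(Message* m) { pending.emplace_back(m); }

    Message* receive(int /*timeoutMs*/) {
        if (pending.empty()) return nullptr;
        Message* m = pending.front().release();
        pending.pop_front();
        return m;
    }
};

// Pull one message on demand. Returns true and fills `out` if a text
// message arrived; returns false on timeout or for a non-text message.
bool fetchText(StubConsumer& consumer, int timeoutMs, std::string& out) {
    assert(timeoutMs >= 0);

    // The caller owns the returned message, so hold it in a smart pointer
    // that deletes it on every return path.
    std::unique_ptr<Message> msg(consumer.receive(timeoutMs));
    if (!msg) return false;  // timeout: there is nothing to dereference

    const TextMessage* text = dynamic_cast<const TextMessage*>(msg.get());
    if (text == nullptr) return false;  // not a TextMessage

    out = text->getText();
    return true;
}
```

    For example, after `StubConsumer c; c.push(new TextMessage("hello"));`, the first `fetchText(c, 1000, out)` returns true with `out` set to "hello", and a second call returns false because the queue is empty.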