Java DDS Unbounded Sequence Causes Out Of Memory Error


I am new to DDS and am trying to write a simple Java program in IntelliJ IDEA that consists of three parts:

  1. Client Simulator that sends data.
  2. My program simulator that receives data from the client, manipulates it, and sends it back to the client.
  3. Client Simulator that reads the manipulated data.

All the data that I am trying to send in my example is a simple String.

I am using RTI Code Gen to auto-generate most of the code.

Without the unboundedSupport flag (the string is limited to 255 characters) everything worked fine. However, with the unboundedSupport flag applied, I get the following out-of-memory error:

java.lang.OutOfMemoryError: Java heap space
    at com.rti.dds.cdr.CdrBuffer.<init>(Unknown Source)
    at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
    at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
    at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
    at com.rti.dds.infrastructure.EntityImpl.DDS_Entity_enable(Native Method)
    at com.rti.dds.infrastructure.EntityImpl.enable(Unknown Source)
    at com.rti.dds.infrastructure.NativeFactoryMixin.create_entityI(Unknown Source)
    at com.rti.dds.subscription.SubscriberImpl.create_datareader(Unknown Source)
    at json_dds.JsonMessageSubscriber.<init>(JsonMessageSubscriber.java:71)
    at results_consumers.ResultsConsumersMain.main(ResultsConsumersMain.java:10)
create_datareader error

I start the client simulator that reads data first.

This is my .idl file:

struct JsonMessage {
    string msg;
};

This is my main program (line 10 is the initialization of subscriber1):

public static void main(String... args) {
  ClientResultsConsumer clientResultsConsumer = new ClientResultsConsumer();
  JsonMessageSubscriber subscriber1 =
      new JsonMessageSubscriber(0, clientResultsConsumer, Topics.CLIENT_TOPIC_OUTPUT_1);
  subscriber1.consume();
  ClientResultsConsumer2 clientResultsConsumer2 = new ClientResultsConsumer2();
  JsonMessageSubscriber subscriber2 =
      new JsonMessageSubscriber(0, clientResultsConsumer2, Topics.CLIENT_TOPIC_OUTPUT_1);
  subscriber2.consume();
  ClientResultsConsumer3 clientResultsConsumer3 = new ClientResultsConsumer3();
  JsonMessageSubscriber subscriber3 =
      new JsonMessageSubscriber(0, clientResultsConsumer3, Topics.CLIENT_TOPIC_OUTPUT_2);
  subscriber3.consume();
}

This is my ClientResultsConsumer class:

public class ClientResultsConsumer implements Consumer {

  @Override
  public void consume(String msg) {
    System.out.println("Client results consumer got " + msg);
  }
}

This is my JsonMessageSubscriber class (line 71 is the call to subscriber.create_datareader()):

public class JsonMessageSubscriber implements DataConsumer {

  ExecutorService executor = Executors.newSingleThreadExecutor();

  public JsonMessageSubscriber(int domainId, Consumer consumer, String topicName) {

    DomainParticipant participant = DomainParticipantFactory.TheParticipantFactory
        .create_participant(domainId,
                            DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
                            null /* listener */,
                            StatusKind.STATUS_MASK_NONE);
    if (participant == null) {
      System.err.println("create_participant error\n");
      System.exit(-1);
    }

    // --- Create subscriber --- //

    /* To customize subscriber QoS, use
       the configuration file USER_QOS_PROFILES.xml */

    Subscriber subscriber = participant.create_subscriber(
        DomainParticipant.SUBSCRIBER_QOS_DEFAULT, null /* listener */,
        StatusKind.STATUS_MASK_NONE);
    if (subscriber == null) {
      System.err.println("create_subscriber error\n");
      System.exit(-1);
    }

    // --- Create topic --- //

    /* Register type before creating topic */
    String typeName = JsonMessageTypeSupport.get_type_name();
    JsonMessageTypeSupport.register_type(participant, typeName);

    /* To customize topic QoS, use
       the configuration file USER_QOS_PROFILES.xml */

    Topic topic = participant.create_topic(
        topicName,
        typeName, DomainParticipant.TOPIC_QOS_DEFAULT,
        null /* listener */, StatusKind.STATUS_MASK_NONE);
    if (topic == null) {
      System.err.println("create_topic error\n");
      System.exit(-1);
    }

    // --- Create reader --- //

    DataReaderListener listener = new JsonMessageListener(consumer);

    /* To customize data reader QoS, use
       the configuration file USER_QOS_PROFILES.xml */

    JsonMessageDataReader reader = (JsonMessageDataReader)
        subscriber.create_datareader(
            topic, Subscriber.DATAREADER_QOS_DEFAULT, listener,
            StatusKind.STATUS_MASK_ALL);
    if (reader == null) {
      System.err.println("create_datareader error\n");
      System.exit(-1);
    }
  }

  // -----------------------------------------------------------------------

  @Override
  public void consume() {
    final long scanTimeMillis = 1000;
    Runnable task = () -> {
      while (true) {
        try {
          TimeUnit.MILLISECONDS.sleep(scanTimeMillis);
        } catch (Exception e) {
          System.err.println(e.getMessage());
        }
      }
    };
    executor.submit(task);
  }
}

Unfortunately, the only solution I have found is to bound the sequence size. As I understand it, setting the bound to a large enough number would solve my problem, but it would also require a lot of memory, and I would rather not use more than the minimum required for each message.

Any help will be appreciated. Thanks.


Solution

  • I managed to solve the problem using the example here

    All it took was passing the auto-generated QoS file path to the subscriber/publisher constructor and then adding these lines before initializing the domain participant (this differs from the example linked above, which did not work for me):

    DomainParticipantFactoryQos factoryQos = new DomainParticipantFactoryQos();
    DomainParticipantFactory.TheParticipantFactory.get_qos(factoryQos);
    factoryQos.profile.url_profile.add(0, qosPolicyPath);
    factoryQos.profile.url_profile.setMaximum(1);
    DomainParticipantFactory.TheParticipantFactory.set_qos(factoryQos);
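
One thing worth guarding against: if qosPolicyPath does not point at a readable file, the profile is presumably not applied, and the reader would then be created with the default QoS that produced the error in the question. Below is a minimal sketch of a check that could run before the lines above. QosProfilePath and resolve are hypothetical names, not part of the RTI API, and the sketch assumes qosPolicyPath is a plain file-system path. It only validates the path and leaves the RTI calls unchanged.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper (not an RTI class): validates a QoS profile path up front. */
public final class QosProfilePath {

    private QosProfilePath() {}

    /**
     * Returns the absolute, normalized path of the QoS profile as a string.
     * Throws IllegalArgumentException if the file is missing or unreadable,
     * so a bad path fails fast instead of surfacing later as a QoS problem.
     */
    public static String resolve(Path qosFile) {
        Path absolute = qosFile.toAbsolutePath().normalize();
        if (!Files.isRegularFile(absolute) || !Files.isReadable(absolute)) {
            throw new IllegalArgumentException(
                "QoS profile not found or unreadable: " + absolute);
        }
        return absolute.toString();
    }

    public static void main(String[] args) throws IOException {
        // Demo with a temporary file standing in for USER_QOS_PROFILES.xml.
        Path tmp = Files.createTempFile("USER_QOS_PROFILES", ".xml");
        try {
            System.out.println("Resolved profile path: " + resolve(tmp));
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

The returned string could then be used as qosPolicyPath, for example QosProfilePath.resolve(Paths.get("USER_QOS_PROFILES.xml")), before it is added to factoryQos.profile.url_profile.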