Sources::jmsQueueBuilder for hazelcast jet


I am using Sources.jmsQueueBuilder from Hazelcast, and while it works, I would like to be able to hook into it a bit more.

Specifically, I would like to customize the jakarta.jms.Session that it creates. The builder already lets me create a custom connection, since such a method exists:

connectionFn (connectionFactory to connection)

Similarly, consumerFn exists, so I can create a custom MessageConsumer from a Session.

What is missing is a Connection-to-Session mapper. I've looked at the implementation of com.hazelcast.jet.impl.connector.StreamJmsP, and it creates the session directly:

    @Override
    protected void init(@Nonnull Context context) throws JMSException {
        session = connection.createSession(guarantee != NONE, DUPS_OK_ACKNOWLEDGE);
        consumer = consumerFn.apply(session);
    }

So it does not seem to be possible at all.


My specific case: I consume messages from RabbitMQ via JMS (using Sources::jmsQueueBuilder) and then do some further pipeline work on them with Hazelcast Jet, for example:


StreamSource<Message> jmsQueueSource = Sources.jmsQueueBuilder ....
pipeline.readFrom(jmsQueueSource)
        .withoutTimestamps()
        .map(...) // some mapping function
        . /// other operations
        .writeTo(Sinks.map("MY_IMAP_NAME")) 

MY_IMAP_NAME is backed by a DB table, using MapStore/MapLoader. The issue is that I want to acknowledge the JMS message only after it has been saved to the DB. As a very first step, I would like to be able to change the

connection.createSession(guarantee != NONE, DUPS_OK_ACKNOWLEDGE);

to a session that uses CLIENT_ACKNOWLEDGE.

UPDATE-1


What I ended up doing is something along these lines:

  • first, I set offload: false in the map-store settings

  • then I created the source by hand:

StreamSource<Message> sourceStage = SourceBuilder.stream("streamer", context -> {
      RMQConnectionFactory connectionFactory = ...
      RMQConnection connection = (RMQConnection) connectionFactory.createConnection();
      connection.start();
      // non-transacted session; messages have to be acknowledged explicitly
      RMQSession session = (RMQSession) connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
      // return some context object that holds the connection, the `RMQSession` and a consumer
}).<Message>fillBufferFn((context, buf) -> {
      Message message = context.consumer().receiveNoWait();
      if (message != null) {
          buf.add(message);
      } else {
          // sleep for some seconds
      }
}).destroyFn(context -> {
      context.consumer().close();
      context.session().close();
      context.connection().close();
}).build();
  • then I create my pipeline:
pipeline.readFrom(sourceStage)
        .withoutTimestamps()
        .map(m -> map(m)) // creates a record holding the JMS message and the JSON it contains
        .mapUsingService(ServiceFactories.iMapService("MY_IMAP_NAME"), (iMap, entry) -> {
             // get the JSON from the record
             // store the JSON in the iMap
             // get the JMS message from the record
             // acknowledge the JMS message
        })
        .writeTo(Sinks.logger());
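The ordering inside that mapUsingService step is what matters: store first, acknowledge second. Below is a minimal JDK-only sketch of that ordering. It is a model, not Hazelcast or JMS code: AckableMessage and WriteThroughStore are hypothetical stand-ins for the JMS message and the DB-backed IMap, and it assumes the store call returns only after the row has been written.

```java
import java.util.HashMap;
import java.util.Map;

class StoreThenAckDemo {

    /** Hypothetical stand-in for a JMS message from a CLIENT_ACKNOWLEDGE session. */
    static final class AckableMessage {
        final String id;
        final String json;
        private boolean acknowledged;

        AckableMessage(String id, String json) {
            this.id = id;
            this.json = json;
        }

        void acknowledge() {
            acknowledged = true;
        }

        boolean isAcknowledged() {
            return acknowledged;
        }
    }

    /** Hypothetical stand-in for the DB-backed map; empty values simulate a DB failure. */
    static final class WriteThroughStore {
        final Map<String, String> rows = new HashMap<>();

        void put(String key, String value) {
            if (value == null || value.isEmpty()) {
                throw new IllegalStateException("simulated database failure");
            }
            rows.put(key, value);
        }
    }

    /** Stores first and acknowledges second; a failed store leaves the message unacknowledged. */
    static boolean storeThenAck(WriteThroughStore store, AckableMessage message) {
        try {
            store.put(message.id, message.json);
        } catch (IllegalStateException e) {
            return false; // acknowledge() is never reached for this message
        }
        message.acknowledge();
        return true;
    }

    public static void main(String[] args) {
        WriteThroughStore store = new WriteThroughStore();
        AckableMessage good = new AckableMessage("1", "{\"id\":1}");
        AckableMessage bad = new AckableMessage("2", "");
        System.out.println(storeThenAck(store, good) + " " + good.isAcknowledged());
        System.out.println(storeThenAck(store, bad) + " " + bad.isAcknowledged());
    }
}
```

With the calls in the opposite order, a failure in the store step would leave an acknowledged message that never reached the DB.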

UPDATE-2

The answer is indeed correct: what I was trying to do cannot be achieved.


Solution

  • Each stage of a Hazelcast Jet job may run on any cluster member. For example, the readFrom() stage may run on node1, while the map() stage may run on node2.

    The stages are connected via queues, which makes the pipeline distributed and asynchronous by nature.

    Distributed Environment

    1. Data Serialization:
      The payload between stages is serialized for network transfer. This means non-serializable objects, such as a Message from a JMS source, cannot be directly passed between stages.
    2. Projection of Payload:
      When using a JMS source, you must project the payload (e.g., the message content) in the source stage and pass it to subsequent stages. As a result, the next stage only processes the extracted payload and has no access to the original JMS interface. This makes it impossible to acknowledge the message at later stages.
    3. Automatic Acknowledgment:
      Since the pipeline is asynchronous and distributed, the JMS source automatically acknowledges the message after reading it. This behavior ensures that the pipeline progresses without requiring user intervention for acknowledgment.
    4. What This Means:
      Delayed acknowledgment (acknowledging a message after it has been processed by subsequent stages) is not possible in a distributed Hazelcast Jet environment.
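The serialization point can be illustrated without a cluster. The sketch below uses plain Java serialization as a stand-in for whatever serialization the cluster actually applies between members, which is an assumption made only for illustration. BrokerBoundMessage and Payload are hypothetical names: the first models an object tied to a live broker session, the second models the projected payload.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationBoundaryDemo {

    /** Hypothetical stand-in for a broker-bound message; deliberately not Serializable. */
    static final class BrokerBoundMessage {
        final String body;

        BrokerBoundMessage(String body) {
            this.body = body;
        }
    }

    /** Hypothetical projected payload: plain data that can be written to a byte stream. */
    static final class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final String json;

        Payload(String json) {
            this.json = json;
        }
    }

    /** Returns true if the value can be written with Java serialization. */
    static boolean canCrossMemberBoundary(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        BrokerBoundMessage message = new BrokerBoundMessage("{\"id\":1}");
        // The message object itself cannot be serialized ...
        System.out.println(canCrossMemberBoundary(message));
        // ... but the payload projected from it can.
        System.out.println(canCrossMemberBoundary(new Payload(message.body)));
    }
}
```

Once only the Payload travels onward, the later stage has nothing left to call acknowledge() on, which is the situation described in point 2.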

    Embedded Environment
    If you are running Hazelcast Jet in an embedded environment (a single-node setup), the constraints of distribution are no longer present. Here's how you can handle acknowledgment:

    1. Passing the JMS Object:
      You can pass the entire JMS Message object between stages, as there is no need for network serialization in a single-node setup.

    2. Custom Sink for Acknowledgment:
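      Use a custom sink created with the Sinks.fromProcessor() method. In this custom processor, you can call the message.acknowledge() method to manually acknowledge the message after processing. Note that acknowledge() only has an effect if the session that delivered the message was created with CLIENT_ACKNOWLEDGE.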
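To show why the single-JVM case is different, here is a small JDK-only model of two stages connected by an in-memory queue. It is not Hazelcast code: the queue, the sink thread, and AckableMessage are hypothetical stand-ins. The point is that the sink receives the very same object the source produced, so it can still acknowledge it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class EmbeddedAckDemo {

    /** Hypothetical stand-in for a JMS message; not Serializable on purpose. */
    static final class AckableMessage {
        final String body;
        private volatile boolean acknowledged;

        AckableMessage(String body) {
            this.body = body;
        }

        void acknowledge() {
            acknowledged = true;
        }

        boolean isAcknowledged() {
            return acknowledged;
        }
    }

    /** Pushes one message through source -> queue -> sink inside a single JVM. */
    static AckableMessage runOnce(String body) {
        BlockingQueue<AckableMessage> stageQueue = new ArrayBlockingQueue<>(16);
        AckableMessage original = new AckableMessage(body);

        // Sink stage: runs on its own thread and acknowledges what it receives.
        Thread sink = new Thread(() -> {
            try {
                AckableMessage received = stageQueue.take();
                // Same JVM, so this is the same object reference the source created.
                received.acknowledge();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sink.start();

        try {
            stageQueue.put(original); // source stage hands the object over by reference
            sink.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the demo", e);
        }
        return original;
    }

    public static void main(String[] args) {
        System.out.println(runOnce("{\"id\":1}").isAcknowledged());
    }
}
```

One caveat the model ignores: a real JMS Session is meant to be used from one thread at a time, so a real sink that acknowledges messages would need to take that into account.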