I was trying to use Sources.jmsQueueBuilder from Hazelcast, and while it works, I would like to be able to hook into it a bit more. Specifically, I would like to customize the jakarta.jms.Session it uses. For example, I can create a custom Connection, since such a method exists: connectionFn (ConnectionFactory to Connection). Another example is consumerFn, which lets me create a custom MessageConsumer from a Session. What is missing is a Connection-to-Session mapper. I've looked at the implementation of com.hazelcast.jet.impl.connector.StreamJmsP, and it creates the session directly via:
@Override
protected void init(@Nonnull Context context) throws JMSException {
session = connection.createSession(guarantee != NONE, DUPS_OK_ACKNOWLEDGE);
consumer = consumerFn.apply(session);
}
So it does not seem to be possible at all.
My specific case is this: while consuming a message from RabbitMQ via JMS (using Sources::jmsQueueBuilder), I do some other pipeline work on it with Hazelcast Jet, for example:
StreamSource<Message> jmsQueueSource = Sources.jmsQueueBuilder ....
pipeline.readFrom(jmsQueueSource)
.withoutTimestamps()
.map(...) // some mapping function
. /// other operations
.writeTo(Sinks.map("MY_IMAP_NAME"))
This MY_IMAP_NAME map is backed by a DB table using MapStore/MapLoader. The issue is that I want to acknowledge the JMS message only after it has been saved to the DB. As the very first step, I would like to be able to change
connection.createSession(guarantee != NONE, DUPS_OK_ACKNOWLEDGE);
to a session that uses CLIENT_ACKNOWLEDGE.
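For reference, obtaining a CLIENT_ACKNOWLEDGE session with plain JMS looks like this. This is a minimal sketch: the ConnectionFactory is assumed to be configured elsewhere, and the helper method name is mine.

```java
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.Session;

// Sketch: create a non-transacted session in CLIENT_ACKNOWLEDGE mode.
// With this mode a message stays unacknowledged until
// Message.acknowledge() is called explicitly by the consumer.
static Session clientAckSession(ConnectionFactory factory) throws JMSException {
    Connection connection = factory.createConnection();
    connection.start();
    // false = non-transacted; CLIENT_ACKNOWLEDGE defers the ack
    return connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
}
```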
UPDATE-1
What I ended up doing is something along the following lines.
First, I set offload: false in the map-store settings.
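Disabling map-store offloading can also be done programmatically. A sketch, assuming Hazelcast 5.2+ (where offloaded map-store execution was introduced); the map name and MapStore class name below are placeholders:

```java
import com.hazelcast.config.Config;
import com.hazelcast.config.MapStoreConfig;

// Sketch: configure the IMap's MapStore with offloading disabled,
// so a put into the map completes through the MapStore synchronously.
Config config = new Config();
MapStoreConfig mapStoreConfig = new MapStoreConfig()
        .setEnabled(true)
        .setClassName("com.example.MyMapStore") // assumed MapStore impl
        .setOffload(false);                     // offload: false
config.getMapConfig("MY_IMAP_NAME").setMapStoreConfig(mapStoreConfig);
```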
Then I created the source by hand:
StreamSource<Message> sourceStage = SourceBuilder.stream("streamer", context -> {
RMQConnectionFactory connectionFactory = ...
RMQConnection connection = (RMQConnection) connectionFactory.createConnection();
connection.start();
RMQSession session = (RMQSession) connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// return some context that has the `RMQSession`
}).<Message>fillBufferFn((context, buf) -> {
Message message = context.consumer().receiveNoWait();
if (message != null) {
buf.add(message);
}
else {
// sleep for some seconds
}
}).destroyFn((context -> {
context.consumer().close();
context.session().close();
context.connection().close();
})).build();
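The context object returned from the createFn above is elided; a minimal sketch of what it might hold (the record name and accessors are assumptions, not part of the original code):

```java
import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;

// Sketch of a holder for the JMS objects created in the source's
// createFn, so that fillBufferFn and destroyFn can reach them.
record JmsSourceContext(Connection connection,
                        Session session,
                        MessageConsumer consumer) {}
```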
pipeline.readFrom(sourceStage)
.withoutTimestamps()
.map(m -> map(m)) // this creates a record with JMSMessage and the json that it contains
.mapUsingService(ServiceFactories.iMapService("MY_IMAP_NAME"), (iMap, entry) -> {
// get json from record
// store json in the iMap
// get jms from record
// ack jms
})
.writeTo(Sinks.logger());
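Filled in, the mapUsingService step might look roughly like the fragment below. The record type and its accessors (key(), json(), jmsMessage()) are hypothetical names standing in for whatever map(m) produces.

```java
// Pipeline fragment sketch: persist the payload first, ack second.
.mapUsingService(ServiceFactories.iMapService("MY_IMAP_NAME"),
        (iMap, rec) -> {
            iMap.put(rec.key(), rec.json());  // persisted via MapStore (offload: false)
            rec.jmsMessage().acknowledge();   // ack only after the put returns
            return rec;
        })
```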
UPDATE-2
The answer is indeed correct: in a distributed environment this cannot be achieved.
Distributed Environment
Each stage of a Hazelcast Jet job may run on any cluster member. For example, the readFrom() stage may run on node1, while the map() stage may run on node2. The stages are connected via queues, which makes the pipeline distributed and asynchronous by nature.
Embedded Environment
If you are running Hazelcast Jet in an embedded environment (a single-node setup), the constraints of distribution are no longer present. Here's how you can handle acknowledgment:
Passing the JMS Object:
You can pass the entire JMS Message object between stages, as there is no need for network serialization in a single-node setup.
Custom Sink for Acknowledgment:
Use a custom sink created with the Sinks.fromProcessor() method. In this custom processor, you can call the message.acknowledge() method to manually acknowledge the message after processing.
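Besides Sinks.fromProcessor(), an acknowledging sink can also be sketched with SinkBuilder. This is a hedged sketch, only safe in an embedded single-member setup where the Message object never crosses the network; the sink name is a placeholder.

```java
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkBuilder;
import jakarta.jms.Message;

// Sketch: a sink that acknowledges each JMS message once it reaches
// the sink stage, i.e. after all upstream stages have processed it.
Sink<Message> ackSink = SinkBuilder
        .sinkBuilder("jms-ack-sink", ctx -> ctx)   // no per-member state needed
        .<Message>receiveFn((ctx, message) -> message.acknowledge())
        .build();
```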