Tags: java, jms, ibm-mq, messaging

How do JMS transactions work with concurrent consumers?


I have a queue of messages which need sequential processing. This processing involves calling a Web service (which might be down sometimes), so I have to make the message retrieval transactional. As far as I know, when there's any exception midway, the whole transaction rolls back and the message isn't lost, right?

But what's also needed is high availability (HA) on the message consumer, so I have two instances of the listener listening in on my queue. Now, will the transaction ensure that a second message isn't retrieved by the other instance of the listener until the first one is completely done processing the first message? Or will I have to do something more to make sure that no message is sent out of the queue until the one before it is fully done processing?

If any additional configuration is needed, would it be in the MQ or on the listener?


I'm using WebSphere MQ as the message broker and Spring Integration for retrieving the messages.
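
For context, the retrieval side is basically the setup below. This is a simplified sketch, not my exact configuration: the queue name and wiring are placeholders, and it uses a plain Spring DefaultMessageListenerContainer rather than the actual Spring Integration adapter definition.

    import javax.jms.ConnectionFactory;
    import javax.jms.MessageListener;

    import org.springframework.jms.listener.DefaultMessageListenerContainer;

    // Simplified consumer wiring (placeholder names). sessionTransacted=true means each
    // receive + onMessage runs in a local JMS transaction: an exception in the listener
    // rolls the message back onto the queue instead of losing it.
    public class ConsumerSetup {

        public DefaultMessageListenerContainer listenerContainer(ConnectionFactory cf,
                                                                 MessageListener listener) {
            DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
            container.setConnectionFactory(cf);
            container.setDestinationName("APP.DATA.QUEUE");   // placeholder queue name
            container.setSessionTransacted(true);             // roll back on listener exception
            container.setConcurrentConsumers(1);              // one consumer per listener instance
            container.setMessageListener(listener);
            return container;
        }
    }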

With the token thing, the first concern would be high availability on the queue manager itself. The queue which holds this token has to be part of some queue manager. Now, if we have a failover, that control queue will no longer be accessible, which kind of means that we need another control queue ready in case of a failover.

We can't have listeners listening in on that DR control queue during normal operations though. (Let's say we have a mechanism to actually make sure that the "data" queue is perfectly replicated.) The listener instances should know that a failover has been initiated, so that they can stop listening to the normal-operations control queue and switch over to the secondary. I can't do this using the listener instance alone. The actual producer which puts messages into the queue will have to notify the listener instances to stop listening to the normal-operations control queue and switch over to the secondary.

This would be kind of tricky if there's an intermittent connection problem (and the normal-operations queue manager isn't really down), but that's too much of a corner case.

With high availability of the control queue taken care of, we kind of have the same problem as the non-shareable setting during low-load scenarios. Now we have occasional spikes in load, but there are slump periods (during the night and such). This token system is not really reactive, right? It's more of a periodic thing. So let's say we don't get any messages for a few hours. The listeners will still be constantly checking the queue because the token message keeps triggering one instance after another, which more or less makes it a poller, really. I might as well have multiple listener instances each polling at different times of the hour, right? It's not really event-driven per se.

Third would really be the question of inserting the token message. During first install or during a failback, we'll have that extra manual step of inserting this token (since the token would sometimes be lost in a failover). We can't really make one of the listener instances do it, since if a listener instance doesn't find the message it kind of means that some other listener instance has the token. So this logic has to be separate. And if we actually put some meaningful information into this token message, it would have to be inserted by a dedicated utility rather than through the UI.

I guess the first and third aren't really problems, but just extra overhead which wouldn't be needed if we went for a poller implementation. The second one is what's bothering me most.


Solution

  • You need to be passing tokens. Here's how that works:

    First, create a second queue and place a single message into it. Now start up each program with the following logic (a minimal code sketch follows the steps).

    1. Get the token message off the token queue under syncpoint using an unlimited or long wait interval and the FAIL_IF_QUIESCING option.
    2. Put the token message back on the token queue in the same UOW.
    3. Get the next message off of the application queue under the same UOW.
    4. Process the application's message normally.
    5. Commit the UOW.
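
    Here is a rough sketch of those five steps using the IBM MQ classes for Java. Queue names are placeholders and the error handling is deliberately minimal; treat it as a starting point rather than production code.

        import com.ibm.mq.MQException;
        import com.ibm.mq.MQGetMessageOptions;
        import com.ibm.mq.MQMessage;
        import com.ibm.mq.MQPutMessageOptions;
        import com.ibm.mq.MQQueue;
        import com.ibm.mq.MQQueueManager;
        import com.ibm.mq.constants.CMQC;

        public class SerializedListener {

            public void run(MQQueueManager qmgr) throws MQException {
                int openOpts = CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_OUTPUT | CMQC.MQOO_FAIL_IF_QUIESCING;
                MQQueue tokenQ = qmgr.accessQueue("APP.TOKEN.QUEUE", openOpts);   // placeholder names
                MQQueue appQ   = qmgr.accessQueue("APP.DATA.QUEUE", openOpts);

                MQGetMessageOptions gmo = new MQGetMessageOptions();
                gmo.options = CMQC.MQGMO_SYNCPOINT | CMQC.MQGMO_WAIT | CMQC.MQGMO_FAIL_IF_QUIESCING;
                gmo.waitInterval = CMQC.MQWI_UNLIMITED;

                MQPutMessageOptions pmo = new MQPutMessageOptions();
                pmo.options = CMQC.MQPMO_SYNCPOINT | CMQC.MQPMO_FAIL_IF_QUIESCING;

                while (true) {
                    try {
                        // 1. Get the token message under syncpoint, waiting as long as it takes.
                        MQMessage token = new MQMessage();
                        tokenQ.get(token, gmo);

                        // 2. Put the token straight back onto the token queue in the same UOW.
                        tokenQ.put(token, pmo);

                        // 3. Get the next application message in the same UOW.
                        MQMessage appMsg = new MQMessage();
                        appQ.get(appMsg, gmo);

                        // 4. Process it normally (e.g. call the Web service).
                        process(appMsg);

                        // 5. Commit the UOW; the token becomes visible to the other instances.
                        qmgr.commit();
                    } catch (MQException e) {
                        // Any failure rolls the whole UOW back: the token and the application
                        // message both return to their queues, nothing is lost or reordered.
                        qmgr.backout();
                        throw e;
                    }
                }
            }

            private void process(MQMessage msg) {
                // application-specific processing goes here
            }
        }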

    You can use as many application instances as you want. You will see one input handle on each of the two queues for each application instance. No application instance will have to handle errors due to exclusive use of a queue.

    Since there is only one token message and only one application can hold it under syncpoint at a time, only one of the applications can be actively processing an application message. Since the GET off the application queue is dependent on a successful GET off the token queue, all application messages are processed in strict sequence.

    Note: The application will process the application messages with as many concurrent threads as there are token messages on the token queue. If anyone ever adds another token message to the token queue, strict sequence processing is lost. For this reason, read/write access to that queue must not be granted to anyone other than the application service account. Also, it is common for that token message to be structured so that the application can recognize it. If a stray, unrelated message lands there, the application should ignore it and log a warning.
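
    As an example of that recognition, the token could carry a well-known correlation ID. The specific value below is a made-up convention for illustration; anything the producer controls and the listeners can test for will do.

        import java.util.Arrays;

        import com.ibm.mq.MQMessage;

        final class TokenCheck {
            // Made-up convention for this sketch: the token carries a fixed 24-byte correlation ID.
            static final byte[] TOKEN_CORREL_ID =
                    Arrays.copyOf("SEQ-TOKEN-V1".getBytes(), 24);

            static boolean isToken(MQMessage msg) {
                if (Arrays.equals(msg.correlationId, TOKEN_CORREL_ID)) {
                    return true;
                }
                // A stray message landed on the token queue: warn and leave it out of the flow.
                System.err.println("Unexpected message on token queue, ignoring msgId="
                        + Arrays.toString(msg.messageId));
                return false;
            }
        }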

    You will see a fairly even distribution of messages between the two applications. If you use more than two applications you might see a wildly uneven distribution because queue handles are managed in a stack. As an instance commits its UOW, the next instance's handle is at the top of the stack, so it gets the next message. While it is handling that message, the instance that just committed will have its GET placed on top of the stack. If you have 3 or more listening instances, chances are only the first two will see any traffic.

    This assures that messages are processed off the queue one at a time. It does not assure that you won't get dupes.

    If you do everything under syncpoint, no messages will ever be lost. However, there's a scenario in which a message is retrieved and processed, then the COMMIT call fails. In that case the transaction is rolled back and the same message becomes available again. If you are using 1-phase commits and not XA, the processing for that message will not be rolled back.

    The nice thing is that the token message will be under syncpoint too, and that fixes the issue in which an orphaned client channel takes a while to release the transaction. Normally a new connection would get messages which are newer than the message held under syncpoint by the orphaned transaction. Eventually the channel agent times out, releasing the held message back to the queue, but effectively changing its position to be behind any messages that were processed while it was sequestered.

    In this scenario the token message is also sequestered, so after this type of connection loss, message processing temporarily stops and waits for the channel agent to time out. If that were ever to happen, just issue a STOP CHANNEL command against the instance holding the UOW.


    Update based on additional question details specific to this answer

    The queue which holds this token has to be part of some queue manager. Now if we have a failover, that control queue will no longer be accessible. Which kind of means that we need another control queue ready in case of a failover.

    The token queue is as available or as unavailable as the application queue. Only one is needed. If the application requires high availability (HA), then a Multi-Instance QMgr or a hardware high availability cluster should be used. These share disk so the QMgr that comes up in the failover is the same one the application has been connected to, just at a different physical location.

    If the application needs DR, it's possible to replicate the disk under the QMgr's logs and data directories to a DR site. However, nothing should be listening on those instances while processing is going on in the primary data center.

    The listener instances should know that a failover has been initiated so that they can stop listening to the normal-operations control queue and switch over to the secondary. I can't do this using the listener instance alone.

    Why not? WebSphere MQ has had reconnectable clients for a really long time, and the multi-instance features in v7.0.1 made reconnecting drop-dead simple. As an administrator, your job is to make sure that no more than one instance of the application queue and of the token (not trigger!) queue is available. During an outage, the client goes into retry without requiring any application code to drive it. It finds whichever of the instances is up and connects.
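
    For illustration, with the MQ JMS classes the reconnection can live entirely in the connection factory. The host names, channel and queue manager below are placeholders for the two instances of a multi-instance QMgr.

        import javax.jms.JMSException;

        import com.ibm.mq.jms.MQConnectionFactory;
        import com.ibm.msg.client.wmq.WMQConstants;

        public class ReconnectingConnectionFactory {

            public static MQConnectionFactory create() throws JMSException {
                MQConnectionFactory cf = new MQConnectionFactory();
                cf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
                cf.setQueueManager("APPQM");                            // placeholder QMgr name
                cf.setConnectionNameList("hostA(1414),hostB(1414)");    // active and standby instances
                cf.setChannel("APP.SVRCONN");                           // placeholder channel
                cf.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT);
                cf.setClientReconnectTimeout(1800);                     // keep retrying for 30 minutes
                return cf;
            }
        }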

    The actual producer which puts messages into the queue will have to notify the listener instances to stop listening to the normal-operations control queue and switch over to the secondary.

    The question was about serialization with concurrent consumers. This is about a design in which producers and consumers have to rendezvous at a common location. That's a different problem that happens to overlap this one only in that it is complicated somewhat by serialization. Ask a different question if you need to explore topologies.

    This token system is not really reactive, right? It's more of a periodic thing. So let's say we don't get any messages for a few hours. The listeners will still be constantly checking the queue because the token message keeps triggering one instance after another.

    This does not use triggering. It uses a token (not trigger!) message the way a filesystem or database uses a lock in order to facilitate serialization. Whichever listener gets the token message then does a GET with unlimited wait on the application queue. The other listeners have a GET with unlimited wait on the token (not trigger!) queue. Basically, they sit around idle until a message arrives. Zero reconnections, zero polls, zero CPU cycles. If you need to know they are alive, let them time out on the application queue once in a while. This rolls back their UOW on the token queue, which passes the token to another listener.
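
    If that liveness check is wanted, the only change to the earlier sketch is a finite wait on the application queue; the one-minute interval here is arbitrary.

        import com.ibm.mq.MQException;
        import com.ibm.mq.MQGetMessageOptions;
        import com.ibm.mq.MQMessage;
        import com.ibm.mq.MQQueue;
        import com.ibm.mq.MQQueueManager;
        import com.ibm.mq.constants.CMQC;

        final class LivenessAwareGet {
            // Variant of step 3 of the sketch above: wait a bounded time on the application queue.
            // Backing out on the timeout returns the token to the token queue, which both passes
            // it to another waiting instance and doubles as an "I'm alive" signal.
            static void getProcessCommit(MQQueueManager qmgr, MQQueue appQ) throws MQException {
                MQGetMessageOptions gmo = new MQGetMessageOptions();
                gmo.options = CMQC.MQGMO_SYNCPOINT | CMQC.MQGMO_WAIT | CMQC.MQGMO_FAIL_IF_QUIESCING;
                gmo.waitInterval = 60 * 1000;   // arbitrary: give up after one minute

                try {
                    MQMessage appMsg = new MQMessage();
                    appQ.get(appMsg, gmo);
                    // ... process the message, then commit the token and the message together.
                    qmgr.commit();
                } catch (MQException e) {
                    // MQRC_NO_MSG_AVAILABLE (2033) just means the wait expired.
                    qmgr.backout();
                    if (e.reasonCode != CMQC.MQRC_NO_MSG_AVAILABLE) {
                        throw e;
                    }
                }
            }
        }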

    Third would really be the question of inserting the token message. During first install or during a failback, we'll have that extra manual step of inserting this token (since the token would sometimes be lost in a failover).

    Why? Do you experience MQ losing persistent messages under syncpoint often? If so, you are doing it wrong. ;-) In a situation with strict serialization requirements there can be only one active instance of a queue. If for some reason there are other instances of the application queue pre-defined (other than through disk replication), there would be one instance of the token (not trigger!) queue also pre-defined alongside it, and one token (not trigger!) message waiting in each queue. Surely nobody would do such a thing in the face of strict message-order requirements, but if they did, those queues would surely be get-disabled while not in use.

    We can't really make one of the listener instances do it, since if a listener instance doesn't find the message it kind of means that some other listener instance has the token.

    Correct. The listeners could check queue depth, transactions, input handles, etc., but it is generally wise to avoid mixing application logic with control logic.

    So this logic has to be separate. And if we actually put some meaningful info into this token message, it would have to be inserted by a dedicated utility rather than through the UI.

    Why? Your coders handle structured data in application messages, right? If this is perceived to be significantly more difficult, someone's doing it wrong. ;-) Write an instance of a formatted token (not trigger!) message to a queue, then offload that to a file. When you need to reinitialize the queue, use Q or QLoad to first clear the queue, then load the file into it. That utility would be the one to open the queue for exclusive use, check for depth, check for handles, etc. prior to performing its magic. When I do this for consulting clients I typically define a service that initializes the queue on MQ startup and also provide a feature in the application GUI for the operations and support staff. So long as the application managing the token (not trigger!) queue gets exclusive access to it during these operations, it really doesn't matter how it's done or by how many instances of the control application.
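
    A bare-bones version of such an initializer might look like the sketch below. The queue name and token payload are placeholders, and a real utility would also check for open input handles and in-flight transactions as described above.

        import java.io.IOException;
        import java.util.Arrays;

        import com.ibm.mq.MQException;
        import com.ibm.mq.MQMessage;
        import com.ibm.mq.MQPutMessageOptions;
        import com.ibm.mq.MQQueue;
        import com.ibm.mq.MQQueueManager;
        import com.ibm.mq.constants.CMQC;

        public class TokenQueueInitializer {

            public static void seedToken(MQQueueManager qmgr) throws MQException, IOException {
                // Open for exclusive input so no listener can interfere while we inspect and seed.
                int openOpts = CMQC.MQOO_INPUT_EXCLUSIVE | CMQC.MQOO_INQUIRE
                             | CMQC.MQOO_OUTPUT | CMQC.MQOO_FAIL_IF_QUIESCING;
                MQQueue tokenQ = qmgr.accessQueue("APP.TOKEN.QUEUE", openOpts);   // placeholder name
                try {
                    if (tokenQ.getCurrentDepth() > 0) {
                        return;   // something is already there (the token, hopefully); leave it alone
                    }
                    MQMessage token = new MQMessage();
                    token.persistence = CMQC.MQPER_PERSISTENT;
                    token.correlationId = Arrays.copyOf("SEQ-TOKEN-V1".getBytes(), 24);   // made-up convention
                    token.writeString("SEQ-TOKEN-V1");                                    // made-up payload

                    MQPutMessageOptions pmo = new MQPutMessageOptions();
                    pmo.options = CMQC.MQPMO_SYNCPOINT | CMQC.MQPMO_FAIL_IF_QUIESCING;
                    tokenQ.put(token, pmo);
                    qmgr.commit();
                } finally {
                    tokenQ.close();
                }
            }
        }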

    As a rule I also use the structure in the message to send commands to the listeners. There's the real token message, and then there are messages that command the application instances to do things. For example, it's really nice to have a non-transactional "ping" capability. If I drop more ping messages on the token (not trigger!) queue in a single UOW than I have application instances listening, I am guaranteed to contact all of them. In this way I can detect zombies. Depending on how much instrumentation is required, the listeners can react to the ping by providing a status (uptime, messages processed, etc.) in the log, to the console, to an event queue, etc.
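
    One rough way to keep that command handling readable is to dispatch on the message body before treating anything as the token. The "PING" and "SEQ-TOKEN-V1" strings are made-up conventions carried over from the earlier sketches, not anything MQ defines.

        import java.io.IOException;

        import com.ibm.mq.MQException;
        import com.ibm.mq.MQMessage;

        final class ControlMessageDispatcher {

            // Returns true if the message is the real token; pings and strays are handled here.
            boolean handle(MQMessage msg) throws IOException, MQException {
                String body = msg.readStringOfByteLength(msg.getDataLength());

                if (body.startsWith("PING")) {
                    // Report status: uptime, messages processed, etc. (log, console, event queue).
                    System.out.println("PING received; instance alive");
                    return false;
                }
                if (body.startsWith("SEQ-TOKEN-V1")) {
                    return true;   // the real token: put it back and move on to the application queue
                }
                System.err.println("Unexpected control message, ignoring: " + body);
                return false;
            }
        }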

    I guess the first and third aren't really problems, but just extra overhead which wouldn't be needed if we went for a poller implementation.

    That's good, because this is all pretty standard stuff. The problem lies mainly with the requirements for serialization conflicting with those for HA/DR. What you are looking for is global transactional atomicity to implement a single logical queue across multiple physical locations. IBM MQ has never attempted to provide that, although the WebSphere Application Server (WAS) Messaging Engine has. The closest IBM MQ comes is to use two IBM MQ appliances with memory-to-memory replication of message and transaction data, but that is good for only a few miles before light-speed latency begins to significantly impact throughput. It doesn't handle your DR needs. In fact, nothing short of synchronous replication does that if you want a zero recovery point at the DR datacenter.