asynchronous jms weblogic integration ibm-mq

How to wrap a JMS to WebSphere MQ bridge in a synchronous call using the request-reply pattern?


I am dealing with a scenario that is new to me, though I believe it is a common one :)

The requirement is to give the user the experience of a synchronous online transaction for a web service call that actually delegates the work to IBM WebSphere MQ through an asynchronous JMS-MQ bridge.

The client calls the web service, and its message is published to a JMS queue on the app server, which delivers it to WebSphere MQ. After processing, a response is delivered back to the app server on a FIXED JMS queue endpoint.

The transaction must time out if WebSphere MQ does not deliver the response within a defined amount of time. In that case the web service should send a time-out signal to the client and ignore the transaction.

The sketch of the problem follows.

I need to block the request in the web service until the response arrives or the time-out expires.

So I am looking for an open-source library to help with this task. Or is the only solution to block a thread and keep polling for the response? Maybe I could block and have a listener notify me when the response arrives?

A bit of discussion would help me clear up my ideas on this. Any suggestions?

I have a sketch that I hope will help clarify the picture ;)

[Image: sketch of the request-reply flow, not available]


Solution

  • After a couple of days of coding I arrived at a solution. I am using standard EJB3 with JAX-WS annotations and standard JMS.

    The code I have written so far to meet the requirements follows. It is a stateless session bean with bean-managed transactions (BMT). Using standard container-managed transactions (CMT) caused some kind of hang, I believe because both JMS interactions are in the same method and therefore ended up in the same transaction. That would fit with how JMS transactions work: a message sent inside a transaction is only delivered when the transaction commits, so the receive would be waiting for a reply to a request that had not left yet. Notice that I had to begin and commit a separate transaction for each interaction with the JMS queues. I am using WebLogic for this solution.

    I have also coded an MDB that consumes the message from the queue endpoint jms/Pergunta and places a response message on the jms/Resposta queue. I did this to mock the expected behavior of the MQ side of the problem. In a real scenario we would probably have a COBOL application on the mainframe, or another Java application, handling the messages and placing the response on the response queue.

    If someone needs to try this code, all you need is a Java EE 5 container with two queues configured under the JNDI names jms/Pergunta and jms/Resposta.

    The EJB/Webservice code:

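    import java.util.logging.Logger;

    import javax.annotation.Resource;
    import javax.ejb.SessionContext;
    import javax.ejb.Stateless;
    import javax.ejb.TransactionManagement;
    import javax.ejb.TransactionManagementType;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueReceiver;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jws.WebMethod;
    import javax.jws.WebService;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.transaction.UserTransaction;

    @Stateless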
    @TransactionManagement(TransactionManagementType.BEAN)
    @WebService(name="DJOWebService")
    public class DJOSessionBeanWS implements DJOSessionBeanWSLocal {
    
        Logger log = Logger.getLogger(DJOSessionBeanWS.class.getName());
    
        @Resource
        SessionContext ejbContext;
    
        // Defines the JMS connection factory.
        public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory";
    
        // Defines request queue
        public final static String QUEUE_PERG = "jms/Pergunta";
    
        // Defines response queue
        public final static String QUEUE_RESP = "jms/Resposta";
    
    
        Context ctx;
        QueueConnectionFactory qconFactory;
    
        /**
         * Default constructor. 
         */
        public DJOSessionBeanWS() {
            log.info("DJOSessionBeanWS constructor");
        }
    
        @WebMethod(operationName = "processaMensagem")
        public String processaMensagem(String mensagemEntrada, String idUnica)
        {
            //gets UserTransaction reference as this is a BMT EJB.
            UserTransaction ut = ejbContext.getUserTransaction();
            try {
    
                ctx = new InitialContext();
                //get the factory before any transaction it is a weblogic resource.
                qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
                log.info("Got QueueConnectionFactory");
                ut.begin();
                QueueConnection qcon = qconFactory.createQueueConnection();
                QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                Queue qs = (Queue) (new InitialContext().lookup("jms/Pergunta"));
                // send the caller's payload rather than a fixed test string
                TextMessage message = qsession.createTextMessage(mensagemEntrada);
                message.setJMSCorrelationID(idUnica);
                qsession.createSender(qs).send(message);
                ut.commit();
                qcon.close();
                //had to finish and start a new transaction, I decided also get new references for all JMS related objects, not sure if this is REALLY required
                ut.begin();
                QueueConnection queuecon = qconFactory.createQueueConnection();
                Queue qreceive = (Queue) (new InitialContext().lookup("jms/Resposta"));
                QueueSession queuesession = queuecon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                String messageSelector = "JMSCorrelationID = '" + idUnica + "'";
                //creates the receiver and sets a message selector to get only the related message from the response queue.
                QueueReceiver qr = queuesession.createReceiver(qreceive, messageSelector);
                queuecon.start();
                //sets the timeout to keep waiting for the response...
                TextMessage tresposta = (TextMessage) qr.receive(10000);
                // commit and close in both outcomes; only the return value differs
                ut.commit();
                queuecon.close();
                if (tresposta != null) {
                    // return the message body; toString() would return an implementation-specific dump
                    return tresposta.getText();
                }
                log.info("null reply, returned by timeout..");
                return "Got no response message.";
    
    
    
            } catch (Exception e) {
                log.severe("Unexpected error occurred ==>> " + e.getMessage());
                e.printStackTrace();
                try {
                    // roll back instead of committing: the work in this transaction failed
                    ut.rollback();
                } catch (Exception ex) {
                    // no active transaction (already committed) or the rollback itself failed
                    ex.printStackTrace();
                }
                return "Error executing request ==> " + e.getMessage();
            } 
    
        }
    }   
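
One detail worth guarding against in the selector: idUnica comes straight from the web service caller and is concatenated into the selector string. JMS selectors use SQL-92 style string literals, where a single quote inside a literal is written as two single quotes, so an id containing a quote would break the selector or change what it matches. Below is a minimal sketch of a helper that escapes it. The class and method names (CorrelationSelectors, forCorrelationId) are made up for illustration and are not part of the original code.

```java
/** Hypothetical helper for building the JMSCorrelationID selector safely. */
final class CorrelationSelectors {

    private CorrelationSelectors() {
        // static helper, not meant to be instantiated
    }

    /** Returns a selector that matches exactly the given correlation id. */
    static String forCorrelationId(String correlationId) {
        if (correlationId == null || correlationId.isEmpty()) {
            throw new IllegalArgumentException("correlation id must not be empty");
        }
        // SQL-92 string literal rule: a single quote is escaped by doubling it.
        String escaped = correlationId.replace("'", "''");
        return "JMSCorrelationID = '" + escaped + "'";
    }
}
```

With something like this in place, the selector line in the bean could read `String messageSelector = CorrelationSelectors.forCorrelationId(idUnica);`.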
    

    And this is the code for the MDB that mocks the MQ side of the problem. During my tests I had a Thread.sleep call in it to simulate a slow response and validate the timeout on the client side, but it is not present in this version.

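    import java.util.logging.Logger;

    import javax.ejb.ActivationConfigProperty;
    import javax.ejb.MessageDriven;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;

    /**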
     * Mock to get message from request queue and publish a new one on the response queue.
     */
    @MessageDriven(
            activationConfig = { @ActivationConfigProperty(
                    propertyName = "destinationType", propertyValue = "javax.jms.Queue"
            ) }, 
            mappedName = "jms/Pergunta")
    public class ConsomePerguntaPublicaRespostaMDB implements MessageListener {
    
        Logger log = Logger.getLogger(ConsomePerguntaPublicaRespostaMDB.class.getName());
    
        // Defines the JMS connection factory.
        public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory";
    
        // Defines response queue
        public final static String QUEUE_RESP = "jms/Resposta";
    
    
        Context ctx;
        QueueConnectionFactory qconFactory;
    
    
    
        /**
         * Default constructor. 
         */
        public ConsomePerguntaPublicaRespostaMDB() {
            log.info("Ran the ConsomePerguntaPublicaRespostaMDB constructor");
            try {
                ctx = new InitialContext();
            } catch (NamingException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * @see MessageListener#onMessage(Message)
         */
        public void onMessage(Message message) {
            log.info("Got a message from queue jms/Pergunta, running ConsomePerguntaPublicaRespostaMDB.onMessage");
            TextMessage tm = (TextMessage) message;
            QueueConnection qcon = null;
    
            try {
                log.info("Message received in onMessage ==>> " + tm.getText());
    
                // take the correlation id from the request so the reply carries the same one
                String idMensagem = tm.getJMSCorrelationID();
                log.info("Correlation id that will be used in the reply ==>> " + idMensagem);
    
                qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
                log.info("Looked up the WebLogic QueueConnectionFactory successfully. Sending message");
                qcon = qconFactory.createQueueConnection();
                QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                Queue queue = (Queue) (ctx.lookup("jms/Resposta"));
                TextMessage tmessage = qsession.createTextMessage("JMS message to post on the response queue...");
                tmessage.setJMSCorrelationID(idMensagem);
                qsession.createSender(queue).send(tmessage);
            } catch (JMSException e) {
                log.severe("Error in onMessage ==>> " + e.getMessage());
                e.printStackTrace();
            }  catch (NamingException e) {
                log.severe("Error in lookup ==>> " + e.getMessage());
                e.printStackTrace();
            } finally {
                // close the connection so it is not leaked on every message
                if (qcon != null) {
                    try {
                        qcon.close();
                    } catch (JMSException e) {
                        log.warning("Could not close the JMS connection ==>> " + e.getMessage());
                    }
                }
            }
    
        }
    
    }
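
On the other idea from the question (blocking until a listener notifies the waiting call, instead of a selector-based receive), here is a rough sketch of how the correlation part could look using only the JDK. It is not what the bean above does, and every name in it (ReplyCorrelator, register, complete, await) is hypothetical. The assumption is that some consumer of jms/Resposta, for example an MDB, calls complete with the reply's JMSCorrelationID and text, and that the reply is consumed in the same JVM where the web service call is waiting.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: matches replies to blocked callers by correlation id. */
final class ReplyCorrelator {

    // One pending future per in-flight request, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers interest in a reply; call this BEFORE sending the request. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (pending.putIfAbsent(correlationId, future) != null) {
            throw new IllegalStateException("Duplicate correlation id: " + correlationId);
        }
        return future;
    }

    /** Called by whatever consumes the response queue; false means nobody is waiting. */
    boolean complete(String correlationId, String replyText) {
        Objects.requireNonNull(replyText, "replyText");
        CompletableFuture<String> future = pending.remove(correlationId);
        // No pending entry means the caller already timed out, so the late reply is ignored.
        return future != null && future.complete(replyText);
    }

    /** Blocks until the reply arrives or the timeout expires; empty means no reply. */
    Optional<String> await(String correlationId, CompletableFuture<String> future, long timeoutMillis) {
        try {
            return Optional.of(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException | ExecutionException e) {
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return Optional.empty();
        } finally {
            // Always drop the entry so timed-out requests do not accumulate.
            pending.remove(correlationId, future);
        }
    }
}
```

The web method would call register(idUnica), send the request, then await(idUnica, future, 10000) and map an empty result to the time-out answer. The selector-based receive shown above avoids the same-JVM assumption, which matters if the app server is clustered.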
    

    Cheers