
JMS - Using Topics on WebSphere Application Server 7


I have a sender and a receiver for a particular Topic. I'm running the sender and receiver as servlets on WAS 7.0. The Topic and Topic Connection Factory are set up on WAS, but I'm not able to receive the message that is sent. It works fine when I use a Queue instead of a Topic.

Below is the code that I use.

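import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.servlet.http.HttpServlet;

public class CommonServlet extends HttpServlet{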

    private static final long serialVersionUID = 1L;
    private static boolean initialized = false;
    private static InitialContext initialContext = null;
    private ConnectionFactory connectionFactory = null;
    private Destination destination = null;

    protected final void initialize(){
        try{
            if( initialized == false ){
                // Get JNDI initial context
                initialContext = new InitialContext();

                // Flag as initialized
                initialized = true;
            }
        }
        catch( Throwable t ){
            System.out.println( t.getMessage() );
        }
    }

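    /**
     * @return the destination bound to jms/TestTopic (cached after the first
     *         successful lookup), or null if the lookup fails
     */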
    protected final Destination getDestination(){
        if( destination != null ){
            return destination;
        }

        try{
            destination = (Destination) initialContext.lookup("jms/TestTopic" );
        }
        catch( NamingException e ){
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return destination;
    }

    /**
     * @return the connection factory bound to jms/TestTopicCF, or null if the
     *         lookup fails
     */
    protected final ConnectionFactory getConnectionFactory(){
        try{
            connectionFactory = (ConnectionFactory) initialContext.lookup("jms/TestTopicCF" );
        }
        catch( NamingException e ){
            e.printStackTrace();
        }
        return connectionFactory;
    }
}

Sender servlet class

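import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class Sender extends CommonServlet{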

    private static final long serialVersionUID = 1L;

    @Override
    protected void doGet( HttpServletRequest req, HttpServletResponse resp ) throws ServletException, IOException{
        System.out.println("inside do get of sender");
        doPost( req, resp );
    }

    @Override
    protected void doPost( HttpServletRequest req, HttpServletResponse resp ) throws ServletException, IOException{
        String message = req.getParameter( "message" );
        sendMessage( message );
    }

    private void sendMessage( String messageText ){
        initialize();
        try{
            ConnectionFactory factory = getConnectionFactory();

            Destination destination = getDestination();

            Connection connection = factory.createConnection();
            connection.start();

            Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE );

            MessageProducer sender = session.createProducer( destination );

            TextMessage txtMsg = session.createTextMessage( messageText );

            sender.send( txtMsg );
        }
        catch( Exception ex ){
            ex.printStackTrace();
        }
    }
}

Receiver servlet class

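import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class Receiver extends CommonServlet{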

    private static final long serialVersionUID = 1L;


    @Override
    protected void doGet( HttpServletRequest req, HttpServletResponse resp ) throws ServletException, IOException{
        doPost( req, resp );
    }

    @Override
    protected void doPost( HttpServletRequest req, HttpServletResponse resp ) throws ServletException, IOException{
        getMessage();
    }

    private void getMessage(){
        initialize();
        ConnectionFactory factory = getConnectionFactory();

        Destination destination = getDestination();

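        Connection connection = null;
        try{
            connection = factory.createConnection();

            connection.start();

            Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE );

            MessageConsumer receiver = session.createConsumer( destination );

            // Waits up to 4 seconds; returns null if no message arrives in that time
            Message message = receiver.receive( 4000 );

            System.out.println( message );//COMING AS NULL
        }
        catch( JMSException e ){
            e.printStackTrace();
        }
        finally{
            // Close the connection so each request does not leak one
            if( connection != null ){
                try{
                    connection.close();
                }
                catch( JMSException e ){
                    e.printStackTrace();
                }
            }
        }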

    }

}

Both TestTopic and TestTopicCF are configured in the WAS administration console under Resources > JMS, in the Topic Connection Factory and Topics sections. There are no exceptions while running the application. It works fine if I use the Queue and Queue Connection Factory that I created. Is there something I need to do specifically to get the Topic working?


Solution

  • Topics behave differently from queues: by default they do not persist messages, so the subscriber has to be connected at the moment the publisher sends the message. As the linked documentation puts it:

    Publishers and subscribers have a timing dependency. A client that subscribes to a topic can consume only messages published after the client has created a subscription, and the subscriber must continue to be active in order for it to consume messages.

    So in short:

    • Your current design is not suitable for topics. You would have to call the receiver servlet first, with a very long receive timeout, and then call the sender servlet from a second window. As it stands, your message is simply lost.
    • A much better approach would be to use a message-driven bean (MDB) as the message receiver instead of a servlet.
    • If you need to receive messages sent to the topic while the subscriber is inactive, you will need to configure a durable subscriber and configure the topic to be durable in WAS.
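
To make the timing dependency concrete, here is a small toy model in plain Java. It is not the JMS API and not how WAS implements topics; `ToyTopic`, `TopicTimingDemo` and the method names are made up for illustration. It only assumes the behaviour described above: a published message is copied to the subscriptions that exist at that moment, and is dropped if there are none.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of topic delivery (hypothetical, NOT the JMS API). */
public class TopicTimingDemo {

    /** Hypothetical stand-in for a topic with non-durable subscribers. */
    static class ToyTopic {
        // Subscription name -> messages waiting for that subscription.
        private final Map<String, Queue<String>> subscriptions = new LinkedHashMap<>();

        /** Registers a subscription; it only sees messages published afterwards. */
        void subscribe(String name) {
            subscriptions.putIfAbsent(name, new ArrayDeque<>());
        }

        /** Copies the message to every existing subscription; with none, it is dropped. */
        void publish(String message) {
            for (Queue<String> backlog : subscriptions.values()) {
                backlog.add(message);
            }
        }

        /** Returns the next waiting message, or null (like a receive that timed out). */
        String receive(String name) {
            Queue<String> backlog = subscriptions.get(name);
            return backlog == null ? null : backlog.poll();
        }
    }

    /** Order used in the question: the sender runs before any subscriber exists. */
    static String publishThenSubscribe() {
        ToyTopic topic = new ToyTopic();
        topic.publish("hello");
        topic.subscribe("receiver");
        return topic.receive("receiver");
    }

    /** Order suggested in the answer: the subscriber exists before the sender runs. */
    static String subscribeThenPublish() {
        ToyTopic topic = new ToyTopic();
        topic.subscribe("receiver");
        topic.publish("hello");
        return topic.receive("receiver");
    }

    public static void main(String[] args) {
        System.out.println("publish, then subscribe: " + publishThenSubscribe());
        System.out.println("subscribe, then publish: " + subscribeThenPublish());
    }
}
```

The first call returns null, which matches what the Receiver servlet prints. The second returns the message. In this model, a durable subscription would correspond to a subscription entry that stays registered while the consumer is disconnected, so messages keep accumulating for it.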