Search code examples
tcpjmslistenernetty

JMS listener with netty TCP


I'm trying to develop Netty using TCP. I am using the IBM MQ client to connect to the MQ broker, and the idea is I need to develop a TCP server that receives a message passes it to MQ and if the server responds send it to the client that sent the request. Therefore, I need to implement a JMS listener for async message. The problem is that the JMS listener is outside of the Netty channel and I'm trying to figure out how to read the message add it to a Netty channel and send it immediately to the client connected to TCP socket. I can send messages perfectly. The problem is when the server responds. I receive the message, get the context/channel from the clientConnectionProvider and I writeAndFlush, but I don't see the message arrive at the client.

I create the listener in the main class.

public class Main {

    private final Integer port;

    private final Destination sendDestination;
    private final JMSContext jmsSendContext;

    private final JMSConsumer consumer;
    private final JMSContext jmsRecieveContext;
    private final Destination consumerDestination;

    private final ClientConnectionProvider clientConnectionProvider;

    public Main(Properties properties)
            throws JMSException {
        
            if (properties.containsKey(ConfigurationEnum.SERVER_PORT) {
                this.port = properties.getProperty(ConfigurationEnum.SERVER_PORT)
            } else {
                log.error("server.port not defined in properties"
                throw new ConfigException(
                        String.format("server.port not defined in properties");
            }

        JmsFactoryFactory ff = JmsFactoryFactory.getInstance(JmsConstants.WMQ_PROVIDER);
        JmsConnectionFactory cf = ff.createConnectionFactory();

        // Set the properties
        cf.setStringProperty(CommonConstants.WMQ_HOST_NAME,
                properties.getProperty(ConfigurationEnum.IBM_MQ_HOST.getValue()));
        cf.setIntProperty(CommonConstants.WMQ_PORT,
                Integer.parseInt(properties.getProperty(ConfigurationEnum.IBM_MQ_PORT.getValue())));
        cf.setStringProperty(CommonConstants.WMQ_CHANNEL,
                properties.getProperty(ConfigurationEnum.IBM_MQ_CHANNEL.getValue()));
        cf.setIntProperty(CommonConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
        cf.setStringProperty(CommonConstants.WMQ_QUEUE_MANAGER,
                properties.getProperty(ConfigurationEnum.IBM_QUEUE_MANAGER.getValue()));
        cf.setStringProperty(CommonConstants.WMQ_APPLICATIONNAME, "FIX Orchestra Gateway");
        cf.setBooleanProperty(JmsConstants.USER_AUTHENTICATION_MQCSP, true);
        cf.setStringProperty(JmsConstants.USERID, properties.getProperty(ConfigurationEnum.IBM_APP_USER.getValue()));
        cf.setStringProperty(JmsConstants.PASSWORD, properties.getProperty(ConfigurationEnum.IBM_APP_PASS.getValue()));

        clientConnectionProvider = new ClientConnectionProvider();
        
        jmsRecieveContext = cf.createContext();
        consumerDestination = jmsRecieveContext
                .createQueue(properties.getProperty(ConfigurationEnum.IBM_QUEUE_CONSUMER.getValue()));
        consumer = jmsRecieveContext.createConsumer(consumerDestination);
        consumer.setMessageListener(new JMSMessageListener(clientConnectionProvider));
        jmsRecieveContext.start();

        jmsSendContext = cf.createContext();
        sendDestination = jmsSendContext
                .createQueue(properties.getProperty(ConfigurationEnum.IBM_QUEUE_TRANSACTION.getValue()));

    }

public void start() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(10);

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new DefaultChannelInitializer());

            // Start the server.
            ChannelFuture f = serverBootstrap.bind(port).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            jmsRecieveContext.stop();
            jmsRecieveContext.close();
            jmsSendContext.close();
        }
    }

    public static void main(String[] args) throws InterruptedException {

        Properties properties = new Properties();

        try (InputStream inputStream = new FileInputStream(args[0])) {
            properties.load(inputStream);

            new Main(properties).start();

        } catch (FileNotFoundException e) {
            log.error("Properties file specified in path {} was not found.", args[0], e);
        } catch (IOException e) {
            log.error("There was an IO error.", e);
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ConfigException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

The listener is a simple class.

@AllArgsConstructor
public class JMSMessageListener implements MessageListener {

    private final ClientConnectionProvider clientConnectionProvider;

    @Override
    public void onMessage(Message message) {

        try {
            String messageString = message.getBody(String.class);

            if (clientConnectionProvider.contains(ClientID.get(messageString))) {
                ClientConnection cc = clientConnectionProvider.getConnection(ClientID.get(messageString));
                if (cc.getCtx() == null) {
                    // TODO: Need to save message when client reconects
                } else {
                    cc.getCtx().channel().write(messageString);
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Solution

  • You should call writeAndFlush(...) and attach a ChannelFutureListener to the ChannelFuture returned to it. In the listener you can check if the write did succeed or fail (and if so print the exception). In your current code you only call write(...) which only put the message in the outboundbuffer of the Channel but not actually flush it to the socket.