Search code examples
messagingzeromqjeromq

ZeroMq Router silently drops messages


I have a server (ROUTER socket) which binds and allows a single client (DEALER socket) to connect to it. The server then starts to send data.

Ideally, I would like to know when the router reaches its hwm setting and starts to drop messages. I have set the ZMQ_ROUTER_MANDATORY to 1 on the router but that doesn't help either. The router continues to report that the messages are sent even though I deliberatly didn't start the client (isAlive = false, so there is nothing to pull those meesages on the other end).

Am I doing something wrong or is the HWM setting simply unreliable on a ROUTER socket?

I am using jeromq version 0.3.1 with jdk 1.6.0_32 on Windows 7 64-bit

Thanks

public final class SenderSocket implements Runnable{

    private final int total;
    private final int sentHwm;
    private final String address;
    private final Socket sendSocket;
    private final ExecutorService executor;

    private final static String NAME        = SenderSocket.class.getSimpleName( );
    private final static Logger LOGGER      = LoggerFactory.getLogger( NAME );


    public SenderSocket( ZContext context, String address, int sentHwm, int total ){
        this.address        = address;
        this.total          = total;
        this.sentHwm        = sentHwm;
        this.sendSocket     = context.createSocket( ZMQ.ROUTER );
        this.executor       = Executors.newSingleThreadExecutor( );
    }


    public void init( ){

        sendSocket.setSndHWM( sentHwm );
        sendSocket.setRouterMandatory( true );
        sendSocket.bind( address );

        executor.execute( this );
        LOGGER.info("ROUTER configured with HWM {} bound to {}.", sentHwm, address );

    }



    @Override
    public void run( ){         

        for( int i =0; i <total; i++ ){      

            try{

                String item     = new StringBuilder(8).append(i).toString();
                boolean result  = sendSocket.send( item );

                LOGGER.info("SENT>> [{}] [{}]", result, item );

            }catch( ZMQException zmqEx ){

                int errorCode = zmqEx.getErrorCode();

                if( ZError.EHOSTUNREACH == errorCode ){
                    LOGGER.warn("Attempted to send message to but dealer is DOWN!");
                }

                if( ZMQ.Error.ETERM.getCode() == errorCode ){
                    LOGGER.error("Received error code [{}], terminating.");
                    stop();
                }

                LOGGER.error("ZMQException while sending message.", zmqEx);

            }catch( Exception ex ){
                LOGGER.error("Exception while sending message.", ex );
            }

        }

        stop();

    }


    public void stop( ){
        sendSocket.setLinger( 0 );
    }


}

//CLIENT

    public class ReceiverSocket implements Runnable{

        private final int hwm;
        private final String address;
        private final Socket recvSocket;
        private final ExecutorService executor;

        private volatile boolean isAlive;

        private final static String NAME        = ReceiverSocket.class.getSimpleName( );
        private final static Logger LOGGER      = LoggerFactory.getLogger( NAME );


        public ReceiverSocket( ZContext context, String address, int hwm ){
            this.address        = address;
            this.hwm            = hwm;
            this.recvSocket     = context.createSocket( ZMQ.DEALER );
            this.executor       = Executors.newSingleThreadExecutor( );
        }


        public void init( ){

            this.isAlive = false;

            recvSocket.setRcvHWM( hwm );
            recvSocket.connect( address );
            executor.execute( this );

            LOGGER.info("DEALER configured with HWM {} connected to {}.", hwm, address );

        }



        @Override
        public void run( ){         

            Poller poller       = new Poller( 1 );
            poller.register( recvSocket, Poller.POLLIN );

            while(  isAlive ){      

                try{

                    int pollCount       = poller.poll( );

                    if( pollCount == NEGATIVE_ONE ){
                        LOGGER.warn("ERROR! Was the thread interrupted?", pollCount );
                        isAlive = false;
                        return;
                    }

                    if( poller.pollin( ZERO ) ){
                        String data = recvSocket.recvStr( );
                        LOGGER.info("RECVD >> {} {}", data, NEWLINE );
                    }

                }catch( Exception e ){
                    LOGGER.error("Exception while receving message.", e);
                }

            }

        }


        public void stop( ){
            recvSocket.setLinger( 0 );
            LOGGER.info("{} Stopped!", NAME );
        }
}

//MAIN

public static void main( String[ ] args ) throws InterruptedException{

        int recvHwm          = 5;
        int sentHwm          = 5;
        int totalSent        = 5000;
        String address       = "tcp://*:20000";
        ZContext context     = new ZContext( 1 );

        ReceiverSocket recvr = new ReceiverSocket( context, address, recvHwm );
        SenderSocket sender  = new SenderSocket( context, address, sentHwm, totalSent );

        recvr.init();
        Thread.sleep( 1000 );

        sender.init();

    }

Solution

  • Router mandatory and high water mark have nothing to do with each other.

    I have set the ZMQ_ROUTER_MANDATORY to 1 on the router but that doesn't help either. The router continues to report that the messages are sent even though I deliberatly didn't start the client

    Router won't raise an exception even if no peers are connected to it unless you address the message for a specific client id.

    //#1 no exception raised here, message dropped silently
    rtr.setRouterMandatory(true)
    rtr.bind("tcp://*:20000")
    rtr.send("omg!")
    
    //#2 exception raised here
    rtr.setRouterMandatory(true)
    rtr.bind("tcp://*:20000")
    rtr.sendMore("client1")
    rtr.sendMore("")
    rtr.send("omg!")
    

    Code sample #2 throws exception because you're telling router to send "omg" to a peer with identity client1. Router sockets track all connections by assigning a random identity to each connected peer. If the router has no connection for client1, or, if client1 previously disconnected, router will raise an exception in both cases.

    You can assign an identity on the client to override the routers random identity assignment:

    client.setIdentity("client1".getBytes())
    client.connect("tcp://*:20000")
    

    The above code stops the router socket from throwing an exception in sample #2

    I suggest reading up on this, it explains message addressing and enveloping; understanding how it works is essential to using router sockets.