I have a server (ROUTER socket) which binds and allows a single client (DEALER socket) to connect to it. The server then starts to send data.
Ideally, I would like to know when the router reaches its HWM setting and starts to drop messages. I have set ZMQ_ROUTER_MANDATORY to 1 on the router, but that doesn't help either. The router continues to report that the messages are sent even though I deliberately didn't start the client (isAlive = false, so there is nothing to pull those messages on the other end).
Am I doing something wrong or is the HWM setting simply unreliable on a ROUTER socket?
I am using JeroMQ version 0.3.1 with JDK 1.6.0_32 on Windows 7 64-bit.
Thanks
public final class SenderSocket implements Runnable{
private final int total;
private final int sentHwm;
private final String address;
private final Socket sendSocket;
private final ExecutorService executor;
private final static String NAME = SenderSocket.class.getSimpleName( );
private final static Logger LOGGER = LoggerFactory.getLogger( NAME );
public SenderSocket( ZContext context, String address, int sentHwm, int total ){
this.address = address;
this.total = total;
this.sentHwm = sentHwm;
this.sendSocket = context.createSocket( ZMQ.ROUTER );
this.executor = Executors.newSingleThreadExecutor( );
}
public void init( ){
sendSocket.setSndHWM( sentHwm );
sendSocket.setRouterMandatory( true );
sendSocket.bind( address );
executor.execute( this );
LOGGER.info("ROUTER configured with HWM {} bound to {}.", sentHwm, address );
}
@Override
public void run( ){
for( int i =0; i <total; i++ ){
try{
String item = new StringBuilder(8).append(i).toString();
boolean result = sendSocket.send( item );
LOGGER.info("SENT>> [{}] [{}]", result, item );
}catch( ZMQException zmqEx ){
int errorCode = zmqEx.getErrorCode();
if( ZError.EHOSTUNREACH == errorCode ){
LOGGER.warn("Attempted to send a message, but the dealer is DOWN!");
}
if( ZMQ.Error.ETERM.getCode() == errorCode ){
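// Pass the error code so the {} placeholder is actually filled in.
LOGGER.error("Received error code [{}], terminating.", errorCode);
stop();
// The context is terminating, so stop sending instead of looping on.
return;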
}
LOGGER.error("ZMQException while sending message.", zmqEx);
}catch( Exception ex ){
LOGGER.error("Exception while sending message.", ex );
}
}
stop();
}
public void stop( ){
sendSocket.setLinger( 0 );
}
}
//CLIENT
public class ReceiverSocket implements Runnable{
private final int hwm;
private final String address;
private final Socket recvSocket;
private final ExecutorService executor;
private volatile boolean isAlive;
private final static String NAME = ReceiverSocket.class.getSimpleName( );
private final static Logger LOGGER = LoggerFactory.getLogger( NAME );
public ReceiverSocket( ZContext context, String address, int hwm ){
this.address = address;
this.hwm = hwm;
this.recvSocket = context.createSocket( ZMQ.DEALER );
this.executor = Executors.newSingleThreadExecutor( );
}
public void init( ){
this.isAlive = false;
recvSocket.setRcvHWM( hwm );
recvSocket.connect( address );
executor.execute( this );
LOGGER.info("DEALER configured with HWM {} connected to {}.", hwm, address );
}
@Override
public void run( ){
Poller poller = new Poller( 1 );
poller.register( recvSocket, Poller.POLLIN );
while( isAlive ){
try{
int pollCount = poller.poll( );
if( pollCount == -1 ){
LOGGER.warn("ERROR! poll returned {}. Was the thread interrupted?", pollCount );
isAlive = false;
return;
}
if( poller.pollin( 0 ) ){
String data = recvSocket.recvStr( );
LOGGER.info("RECVD >> {}", data );
}
}catch( Exception e ){
LOGGER.error("Exception while receiving message.", e);
}
}
}
public void stop( ){
recvSocket.setLinger( 0 );
LOGGER.info("{} Stopped!", NAME );
}
}
//MAIN
public static void main( String[ ] args ) throws InterruptedException{
int recvHwm = 5;
int sentHwm = 5;
int totalSent = 5000;
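// The wildcard address is only meaningful for bind(); the client needs a concrete host.
String bindAddress = "tcp://*:20000";
String connectAddress = "tcp://localhost:20000";
ZContext context = new ZContext( 1 );
ReceiverSocket recvr = new ReceiverSocket( context, connectAddress, recvHwm );
SenderSocket sender = new SenderSocket( context, bindAddress, sentHwm, totalSent );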
recvr.init();
Thread.sleep( 1000 );
sender.init();
}
Router mandatory and high water mark have nothing to do with each other.
I have set ZMQ_ROUTER_MANDATORY to 1 on the router, but that doesn't help either. The router continues to report that the messages are sent even though I deliberately didn't start the client
A ROUTER socket won't raise an exception, even when no peers are connected to it, unless you address the message to a specific client identity.
//#1 no exception raised here: the message has no identity frame and is dropped silently
rtr.setRouterMandatory(true);
rtr.bind("tcp://*:20000");
rtr.send("omg!");
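//#2 exception raised here: the first frame names a peer the router does not know
rtr.setRouterMandatory(true);
rtr.bind("tcp://*:20000");
rtr.sendMore("client1"); // identity frame
rtr.sendMore("");        // empty delimiter frame
rtr.send("omg!");        // payload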
Code sample #2 throws an exception because you're telling the router to send "omg!" to a peer with the identity client1. Router sockets track all connections by assigning a random identity to each connected peer. If the router has no connection for client1, or if client1 previously disconnected, the router raises an exception.
You can assign an identity on the client to override the router's random identity assignment:
client.setIdentity("client1".getBytes());
client.connect("tcp://localhost:20000");
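The above code stops the router socket from throwing an exception in sample #2, because once the client has connected the router has a peer registered under the identity client1.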
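To make the addressing rule concrete, here is a small toy model in plain Java. It is only a sketch of the behaviour described above, not the JeroMQ API: RouterModel, Outcome, connectPeer, disconnectPeer and queued are all made-up names, and it deliberately ignores the high-water mark. It assumes the first frame of a multi-part message is the identity and that a single-frame message is simply discarded.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of ROUTER addressing. NOT the JeroMQ API; every name here is hypothetical.
class RouterModel {
    enum Outcome { DELIVERED, DROPPED, UNROUTABLE }

    private final boolean mandatory;
    // identity -> frames queued for that peer
    private final Map<String, List<String>> peers = new HashMap<String, List<String>>();

    RouterModel(boolean mandatory) {
        this.mandatory = mandatory;
    }

    // Stands in for a client that called setIdentity(...) and then connected.
    void connectPeer(String identity) {
        peers.put(identity, new ArrayList<String>());
    }

    void disconnectPeer(String identity) {
        peers.remove(identity);
    }

    Outcome send(String... frames) {
        // A lone frame carries no identity followed by more parts: dropped silently.
        if (frames.length < 2) {
            return Outcome.DROPPED;
        }
        List<String> queue = peers.get(frames[0]);
        if (queue == null) {
            // A real mandatory socket would fail with EHOSTUNREACH here;
            // a non-mandatory one discards the message.
            return mandatory ? Outcome.UNROUTABLE : Outcome.DROPPED;
        }
        // The identity frame is stripped; the remaining frames go to the peer.
        queue.addAll(Arrays.asList(frames).subList(1, frames.length));
        return Outcome.DELIVERED;
    }

    // Copy of the frames queued for a peer (empty if the peer is unknown).
    List<String> queued(String identity) {
        List<String> queue = peers.get(identity);
        return queue == null ? new ArrayList<String>() : new ArrayList<String>(queue);
    }

    public static void main(String[] args) {
        RouterModel rtr = new RouterModel(true);
        System.out.println(rtr.send("omg!"));                // sample #1
        System.out.println(rtr.send("client1", "", "omg!")); // sample #2, no such peer
        rtr.connectPeer("client1");
        System.out.println(rtr.send("client1", "", "omg!")); // sample #2 after the client connects
    }
}
```

Running main prints DROPPED, UNROUTABLE and DELIVERED in that order, which mirrors sample #1, sample #2 without a connected client, and sample #2 after the client has connected with the identity client1.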
I suggest reading up on this; it explains message addressing and enveloping, and understanding how that works is essential to using router sockets.