Search code examples
java message-queue zeromq jeromq

ZeroMQ producing meager results


I am testing ZeroMQ and am only getting around 1227 - 1276 messages per second. However, I have read that the throughput is supposed to be over 100x this amount.

What am I doing wrong? Is there some configuration I can specify to fix this?

I am using the following functionality:

// Host that the sending side connects to (the loopback address).
public static final String SERVER_LOCATION = "127.0.0.1";
// TCP port that the receiving side binds and the sending side connects to.
public static final int SERVER_BIND_PORT = 5570;

/**
 * Binds a PULL socket on {@code SERVER_BIND_PORT} and prints a running count
 * of the messages it receives.
 *
 * <p>Loops until {@code ZMsg.recvMsg} returns {@code null}, then releases the
 * context. The declared exceptions are kept for caller compatibility; nothing
 * in this body throws them.
 */
public static void receiveMessages() throws InvalidProtocolBufferException, FileNotFoundException, UnsupportedEncodingException{
    ZContext ctx = new ZContext();
    try {
        Socket frontend = ctx.createSocket(ZMQ.PULL);
        frontend.bind("tcp://*:"+SERVER_BIND_PORT);

        int received = 1;
        while (true) {
            ZMsg msg = ZMsg.recvMsg(frontend);
            if (msg == null) {
                // No message was returned (e.g. the receive was interrupted);
                // calling pop() on it would throw a NullPointerException.
                break;
            }
            ZFrame content = msg.pop();
            if (content != null) {
                // NOTE(review): writing a line to stdout for every message is
                // likely to limit throughput - confirm by measuring without it.
                System.out.println("Received: "+received);
                received++;
                content.destroy();
            }
            // Always release the message, even when it carried no frames.
            msg.destroy();
        }
    } finally {
        // Release the socket and context once the receive loop ends.
        ctx.destroy();
    }
}

/**
 * Connects a PUSH socket to {@code SERVER_LOCATION:SERVER_BIND_PORT} and sends
 * messages in a tight loop until the {@code Timer} reports that
 * {@code timeToSpendSending} has elapsed, then prints how many were sent.
 *
 * <p>The declared exceptions are kept for caller compatibility; nothing in
 * this body throws them.
 */
public static void sendMessages() throws FileNotFoundException, UnsupportedEncodingException{
    ZContext ctx = new ZContext();
    Socket client = ctx.createSocket(ZMQ.PUSH);

    client.setIdentity("i".getBytes());
    client.connect("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT);

    // Start at 0 so the final figure equals the number of send() calls made.
    int sent = 0;
    Timer t = new Timer(timeToSpendSending);
    t.start();
    do{
        client.send(/* object to send*/ , 0);
        sent++;
    }while(!t.isDone());

    System.out.println("Done with "+sent);
}

Timer class used to limit time the program runs for:

/**
 * One-shot timer thread: once started it sleeps for {@code time} milliseconds
 * and then flips {@link #isDone()} to {@code true}.
 *
 * <p>Intended to be polled from another thread, e.g.
 * {@code while (!timer.isDone()) { ... }}.
 */
class Timer extends Thread{
    int time;
    // volatile: written by the timer thread and read by polling threads. Without
    // it a busy-wait loop on isDone() is not guaranteed to ever see the update.
    volatile boolean done;

    /**
     * @param time how long to wait after {@code start()}, in milliseconds
     */
    public Timer(int time){
        this.time = time;
        done = false;
    }

    /** Sleeps for the configured time, then marks the timer as done. */
    @Override
    public void run(){
        try {
            // sleep is static; call it on the class rather than through `this`.
            Thread.sleep(time);
            done = true;
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt status instead of silently dropping it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return {@code true} once the full sleep has completed; stays
     *         {@code false} if the timer was never started or was interrupted
     */
    public boolean isDone(){
        return done;
    }
}

Edit: I am using JeroMQ:

<dependency>
    <groupId>org.zeromq</groupId>
    <artifactId>jeromq</artifactId>
    <version>0.3.4</version>
</dependency>

Solution

  • I had to replace the connect method and remove the High Water Mark (setting it to 0 allows an unlimited number of messages to be held in memory).

    The code would be as follows:

    // Host used both in the sender's bind address and the receiver's connect address.
    public static final String SERVER_LOCATION = "127.0.0.1";
    // TCP port the sender binds and the receiver connects to.
    public static final int SERVER_BIND_PORT = 5570;
    // Sent as the first frame of every message and used as the receiver's subscription filter.
    public static final String TOPIC = "topic1";
    
    /**
     * Subscribes to {@code TOPIC} on {@code SERVER_LOCATION:SERVER_BIND_PORT}
     * and counts the data frames received during {@code timeToSpendSending},
     * measured from the arrival of the first data frame.
     *
     * <p>Each message arrives as a topic frame followed by a data frame; only
     * frames that differ from {@code TOPIC} are counted. The declared
     * exceptions are kept for caller compatibility; nothing in this body
     * throws them.
     */
    public static void receiveMessages() throws InvalidProtocolBufferException, FileNotFoundException, UnsupportedEncodingException{
        // Prepare our context and subscribe
        Context context = ZMQ.context(1);
        Socket subscriber = context.socket(ZMQ.SUB);
        try {
            // The high water mark only applies to connections made after it is
            // set, so it must be configured before connect().
            subscriber.setRcvHWM(0);
            subscriber.connect("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT);
            subscriber.subscribe(TOPIC.getBytes());
            System.out.println("subscribed to  "+TOPIC);

            // Start at 0 so the final figure equals the number of data frames seen.
            int received = 0;
            boolean started = false;
            Timer t = new Timer(timeToSpendSending);
            do{
                String msg = subscriber.recvStr();
                if(!TOPIC.equals(msg)){
                    if(!started){
                        // Start timing on the first data frame so that idle
                        // time before the sender starts is not measured.
                        t.start();
                        started = true;
                    }
                    received++;
                }
            }while(!t.isDone());
            System.out.println("Done with: "+received);
        } finally {
            // Release the socket and context even if the loop throws.
            subscriber.close();
            context.term();
        }
    }
       /**
        * Binds a publishing socket on {@code SERVER_LOCATION:SERVER_BIND_PORT}
        * and sends two-frame messages (topic frame, then data frame) in a
        * tight loop until {@code timeToSpendSending} has elapsed, then prints
        * how many were sent.
        *
        * <p>The declared exceptions are kept for caller compatibility; nothing
        * in this body throws them.
        */
       public static void sendMessages() throws FileNotFoundException, UnsupportedEncodingException{
           Context context = ZMQ.context(1);
           // PUB is the documented peer type for the SUB socket the receiver
           // uses; PUSH is meant to be paired with PULL.
           Socket publisher = context.socket(ZMQ.PUB);
           try {
               // High water marks only apply to endpoints set up after they
               // are configured, so set them before bind().
               publisher.setHWM(0);
               publisher.setSndHWM(0);
               publisher.bind("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT);

               // Start at 0 so the final figure equals the number of messages sent.
               int sent = 0;
               Timer t = new Timer(timeToSpendSending);
               t.start();
               do{
                   publisher.sendMore(TOPIC);
                   // The payload keeps the original 1-based numbering.
                   publisher.send("Test Data number "+(sent + 1));
                   sent++;
               }while(!t.isDone());
               System.out.println("Done with: "+sent);
           } finally {
               // Release the socket and context even if the loop throws.
               publisher.close();
               context.term();
           }
       }
    

    With this setup I got message rates of around 250,000 per second when sending and 145,000 per second when receiving.