Search code examples
perlrabbitmqstomp

Subscribers connecting to a STOMP queue don't get added to round-robin distribution


I am using perl's Net::Stomp on top of RabbitMQ to rig up a very simple cross-platform distributed work queue. The idea is to submit a bunch of jobs to a single queue name and have a number of workers pull off jobs and execute them.

If I start up "workers" and submit the frames/jobs to the queue, I get exactly what I expect: the workers pop the frames off the queue in a round-robin fashion.

If I submit jobs to the persistent queue and then start up a number of workers, only the first pulls jobs off the queue.

# submit.pl
use Net::Stomp;
use JSON::PP;

my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );

$stomp->connect( { login => 'a', passcode => 'a' } ) or die;

for (my $i=0; $i < 20; $i++) {
    my $package = encode_json( { seed => $i } );
    $stomp->send( { destination => '/queue/runs', body => $package } );
}

$stomp->disconnect;

# worker.pl
my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
$stomp->connect( { login => 'a', passcode => 'a' } );
$stomp->subscribe({
        destination             => '/queue/runs',
        ack                     => 'client',
        'activemq.prefetchSize' => 1 });


while (my $frame = $stomp->receive_frame) {
    next unless defined $frame;
    $stomp->ack( { frame => $frame } );
    next unless $frame->body;
    my $spec = decode_json $frame->body;
    say $$ . " " . $spec->{seed};
    sleep 1;
}

Running this:

$ perl submit.pl && perl worker.pl & perl worker.pl

What I would expect to see is something like:

30026 0
30024 1
30026 2
30024 3
...

Instead I see only the first "worker" pulling frames off the queue. If I kill the first "worker" process, the second then begins pulling frames.

30024 0
30024 1
30024 2
30024 3
...

I would like it to be the case that immediately as workers subscribe to the queue they begin pulling frames in a round-robin fashion. I would rather not have to write explicit stuff to handle this. I assume there is some mechanism in the protocol already that I am overlooking, or perhaps a bug in Net::Stomp?

As I said, the round-robin dispatch works perfectly if the workers are running before submit.pl is run.


Solution

  • On Rabbitmq's STOMP plugin the activemq.prefetchSize key shown in the Net::Stomp documentation does nothing. All I had to do was change it to 'prefetch-count' => 1 and then it all worked as I expected.