Search code examples

Can a shared supply run multiple tap blocks simultaneously?

Consider this code where a tap takes awhile to complete. All the blocks are running simultaneously (immediately outputting) then sleeping. Most don't finish because the program ends sooner then they do:

my $supply = Supply.interval(0.2);
my $tap = $supply.tap: { say "1 $^a"; sleep 5;  };
sleep 5;

The output (elided) has 25 lines (one for each tick of 0.2 in 5 seconds):

1. 0
1. 1
1. 24

Then I change that supply to .share:

my $supply = Supply.interval(0.2).share;
my $tap = $supply.tap: { say "1. $^a"; sleep 5 };
sleep 5;

I only see one line of input but I expected the same output:

1. 1

The .share makes it possible for multiple taps to get the same values.

my $supply = Supply.interval(0.2).share;
my $tap  = $supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

Still the output has output only for the first tap and still has only one line. I expected 25 lines for each:

1. 1


  • The basic rules for Supply are:

    1. No introduction of concurrency without it being explicitly asked for
    2. Back-pressure through a sender-pays model
    3. A message is processed in full before the next one (so .map({ ...something with state... }) can be trusted not to cause conflicts over the state)

    Rule 3 doesn't really apply to share since there's separate downstream operation chains after that point, but rules 1 and 2 do. The purpose of share is to allow publish/subscribe, and also to provide for re-use of a chunk of processing by multiple downstream message processors. Introducing parallel message processing is a separate concern from this.

    The are various options. One is to have the messages for parallel processing stuck into a Channel. This explicitly introduces a place for the messages to be buffered (well, until you run out of memory...which is exactly why Supply comes with a sender-pays back-pressure model). Coercing a Channel back into a Supply gets the values pulled from the Channel and emitted on that Supply on a pool thread. That way looks like:

    my $supply = Supply.interval(0.2).share;
    my $tap  = $supply.Channel.Supply.tap: { say "1. $^a"; sleep 5 };
    my $tap2 = $supply.tap: { say "2. $^a";  };
    sleep 5;

    Note that since whenever automatically coerces the thing it's asked to react to to a Supply, then that'd look like whenever $supply.Channel { }, which makes it a pretty short solution - but at the same time nicely explicit in that it indicates how the normal back-pressure mechanism is being side-stepped. The other property of this solution is that it retains the order of the messages and still gives one-at-a-time processing downstream of the Channel.

    The alternative is to react to each message by instead starting some asynchronous piece of work to handle it. The start operation on a Supply schedules the block it is passed to run on the thread pool for each message that is received, thus not blocking the arrival of the next message. The result is a Supply of Supply. This forces one to tap each inner Supply to actually make anything happen, which seems slightly counter-intuitive at first, but actually is for the good of the programmer: it makes it clear there's an extra bit of async work to keep track of. I very strongly suggest using this in combination with the react/whenever syntax, which does subscription management and error propagation automatically. The most direct transformation of the code in the question is:

    my $supply = Supply.interval(0.2).share;
    my $tap  = supply { whenever $supply.start({ say "1. $^a"; sleep 5 }) { whenever $_ {} } }.tap;
    my $tap2 = $supply.tap: { say "2. $^a";  };
    sleep 5;

    Although it's also possible to instead write it as:

    my $supply = Supply.interval(0.2).share;
    my $tap  = supply { whenever $supply -> $a { whenever start { say "1. $a"; sleep 5 } {} } }.tap;
    my $tap2 = $supply.tap: { say "2. $^a";  };
    sleep 5;

    Which points to the possibility writing a parallelize Supply combinator:

    my $supply = Supply.interval(0.2).share;
    my $tap  = parallelize($supply, { say "1. $^a"; sleep 5 }).tap;
    my $tap2 = $supply.tap: { say "2. $^a";  };
    sleep 5;
    sub parallelize(Supply $messages, &operation) {
        supply {
            whenever $messages -> $value {
                whenever start operation($value) {
                    emit $_;

    The output of this approach is rather different from the Channel one, since the operations are all kicked off as soon as the message comes in. Also it doesn't retain message order. There's still an implicit queue (unlike the explicit one with the Channel approach), it's just that now it's the thread pool scheduler's work queue and the OS scheduler that has to keep track of the in-progress work. And again, there's no back-pressure, but notice that it would be entirely possible to implement that by keeping track of outstanding Promises and blocking further incoming messages with an await Promise.anyof(@outstanding).

    Finally, I'll note that there is some consideration of hyper whenever and race whenever constructs to provide some language-level mechanism for dealing with parallel processing of Supply messages. However the semantics of such, and how they play into the supply-block design goals and safety properties, represent significant design challenges.