
RxJS 6 expand not infinite?


I've been playing around with RxJS to explore the edges and came across something unexpected.

Consider the following code:

const { of } = require('rxjs');
const { expand } = require('rxjs/operators');

of(1)
  .pipe(expand(x => of(x + 1)))
  .subscribe(
    x => console.log(x),
    err => console.log(err),
    () => console.log('complete'));

When run in Node, I expected this to produce an infinite sequence of numbers, each one greater than the last by 1, that would either eventually error out when it exceeded the max number in JavaScript or keep going until I killed it with Ctrl-C (on Windows). What I didn't expect was that it would stop without an error after fewer than 1000 numbers had been logged.

What I got was the numbers 1 - 783 written to the console.

Then it stopped.

Every time.

I run it. It stops at 783, which is not a special number like 256 or 1024. It doesn't log an error or 'complete'. It just stops.

I would have expected it to run 'forever', throw an error, or at least log out 'complete'. But it just stops and returns me to a command line.

I did some digging and thought that perhaps it had something to do with the optional concurrency parameter, which defaults to Number.POSITIVE_INFINITY according to the docs for expand.

However, explicitly specifying this parameter gives equally odd results. A concurrency of 1 consistently yields about 860 values, which is more than 100 (~850), 200 (~840), or 1,000,000 (back to 783). The count is highest with a concurrency of 1 and slowly drifts down to 783 as the concurrency increases. That said, I have no idea whether concurrency is actually the cause, though it does seem related.

I took a quick peek at the source code for expand and didn't see anything that would explain this.

This is not code that I would run in production, but it has piqued my curiosity. Does anyone have any ideas for what could explain this?

  • RxJS 6.3.2

  • Node 8.9.4

Thanks,

TTE


Solution

  • This is expected behavior. I'm not sure why Node.js doesn't print an error message for you, but the underlying error looks like this:

    RangeError: Maximum call stack size exceeded
    at SafeSubscriber.__tryOrUnsub (/Users/ojkwon/github/goj/node_modules/rxjs/internal/Subscriber.js:212:18)
    ...
    

    In short, it's hitting a stack overflow.

    When you call expand, the projection returns an observable created with of, and expand feeds each value that observable emits back into the projection. Since RxJS 5, of does not use a scheduler by default, so the observable emits synchronously. Each recursion step therefore runs on top of the previous step's call stack, and the stack eventually overflows.
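
To make the stack growth concrete, here is a minimal sketch that does not use RxJS at all. The names expandSync and runUntilOverflow are hypothetical, and the function is only a simplified stand-in for a synchronous expand, not the library's implementation. Each value triggers the next step as a nested call, so the run ends with a RangeError after a finite number of values. The exact count depends on the engine and its stack size.

```javascript
// Hypothetical stand-in for a synchronous expand (not RxJS code):
// every emitted value immediately triggers the next projection
// as a nested call on the same call stack.
function expandSync(value, project, onNext) {
  onNext(value);
  expandSync(project(value), project, onNext); // nested call, never returns normally
}

function runUntilOverflow() {
  let last = 0;
  try {
    expandSync(1, x => x + 1, x => { last = x; });
  } catch (err) {
    // The engine throws RangeError once the call stack is exhausted.
    return { last, overflowed: err instanceof RangeError };
  }
  return { last, overflowed: false };
}

const result = runUntilOverflow();
console.log(result.overflowed, result.last > 0);
```

The last value reached varies between machines and Node versions, which is in line with the cutoff in the question not being a round number.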

    You can get an infinite expansion either by using an asynchronous source or by explicitly scheduling the emitted values asynchronously, for example by passing asyncScheduler to the of call inside expand:

    const { of, asyncScheduler } = require('rxjs');
    const { expand } = require('rxjs/operators');
    
    of(1)
      .pipe(expand(x => of(x + 1, asyncScheduler)))
      .subscribe(
        x => console.log(x),
        err => console.log(err),
        () => console.log('complete'));
    

    With this change, the sequence keeps emitting and does not stop.
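
As a rough illustration of why deferring each step helps, the sketch below models the idea without RxJS. The function expandScheduled and its limit parameter are hypothetical, and the plain array queue only stands in for scheduled work (asyncScheduler itself relies on timers, which this model does not reproduce). Because the next step is queued rather than called directly, the stack stays flat and the loop gets far past the point where the synchronous version overflowed.

```javascript
// Hypothetical model of scheduled expansion (not RxJS code):
// the next step is queued and picked up by a flat loop
// instead of being called from inside the current step.
function expandScheduled(seed, project, onNext, limit) {
  const queue = [seed]; // pending values, standing in for scheduled work
  let emitted = 0;
  while (queue.length > 0 && emitted < limit) {
    const value = queue.shift();
    onNext(value);
    emitted += 1;
    queue.push(project(value)); // defer the next step rather than recursing
  }
  return emitted;
}

let lastValue = 0;
// The limit exists only so the demonstration terminates.
const emittedCount = expandScheduled(1, x => x + 1, x => { lastValue = x; }, 100000);
console.log(emittedCount, lastValue);
```

In real RxJS code, a similar bound could be applied with the take operator so that the scheduled expansion completes instead of running forever.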