Search code examples
javascriptrxjsrxjs5reactivexredux-observable

Why complete is not invoked after performing a concatenation?


I need to query a device multiple times. Every query needs to be asynchronous and the device doesn't support simultaneous queries at a time. Moreover, once it is queried, it can not be queried again immediately after. It needs at least a 1 second pause to work properly.

My two queries, performed by saveClock() and saveConfig(), return a Promise and both resolve by returning undefined as expected.

In the following code why removing take() prevents toArray() from being called?
What's happening here, is there a better way to achieve the same behavior?

export const saveEpic = (action$, store) =>
  action$.ofType(SAVE)
    .map(action => {
      // access store and create object data
      // ...
      return data;
    })
    .mergeMap(data =>
      Rx.Observable.from([
        Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)),
        Rx.Observable.timer(1000),
        Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config)),
        Rx.Observable.of(data.id)
     ])
    )
    .concatAll()
    .take(4)
    .toArray()
    // [undefined, 0, undefined, "id"]
    .map(x => { type: COMPLETED, id: x[3] });

Solution

  • There are a couple things I see:

    Your final .map() is missing parenthesis, which in its current form is a syntax error but a subtle change could make it accidentally a labeled statement instead of returning an object. Because in its current form it's a syntax error, I imagine this is just a bug in this post, not in your code (which wouldn't even run), but double check!

    // before
    .map(x => { type: COMPLETED, id: x[3] });
    
    // after
    .map(x => ({ type: COMPLETED, id: x[3] }));
    

    With that fixed, the example does run with a simple redux-observable test case: http://jsbin.com/hunale/edit?js,output So if there's nothing notable I did differently than you, problem appears to be in code not provided. Feel free to add additional insight or even better, reproduce it in a JSBin/git repo for us.


    One thing you didn't mention but is very very noteworthy is that in redux-observable, your epics will typically be long-lived "process managers". This epic will actually only process one of these saves, then complete(), which is probably not what you actually want? Can the user only save something one time per application boot? Seems unlikely.

    Instead, you'll want to keep the top-level stream your epic returns alive and listening for future actions by encapsulating this logic inside the mergeMap. The take(4) and passing the data.id then become extraneous:

    const saveEpic = (action$, store) =>
      action$.ofType(SAVE)
        .mergeMap(data =>
          Rx.Observable.from([
            Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)),
            Rx.Observable.timer(1000),
            Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config))
          ])
          .concatAll()
          .toArray()
          .map(() => ({ type: COMPLETED, id: data.id }))
        );
    

    This separation of streams is described by Ben Lesh in his recent AngularConnect talks, in the context of errors but it's still applicable: https://youtu.be/3LKMwkuK0ZE?t=20m (don't worry, this isn't Angular specific!)

    Next, I wanted to share some unsolicited refactoring advice that may make your life easier, but certainly this is opinionated so feel free to ignore:

    I would refactor to more accurately reflect the order of events visually, and reduce the complexity:

    const saveEpic = (action$, store) =>
      action$.ofType(SAVE)
        .mergeMap(data =>
          Rx.Observable.from(saveClock(data.id, data.clock))
            .delay(1000)
            .mergeMap(() => saveConfig(data.id, data.config))
            .map(() => ({ type: COMPLETED, id: data.id }))
        );
    

    Here we're consuming the Promise returned by saveClock, delaying it's output for 1000ms, the mergeMapping the result to a call to saveConfig() which also returns a Promise that will be consumed. Then finally mapping the result of that to our COMPLETE action.

    Finally, keep in mind that if your Epic does stay alive and is long lived, there's nothing in this epic as-is to stop it from receiving multiple SAVE requests while other ones are still in-flight or have not yet exhausted the required 1000ms delay between requests. i.e. if that 1000ms space between any request is indeed required, your epic itself does not entirely prevent your UI code from breaking that. In that case, you may want to consider adding a more complex buffered backpressure mechanism, for example using the .zip() operator with a BehaviorSubject.

    http://jsbin.com/waqipol/edit?js,output

    const saveEpic = (action$, store) => {
      // used to control how many we want to take,
      // the rest will be buffered by .zip()
      const requestCount$ = new Rx.BehaviorSubject(1)
        .mergeMap(count => new Array(count));
    
      return action$.ofType(SAVE)
        .zip(requestCount$, action => action)
        .mergeMap(data =>
          Rx.Observable.from(saveClock(data.id, data.clock))
            .delay(1000)
            .mergeMap(() => saveConfig(data.id, data.config))
            .map(() => ({ type: COMPLETED, id: data.id }))
            // we're ready to take the next one, when available
            .do(() => requestCount$.next(1))
        );
    };
    

    This makes it so that requests to save that come in while we're still processing an existing one is buffered, and we only take one of them at a time. Keep in mind though that this is an unbounded buffer--meaning that the queue of pending actions can potentially grow infinitely quicker than the buffer is flushed. This is unavoidable unless you adopted a strategy for lossy backpressure, like dropping requests that overlap, etc.

    If you have other epics which have overlapping requirements to not sending requests more than once a second, you would need to create some sort of single supervisor that makes this guarantee for all the epics.

    This may all seem very complex, but perhaps ironically this is much easier to do in RxJS than with traditional imperative code. The hardest part is actually knowing the patterns.