Search code examples
reactjsreduxrxjsredux-observablebackpressure

Backpressure implementation with redux-observable


I'm trying to implement backpressure logic in my react application. I find a nice post about this here and trying to add this to my app. Now i have some code:

// epic.ts
import { ofType } from 'redux-observable';
import { mapTo, tap, delay, switchMap } from 'rxjs/operators';
import { createIteratorSubject } from './createIteratorSubject';

// fake generator which in real application is supposed to pull data from a server
function* generator() {
  yield 1;
  yield 2;
  yield Promise.resolve(3);
  yield Promise.resolve(4);
  yield 5;
}

const iterator$ = createIteratorSubject(generator());

export function epic(action$: any): any {
  return action$.pipe(
    ofType('TAKE'),
    switchMap(() => {
      return iterator$
        .pipe(
          tap((value) => console.info('INCOMING VALUE', value)),
          delay(1000), // Some hard calculations here
          tap((value) => console.info('DONE PROCESSING VALUE', value))
        )
        .subscribe({
          next: iterator$.push,
          complete: () => {
            console.info('DONE PROCESSING ALL VALUES');
          },
        });
    }),
    mapTo((value: number) => {
      return { type: 'PUT', payload: value };
    })
  );
}
// createIteratorSubject.ts
import { BehaviorSubject } from 'rxjs';

export function createIteratorSubject(iterator: any) {
  const iterator$ = new BehaviorSubject();

  const pushNextValue = async ({ done, value }: any) => {
    if (done && value === undefined) {
      iterator$.complete();
    } else {
      iterator$.next(await value);
    }
  };

  iterator$.push = (value: any) => {
    return pushNextValue(iterator.next(value));
  };

  iterator$.push();

  return iterator$;
}

The problem i'm faced with is i don't know how to dispatch result value to redux. And now i have following error.

redux error


Solution

  • You're returning a Subscription in your switchMap where an ObservableInput is expected. You can change your code like this to make it work:

    export function epic(action$: any): any {
      return action$.pipe(
        ofType('TAKE'),
        switchMap(() => {
          return iterator$
            .pipe(
              tap((value) => console.info('INCOMING VALUE', value)),
              delay(1000), // Some hard calculations here
              tap((value) => console.info('DONE PROCESSING VALUE', value)),
              tap({
                next: iterator$.push,
                complete: () => {
                  console.info('DONE PROCESSING ALL VALUES');
                }
              })
            );
        }),
        map((value: number) => {
          return { type: 'PUT', payload: value };
        })
      );
    }