I'm trying to implement backpressure logic in my React application. I found a nice post about this here and am trying to add it to my app. This is the code I have so far:
// epic.ts
import { ofType } from 'redux-observable';
import { mapTo, tap, delay, switchMap } from 'rxjs/operators';
import { createIteratorSubject } from './createIteratorSubject';
/**
 * Fake generator which in a real application is supposed to pull data from a server.
 *
 * Yields, in order: 1, 2, a promise of 3, a promise of 4, and 5. Values sent
 * in through `next(value)` are ignored.
 */
function* generator() {
  // Each entry is a thunk so a promise is only created when its turn is reached.
  const producers = [
    () => 1,
    () => 2,
    () => Promise.resolve(3),
    () => Promise.resolve(4),
    () => 5,
  ];
  for (const produce of producers) {
    yield produce();
  }
}
// Module-level subject built once from a single generator() run; the epic reads values from it
// and calls its push method to request the next one.
const iterator$ = createIteratorSubject(generator());
/**
 * Epic that reacts to every 'TAKE' action by processing the values of the
 * module-level iterator$ one at a time: log the incoming value, wait one
 * second (standing in for heavy work), log completion, then ask the iterator
 * for the next value.
 *
 * NOTE(review): as written this version does not produce 'PUT' actions; see
 * the inline notes on switchMap and mapTo.
 */
export function epic(action$: any): any {
return action$.pipe(
ofType('TAKE'),
switchMap(() => {
// NOTE(review): the chain below ends in .subscribe(), so this callback returns a
// Subscription, whereas switchMap expects an ObservableInput to be returned.
return iterator$
.pipe(
tap((value) => console.info('INCOMING VALUE', value)),
delay(1000), // Some hard calculations here
tap((value) => console.info('DONE PROCESSING VALUE', value))
)
.subscribe({
// Once a value has been processed, request the next one from the iterator.
next: iterator$.push,
complete: () => {
console.info('DONE PROCESSING ALL VALUES');
},
});
}),
// NOTE(review): mapTo is handed a function here, so it would emit that function as a
// constant instead of calling it per value; presumably `map` was intended. Confirm.
mapTo((value: number) => {
return { type: 'PUT', payload: value };
})
);
}
// createIteratorSubject.ts
import { BehaviorSubject } from 'rxjs';
/**
 * Wraps an iterator in a BehaviorSubject that only advances when asked to.
 *
 * The returned subject has an extra `push(value?)` method. Each call to `push`
 * calls `iterator.next(value)`, awaits the yielded value (so yielded promises
 * are unwrapped), and emits the result on the subject. The first value is
 * requested immediately on creation.
 *
 * @param iterator Iterator (e.g. a running generator) to pull values from.
 * @returns A BehaviorSubject, initially holding `undefined`, extended with `push`.
 */
export function createIteratorSubject(iterator: any) {
  // BehaviorSubject requires an initial value; `undefined` matches what the
  // subject holds before the first iterator value has been resolved.
  const subject$ = new BehaviorSubject<any>(undefined);

  const pushNextValue = async ({ done, value }: any): Promise<void> => {
    // Complete only when the iterator is finished and carries no final value;
    // a value returned together with `done: true` is still emitted below.
    if (done && value === undefined) {
      subject$.complete();
      return;
    }

    let resolved: unknown;
    try {
      resolved = await value;
    } catch (error: unknown) {
      // A rejected yielded promise is reported through the subject instead of
      // being lost as an unhandled promise rejection.
      subject$.error(error);
      return;
    }
    subject$.next(resolved);
  };

  // Attach `push` in a way the type system can see, so callers can use
  // `iterator$.push` without assigning to an undeclared property.
  const iterator$ = Object.assign(subject$, {
    push: (value?: any): Promise<void> => pushNextValue(iterator.next(value)),
  });

  // Kick off the first pull so a value is available for the first subscriber.
  iterator$.push();
  return iterator$;
}
The problem I'm facing is that I don't know how to dispatch the resulting value to Redux. And now I get the following error.
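You're returning a Subscription from your switchMap callback,
where an ObservableInput
is expected. You can change your code like this to make it work (note that mapTo is also replaced with map, which must be imported from 'rxjs/operators'):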
/**
 * Epic that turns every 'TAKE' action into a stream of 'PUT' actions, one per
 * value taken from the module-level iterator$.
 *
 * Each value is logged, delayed by one second (standing in for heavy work),
 * logged again, and only then is the iterator asked for its next value. The
 * inner pipeline is returned to switchMap as an observable rather than being
 * subscribed to by hand.
 */
export function epic(action$: any): any {
  // Builds the per-TAKE pipeline over the shared iterator subject.
  const processValues = () =>
    iterator$.pipe(
      tap((incoming) => console.info('INCOMING VALUE', incoming)),
      delay(1000), // Some hard calculations here
      tap((processed) => console.info('DONE PROCESSING VALUE', processed)),
      tap({
        // Processing finished: pull the next value from the iterator.
        next: (processed) => iterator$.push(processed),
        complete: () => {
          console.info('DONE PROCESSING ALL VALUES');
        },
      })
    );

  // Wraps a processed value in the action that is dispatched to the store.
  const toPutAction = (value: number) => ({ type: 'PUT', payload: value });

  return action$.pipe(ofType('TAKE'), switchMap(processValues), map(toPutAction));
}