Search code examples
angulartypescriptrxjs5ngrx-storengrx-effects

Polling in rxjs and ngrx


Hey I am new to rxjs and ngrx, and I am building an app using these technologies. I am trying to think on how to create a Polling system using rxjs observables and operators.

I have created a basic Polling system which contains a map of subscriptions of the observables. each observable dispatch an action every 5 seconds to ngrx-effects which handle the action and perform the side effects like http call using a service.

My problem is that I want to create a specific mechanisim for current pooling system which has the following conditions:

1.The first pool happens right away,I am using timer(0,poolingTime) for this, or interval with pipe of stratwith(null).

2.The pool know to delay its next request according to the time of the previous request.I mean that when the previous request finished then the second request occur.

The first conidition I have acheived alone , the second condition(2) I need help in achieving this. I thoungth about debounce or throttle inorder to complete the second condition, but as I said in the first place I don't have alot of expriences with rxjs.

Here is the code of my simple pooling system

import { Injectable } from '@angular/core';
import { Observable } from 'rxjs/Observable';
import { Subscription } from 'rxjs/Subscription';
import { timer } from 'rxjs/observable/timer';
import { interval } from 'rxjs/observable/interval';
import { throttleTime, debounceTime, startWith, tap, delay } from 'rxjs/operators';
import { Utils } from '../utils';
@Injectable()
export class PoolingService {

  private subscriptions: { [id: string]: Subscription };

  constructor() {
    this.subscriptions = {};
  }

  public startPooling(time: number, callback: Function): string {
    const id = Utils.guid();
    const interval$ = interval(time).pipe(tap(tick => console.log("tick", tick))).pipe(startWith(null));
    // const interval$ = timer(0, time).pipe(tap(tick => console.log("tick", tick)));

    const subscription = interval$.subscribe(() => { callback() });
    this.subscriptions[id] = subscription;
    return id;
  }

  public stopPooling(id: string) {
    const subscription = this.subscriptions[id];
    if (!subscription) {
      return;
    }
    subscription.unsubscribe();
  }

}

Here is the use of the Poll Service:

ngOnInit() {

    this.store.select('domains').subscribe((state: any) => {
      const { list, lastAddedDomain } = state;
      this.markers = list;
      this.roots = Utils.list_to_tree(list);
    });

    this.poolService.startPooling(5000, () => {
      this.store.dispatch(new AllHttpActions.HttpActionGet({}, HttpMethods.GET, "/getDomainsForMap", AllDomainActions.FETCH_DOMAINS, Utils.guid()));
    });

  }

Solution

  • I would probably try out something like this. I added comments throughout the code which should help you understand why I did certain things.

    import { Injectable, OnDestroy } from '@angular/core';
    import { Subject } from 'rxjs/Subject';
    import { Observable } from 'rxjs/Observable';
    import { timer } from 'rxjs/observable/timer';
    import { interval } from 'rxjs/observable/interval';
    import { startWith, tap, mergeMap, take, takeUntil, filter, map, catchError, delay } from 'rxjs/operators';
    import { HttpClient } from '@angular/common/http';
    import { of } from 'rxjs/observable/of';
    import { Subscription } from 'rxjs/Subscription';
    
    @Injectable()
    export class PollingService implements OnDestroy {
    
        private destroyed$ = new Subject<any>();
    
        poll<PollResultType>(intervalTime: number, pollFunction: () => Observable<PollResultType>): Observable<any> {
            let isRequesting = false;
            return timer(0, intervalTime)
                .pipe(
                    // When the service is destroyed, all polls will be unsubscribed from
                    takeUntil(this.destroyed$)),
                    tap(tick => console.log('tick', tick))),
                    // Only continue if isRequesting is false
                    filter(() => !isRequesting)),
                    // Set isRequesting to true before requesting data
                    tap(() => isRequesting = true)),
                    // Execute your poll function
                    mergeMap(pollFunction)),
                    // Set isRequesting to false, so the next poll can come through
                    tap(() => isRequesting = false)
                );
        }
    
        ngOnDestroy() {
            // When the service gets destroyed, all existing polls will be destroyed as well
            this.destroyed$.next();
            this.destroyed$.complete();
        }
    }
    
    // In this example this is a service. But this could also be a component, directive etc.
    @Injectable()
    export class ConsumerService {
    
        private subscription: Subscription;
    
        private requester: Observable<any>;
    
        constructor(private polling: PollingService, private http: HttpClient) {
            // Instead of calling poll and subscribing directly we do not subscribe.
            // Like that we can have a requester where we can subscribe to activate
            // the polling. You might not need that.
            this.requester = this.polling.poll(
                500,
                // This is our polling function which should return another observable
                () => this.http.get('https://cors-test.appspot.com/test')
                    .pipe(
                        // Uncomment following line to add artificial delay for the request
                        // delay(2000),
                        // Don't forget to handle errors
                        catchError(error => {
                            return of('Failed');
                        })
                    )
            );
    
            // Let's activate our poll right away
            this.activate();
        }
    
        activate() {
            // Deactivate on activation to deactivate any subscriptions that are already running
            this.deactivate();
    
            // Subscribe to activate polling and do something with the result
            this.subscription = this.requester
                // This is for testing purposes. We don't want to overload the server ;)
                .pipe(take(10))
                .subscribe(res => console.log(res));
        }
    
        deactivate() {
            if (this.subscription) {
                this.subscription.unsubscribe();
                this.subscription = undefined;
            }
        }
    }
    

    Maybe some general things to note:

    • To run this code you need to do following things:
    • Copy the code into a ts file in your source.
    • Add the PollingService and the ConsumerService to your app modules provider.
    • Add the ConsumerService as dependency somewhere, so it gets executed.
    • I set the polling time to 500 ms for testing purposes.
    • In the constructor of the ConsumerService, there is a commented out line with a delay statement. If you uncomment that line, you can simulate what happens if a request takes longer to execute. You should see the effect in the console, as long as the delay is longer as the intervalTime
    • In the ConsumerService.activate method, I limited the polls to 10, in order to not annoy the server behind the test url I'm calling.
    • It might help understand better what is going on by adding tap(() => ...) with console log statements between the different steps.

    I hope this helps.