Tags: angular, typescript, rxjs, rxjs-observables, rxjs-pipeable-operators

RxJS: make API calls when the max batch size is reached or there is no new input for more than a specified time period, using Subject, bufferSize, takeUntil


I am working on an Angular application.

Instead of making an API call for every record, I want to make one API call for a batch of items once no new items have been added to the queue (an observable queue) for a certain period of time.

In my case, if no new items are added to the batchChangesQueue observable for more than 5 seconds, I want to make an API call with all the items accumulated in the batch.

However, with the code I tried I always get a single item instead of an array of items. I tried various code samples that use switchMap, concatMap, toArray, etc.:

switchMap(() => this.batchChangesQueue.pipe(toArray()))

concatMap(() => this.batchChangesQueue.pipe(toArray()))

I also tried timer(5000).pipe(take(1)) and takeUntil(timer(5000)).

But nothing seems to work. Can anyone suggest what is wrong in my code, or whether there is a simpler way to achieve what I want?

My service class has the following code.

A different component pushes values into changeListener$ by calling .next().

@Injectable({
  providedIn: 'root'
})
export class EventPublisher {
  private changeListener$ = new Subject<any>();
  private batchChangesQueue = new Subject<unknown>();
  private bufferSize = 3;

  constructor() {
    this.processBatch();
    this.subscribeToDataChanges();
  }
  private subscribeToDataChanges() {
    this.changeListener$.subscribe(async(data: {
      name: string,
      changes: any
    }) => {
      await this.saveChanges(data.name, this.convertToString(data.changes));
    });
  }
  public async processBatch() {
    this.batchChangesQueue.pipe(
      debounceTime(5000),
      switchMap(() => this.batchChangesQueue.pipe(buffer(this.batchCellChangesQueue)))
    ).subscribe({
      next: batch => {
        console.log('Processing batch:', batch);
        // Perform API calls here
      },
      error: err => console.error('Error processing batch:', err),
    });
  }
  public async saveChanges(sheetName: string, cellChanges: any) {
    this.batchChangesQueue.next({
      name: sheetName,
      cellChanges
    });
  }
}


Solution

  • I think you can simply use the bufferTime operator, which lets you specify both a time span and a maximum buffer size.

    bufferTime(5000, null, 3), // emits an array every 5000 ms, or as soon as 3 items are collected
    

    An implementation could look something like this:

      private changeListener$ = new Subject<{name: string, changes: any}>();
      private changesQueue = new Subject<{sheetName: string, cellChanges: any}>();
      private bufferSize = 3;
      private bufferInterval = 5000;
    
      private batchChangesQueue = this.changesQueue.pipe(
        bufferTime(this.bufferInterval, null, this.bufferSize),
        filter(batch => !!batch.length) // Ignore empty batches
      );
    
      public processBatch() {
        this.batchChangesQueue.subscribe({
          next: batch => {
            console.log('Processing batch:', batch);
            // Perform API calls here
          },
          error: err => console.error('Error processing batch:', err),
        });
      }
    

    Note: You could probably simplify this down to a single Subject, since you subscribe to one only to forward its values into the other.
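
One caveat worth spelling out: as far as I can tell, bufferTime measures its time span from the moment the buffer opens, not from the most recent item, so it is close to, but not exactly, the "no new input for 5 seconds" behaviour described in the question. To make the intended semantics concrete, here is a minimal, dependency-free sketch of an idle-or-size batcher. IdleBatcher, TimerApi and realTimers are hypothetical names made up for this illustration; they are not part of RxJS or Angular, and the timer functions are injectable only so the logic can be exercised without waiting for real time to pass.

```typescript
// Hypothetical helper (not an RxJS API): collects items and hands them to
// onBatch either when maxSize items have accumulated, or when no new item
// has arrived for idleMs milliseconds.
type TimerApi = {
  set: (fn: () => void, ms: number) => unknown;
  clear: (handle: unknown) => void;
};

// Default timers backed by the platform's setTimeout / clearTimeout.
const realTimers: TimerApi = {
  set: (fn, ms) => setTimeout(fn, ms),
  clear: (handle) => clearTimeout(handle as ReturnType<typeof setTimeout>),
};

class IdleBatcher<T> {
  private items: T[] = [];
  private handle: unknown = undefined;
  private readonly maxSize: number;
  private readonly idleMs: number;
  private readonly onBatch: (batch: T[]) => void;
  private readonly timers: TimerApi;

  constructor(
    maxSize: number,
    idleMs: number,
    onBatch: (batch: T[]) => void,
    timers: TimerApi = realTimers,
  ) {
    this.maxSize = maxSize;
    this.idleMs = idleMs;
    this.onBatch = onBatch;
    this.timers = timers;
  }

  // Queue one item. Every new item restarts the idle timer; reaching
  // maxSize flushes immediately instead.
  add(item: T): void {
    this.items.push(item);
    this.cancelTimer();
    if (this.items.length >= this.maxSize) {
      this.flush();
    } else {
      this.handle = this.timers.set(() => this.flush(), this.idleMs);
    }
  }

  // Emit whatever has accumulated so far; empty batches are skipped.
  flush(): void {
    this.cancelTimer();
    if (this.items.length === 0) {
      return;
    }
    const batch = this.items;
    this.items = [];
    this.onBatch(batch);
  }

  private cancelTimer(): void {
    if (this.handle !== undefined) {
      this.timers.clear(this.handle);
      this.handle = undefined;
    }
  }
}
```

In the service from the question, this would presumably mean creating one instance with a size of 3, an idle time of 5000 ms and a callback that performs the API call, and calling add() from saveChanges instead of pushing into a second Subject. If the fixed-window behaviour of bufferTime is acceptable, the shorter RxJS version above is still the simpler choice.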