Periodic saving of changes using Reactive Extensions


I have an Angular app that has a very large mat-table. Most cells of this table are mat-select components (from Angular Material), and when a value changes in the mat-select, it needs to be saved, but according to the following rules:

  1. Every 7 seconds, save all changes that have been made since the last save.
  2. If a row is considered complete, trigger the save at that moment, with all of the changes that have been made since the last save.

My plan of attack is like this:

  1. On any selectionChange of any mat-select, emit that whole data object onto a Subject.
  2. Subscribe to that Subject, running it through the following pipe:
    • Buffer the observable stream, according to the conditions above
    • Filter out empty arrays (nothing was emitted in the buffer window)
    • Post the whole array of changes to the server. The server knows what to do with it.
  3. The buffer condition is itself an Observable, which triggers the buffer whenever it emits anything.
    • I want it to emit either when 7 seconds have passed or when a data row has been completed. So I use combineLatest to combine two Observables: an interval(7000), and the Subject of data to save, filtered to pass only the rows that are complete. An emission from either one should then trigger the buffer.
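
For reference, the two saving rules can be written down without RxJS at all. The following is only a minimal sketch of the intended behavior, not code from my app: createChangeBuffer, the Row type, and the in-memory saved array are hypothetical names made up for illustration, and it assumes that the change which completes a row belongs in the batch it triggers.

```typescript
// Hypothetical helper (not RxJS): collects changes and hands them to `save` in batches.
function createChangeBuffer<T>(
  isComplete: (row: T) => boolean,
  save: (batch: T[]) => void
) {
  let pending: T[] = [];

  // Rule 1: call this from a 7-second timer, e.g. setInterval(() => buf.flush(), 7000).
  function flush(): void {
    if (pending.length === 0) {
      return; // nothing changed since the last save
    }
    const batch = pending;
    pending = [];
    save(batch);
  }

  // Call this on every selectionChange.
  function add(row: T): void {
    pending.push(row);
    // Rule 2: a completed row flushes immediately (assumed to include that row).
    if (isComplete(row)) {
      flush();
    }
  }

  return { add, flush };
}

// Usage with a made-up row shape and an in-memory stand-in for the server.
type Row = { id: number; someData?: number };
const saved: Row[][] = [];
const changes = createChangeBuffer<Row>(
  row => row.someData !== undefined,
  batch => { saved.push(batch); }
);

changes.add({ id: 1 });              // incomplete: stays pending
changes.flush();                     // timer tick: saves the one pending row
changes.flush();                     // timer tick with nothing pending: no save
changes.add({ id: 2 });              // incomplete: stays pending
changes.add({ id: 3, someData: 2 }); // complete: saves both pending rows at once
```

The buffer operator with a custom closing notifier is meant to replace exactly this bookkeeping.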

Here's what I've done (simplified):

my.component.html

<mat-table>
  <ng-container matColumnDef='someData'>
    <mat-header-cell *matHeaderCellDef>Some Data</mat-header-cell>
    <mat-cell *matCellDef='let row'>
      <mat-form-field>
        <mat-select [(ngModel)]='row.someData' (selectionChange)='dataToSave.next(row)'>
          <mat-option [value]='3'>High</mat-option>
          <mat-option [value]='2'>Medium</mat-option>
          <mat-option [value]='1'>Low</mat-option>
        </mat-select>
      </mat-form-field>
    </mat-cell>
  </ng-container>
  ...
</mat-table>

my.component.ts

import { Component, OnInit, AfterViewInit, OnDestroy } from '@angular/core';
import { DataService } from '../service/data.service';
import { Subject } from 'rxjs/Subject';
import { filter, switchMap, buffer, startWith } from 'rxjs/operators';
import { interval } from 'rxjs/observable/interval';
import { combineLatest } from 'rxjs/observable/combineLatest';

@Component({
  selector: 'app-data-grid',
  templateUrl: './data-grid.component.html',
  styleUrls: ['./data-grid.component.scss']
})
export class DataGridComponent implements OnInit, AfterViewInit, OnDestroy {
  dataToSave = new Subject<any>();

  // Trigger the buffer either when 7 seconds have passed or when a row has been completed.
  bufferCondition = combineLatest(
    interval(7000).pipe(
      startWith(0)
    ),
    this.dataToSave.pipe(
      startWith({}),
      filter(row => this.checkAllFieldsCompleted(row))
    )
  );

  constructor(private dataService: DataService) { }

  ngAfterViewInit() {
    this.dataToSave.pipe(
      buffer(this.bufferCondition),
      filter(dataArray => dataArray && dataArray.length > 0),
      switchMap(dataArray => this.dataService.saveData(dataArray))
    ).subscribe(dataSaved => { 
      /* success */ 
    }, err => { 
      /* error */
    });
  }

  checkAllFieldsCompleted(row: any): boolean {
    return /* logic to check if complete */
  }
    ...
}

I thought that this would be the perfect use of reactive extensions! But I can't quite seem to get this to work. If I use the bufferTime operator alone, everything works fine. So the error, I believe, is in how I defined my custom buffer condition.


Solution

  • After some digging and a lot of trial and error, I have my solution.

    From this website I learned that:

    "Be aware that combineLatest will not emit an initial value until each observable emits at least one value."

    In my buffer condition, I'm trying to combine two observables: the 7-second interval, and the stream of completed rows. I noticed that the expected behavior started only after I had completed a row for the first time; only then would it start saving every 7 seconds. The problem was the filter operator on the dataToSave observable: combineLatest wasn't emitting anything until both observables had emitted something. The startWith({}) did not help, because it sat upstream of the filter, so the empty object was filtered out like any other incomplete row.
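
To make that rule concrete, here is a small dependency-free TypeScript model of when combineLatest emits for two sources. It is a sketch of the semantics only, not RxJS code: simulateCombineLatest, SourceName, and TimelineEvent are hypothetical names invented for this illustration, and the events are fed in as a plain array rather than over time.

```typescript
// Hypothetical model of combineLatest for two sources; not part of RxJS.
type SourceName = 'interval' | 'completedRow';
type TimelineEvent = { source: SourceName; value: unknown };

function simulateCombineLatest(events: TimelineEvent[]): unknown[][] {
  const latest: Record<SourceName, unknown> = { interval: undefined, completedRow: undefined };
  const hasEmitted: Record<SourceName, boolean> = { interval: false, completedRow: false };
  const emissions: unknown[][] = [];

  for (const event of events) {
    latest[event.source] = event.value;
    hasEmitted[event.source] = true;
    // Nothing is emitted until BOTH sources have produced at least one value.
    if (hasEmitted.interval && hasEmitted.completedRow) {
      emissions.push([latest.interval, latest.completedRow]);
    }
  }
  return emissions;
}

// Interval ticks alone never produce a combined emission, so the buffer never closes...
const ticksOnly = simulateCombineLatest([
  { source: 'interval', value: 0 },
  { source: 'interval', value: 1 },
]);

// ...but once one row has passed the filter, that row and every later tick emit.
const afterFirstRow = simulateCombineLatest([
  { source: 'interval', value: 0 },
  { source: 'completedRow', value: 'row A' },
  { source: 'interval', value: 1 },
]);
```

In this model ticksOnly is empty, while afterFirstRow contains two emissions, which matches what I was seeing: no periodic saves until the first completed row.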

    I changed this code:

    bufferCondition = combineLatest(
      interval(7000).pipe(
        startWith(0)
      ),
      this.dataToSave.pipe(
        startWith({}),
        filter(row => this.checkAllFieldsCompleted(row))
      )
    );
    

    to this:

    bufferCondition = combineLatest(
      interval(7000),
      this.dataToSave.pipe(
        filter((row, index) => index === 0 || this.checkAllFieldsCompleted(row))
      )
    );
    

    and it worked perfectly! Basically, I didn't need the startWith operator. Instead, the filter now lets the very first change to the data through (index === 0), which gives combineLatest its initial value, and then blocks every later change until a row is complete.