angular typescript recursion rxjs angular2-observables

RxJS expand never exits from recursive call


I created a recursive call that listens to GCP trigger logs after a file is uploaded to Firebase, and it basically works very well. My problem is that the recursive expand function never exits. As you can see from the code, I have checked the values more than once and everything seems fine. I also see the ENTERED message in the console log, but the recursive call never finishes, and no error is thrown. How can I break, or force a break out of, the recursive calls when the condition is true?

public getLog(filePath: string): Observable<object[]> {
    try {
      ...
        return this.getLogChunk()
          .pipe(
            expand((data: any) => {

              if (!environment.production && data && data.entries && data.entries.length > 0) {
                console.groupCollapsed('GCP Service Info [getLog]');
                console.info('[fileName]', fileName);
                console.info('[some - Finished]', data.entries.some((x: any) => x.textPayload.includes('Finished')));
                console.info('[some - Filename]', data.entries.some((x: any) => x.textPayload.includes(fileName)));
                console.info('[some - Finished - Filename]', data.entries.some((x: any) => x.textPayload.includes('Finished') && x.textPayload.includes(fileName)));
                console.info('[filter - Filename]', data.entries.filter((x: any) => x.textPayload.includes(fileName)));
                console.groupEnd();
              }

              if (data &&
                data.entries &&
                data.entries.some((x: any) => x.textPayload.includes('Finished') && x.textPayload.includes(fileName))) {
                console.log('ENTERED!!!!!');
                return of({});
              }
              return this.getLogChunk().pipe(delay(2000));

            }),
            map(res => res),
            reduce((acc: object[], val: any) => acc.concat(val), new Array<object>())
          )
      }
      return new Observable<object[]>();
    } catch (e) {
      if (!environment.production) {
        console.groupCollapsed('GCP Service Error [getLog]');
        console.error('[error]', e);
        console.groupEnd();
      }
      throw new Error(e);
    }
  }

This is where I call getLog. (As you can see, I check the snapshot changes so that getLog fires only once, when the upload has finished. I debugged it and that part is fine.) The [complete] ENTERED message is never logged, because of the endless recursive loop in getLog.

...
const uploadTask: AngularFireUploadTask = this.storage.upload(filePath, fileToUpload);
...
uploadTask.snapshotChanges()
              .pipe(
                filter(task => {
                  if (task) {
                    return task.state === firebase.storage.TaskState.SUCCESS
                  }
                  return false;
                })
              )
              .subscribe(val => {
                this.gcpService.getLog(filePath)
                  .subscribe({
                    next: data => console.log('[next] ENTERED'),
                    complete: () => {
                      console.log('[complete] ENTERED');
                    }
                  });
              })
...


Solution

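  • The problem lies in the return of({}) call: of({}) emits a value, and expand feeds every emitted value back into its projection function, so the recursion continues. That {} has no entries, so the check fails on the next pass and getLogChunk() is called again. To end the expansion, return EMPTY (imported from 'rxjs') instead. EMPTY emits no value and completes immediately, so expand has nothing left to recurse on and the stream completes.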
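
The fix above can be sketched in isolation as follows. This is a minimal illustration, not the asker's actual service: LogChunk, fakeGetLogChunk, isFinished and pollUntilFinished are hypothetical names, the fake source is assumed to report "Finished" on its third poll, and the delay(2000) from the question is left out so the example runs synchronously.

```typescript
import { EMPTY, Observable, of } from 'rxjs';
import { expand, reduce } from 'rxjs/operators';

// Hypothetical shape of one chunk of log data (assumed, not from the question).
interface LogChunk {
  entries: { textPayload: string }[];
}

// Hypothetical stand-in for getLogChunk(): the third poll reports "Finished".
let polls = 0;
function fakeGetLogChunk(fileName: string): Observable<LogChunk> {
  polls += 1;
  const textPayload =
    polls >= 3 ? `Finished processing ${fileName}` : 'still running';
  return of({ entries: [{ textPayload }] });
}

// Same stop condition as in the question: "Finished" plus the file name.
function isFinished(chunk: LogChunk, fileName: string): boolean {
  return chunk.entries.some(
    e => e.textPayload.includes('Finished') && e.textPayload.includes(fileName)
  );
}

function pollUntilFinished(fileName: string): Observable<LogChunk[]> {
  return fakeGetLogChunk(fileName).pipe(
    // Returning EMPTY emits nothing, so expand has no value to recurse on.
    // Returning of({}) here would emit a value and keep the recursion alive.
    expand(chunk =>
      isFinished(chunk, fileName) ? EMPTY : fakeGetLogChunk(fileName)
    ),
    // reduce emits only once the expanded stream has completed.
    reduce((acc: LogChunk[], chunk: LogChunk) => acc.concat(chunk), [] as LogChunk[])
  );
}

pollUntilFinished('report.csv').subscribe({
  next: chunks => console.log('[next] chunks collected:', chunks.length),
  complete: () => console.log('[complete] ENTERED')
});
```

Because reduce only emits when its source completes, the [next] and [complete] handlers in the question can only run once expand has been given EMPTY (or another observable that completes without emitting) for the final step.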