Search code examples
node.jsrxjs6

Keep access to variables between pipe operators


I have been trying to use Rxjs in a node app. fileList$ is the return from fs.readdirsync (an array of strings).

The first map() has a parameter called filename.

flatMap() readFileAsObservable() uses bindNodeCallback(fs.readFile) to read the file.

My class Testian takes 2 args; The object created by yaml-js from reading the file and the filename from the first map. How can I access filename in the pipe where I have indicated?

fileList$
    .pipe(
        map((filename: string) => `${resolvedDirPath}/${filename}`),
        flatMap(
            (filePath: string) => readFileAsObservable(filePath, 'utf8') as Observable<string>
        ),
        map((fileData: string) => yaml.safeLoad(fileData) as ITestYaml),
        map((testYaml: ITestYaml) => new Testian(testYaml, [I want to use filename here])),
        flatMap((testYaml: Testian) => {
            const prom: Promise<{}> = activeTests.set(testYaml);
            outgoing.sendTest(testYaml);
            return from(prom);
        })
    )

Solution

  • This is treated similarly in any API that involve chained functions, e.g. promises.

    Temporary variable

    A temporary variable can be used to store a value that is out of the scope of a function that should access it. This is an easy but non-idiomatic workaround:

    let filename;
    
    fileList$.pipe(
        map((_filename) => {
          filename = _filename;
          return `${resolvedDirPath}/${filename}`;
        }),
        flatMap((filePath) => readFileAsObservable(filePath, 'utf8')),
        map((fileData) => yaml.safeLoad(fileData)),
        map((testYaml) => new Testian(testYaml, filename)),
        flatMap((testYaml) => {
            const prom = activeTests.set(testYaml);
            outgoing.sendTest(testYaml);
            return from(prom);
        })
    )
    

    There may be problems with race conditions, depending on particular observable.

    Nested function

    A function that uses filename can be nested to access the variable from parent scope:

    fileList$.pipe(
        flatMap((filename) => of(`${resolvedDirPath}/${filename}`).pipe(
            flatMap((filePath) => readFileAsObservable(filePath, 'utf8')),
            map((fileData) => yaml.safeLoad(fileData)),
            map((testYaml) => new Testian(testYaml, filename)
        ),
        flatMap((testYaml) => {
            const prom = activeTests.set(testYaml);
            outgoing.sendTest(testYaml);
            return from(prom);
        })
    )
    

    Pass-through value

    The variable can be passed altogether with other other results where possible:

    fileList$.pipe(
        map((filename) => [filename, `${resolvedDirPath}/${filename}`]),
        flatMap(
            ([filename, filePath]) => forkJoin(filename, readFileAsObservable(filePath, 'utf8')),
        ),
        map(([filename, fileData]) => [filename, yaml.safeLoad(fileData) as ITestYaml)],
        map(([filename, testYaml]) => new Testian(testYaml, filename)),
        flatMap((testYaml) => {
            const prom = activeTests.set(testYaml);
            outgoing.sendTest(testYaml);
            return from(prom);
        })
    )
    

    async..await

    If a stream allows to switch to promises and async..await, this can be done because the problem with function scopes doesn't exist in async function.

    fileList$.pipe(
        flatMap(async (filename) => {
          const filePath = `${resolvedDirPath}/${filename}`;
          const fileData = await readFileAsObservable(filePath, 'utf8').toPromise();
          let testYaml = yaml.safeLoad(fileData);
          testYaml = new Testian(testYaml, filename);
          const prom = activeTests.set(testYaml);
          outgoing.sendTest(testYaml);
          return prom;
        })
    )
    

    Since this observable already uses flatMap and promises, it could be safely written with promises alone. RxJS observables have use cases that aren't suitable for promises, but this isn't one of them.