I have been trying to use Rxjs in a node app. fileList$
is the return from fs.readdirsync
(an array of strings).
The first map()
has a parameter called filename.
flatMap() readFileAsObservable()
uses bindNodeCallback(fs.readFile)
to read the file.
My class Testian
takes 2 args; The object created by yaml-js
from reading the file and the filename
from the first map. How can I access filename
in the pipe where I have indicated?
fileList$
.pipe(
map((filename: string) => `${resolvedDirPath}/${filename}`),
flatMap(
(filePath: string) => readFileAsObservable(filePath, 'utf8') as Observable<string>
),
map((fileData: string) => yaml.safeLoad(fileData) as ITestYaml),
map((testYaml: ITestYaml) => new Testian(testYaml, [I want to use filename here])),
flatMap((testYaml: Testian) => {
const prom: Promise<{}> = activeTests.set(testYaml);
outgoing.sendTest(testYaml);
return from(prom);
})
)
This is treated similarly in any API that involve chained functions, e.g. promises.
A temporary variable can be used to store a value that is out of the scope of a function that should access it. This is an easy but non-idiomatic workaround:
let filename;
fileList$.pipe(
map((_filename) => {
filename = _filename;
return `${resolvedDirPath}/${filename}`;
}),
flatMap((filePath) => readFileAsObservable(filePath, 'utf8')),
map((fileData) => yaml.safeLoad(fileData)),
map((testYaml) => new Testian(testYaml, filename)),
flatMap((testYaml) => {
const prom = activeTests.set(testYaml);
outgoing.sendTest(testYaml);
return from(prom);
})
)
There may be problems with race conditions, depending on particular observable.
A function that uses filename
can be nested to access the variable from parent scope:
fileList$.pipe(
flatMap((filename) => of(`${resolvedDirPath}/${filename}`).pipe(
flatMap((filePath) => readFileAsObservable(filePath, 'utf8')),
map((fileData) => yaml.safeLoad(fileData)),
map((testYaml) => new Testian(testYaml, filename)
),
flatMap((testYaml) => {
const prom = activeTests.set(testYaml);
outgoing.sendTest(testYaml);
return from(prom);
})
)
The variable can be passed altogether with other other results where possible:
fileList$.pipe(
map((filename) => [filename, `${resolvedDirPath}/${filename}`]),
flatMap(
([filename, filePath]) => forkJoin(filename, readFileAsObservable(filePath, 'utf8')),
),
map(([filename, fileData]) => [filename, yaml.safeLoad(fileData) as ITestYaml)],
map(([filename, testYaml]) => new Testian(testYaml, filename)),
flatMap((testYaml) => {
const prom = activeTests.set(testYaml);
outgoing.sendTest(testYaml);
return from(prom);
})
)
If a stream allows to switch to promises and async..await
, this can be done because the problem with function scopes doesn't exist in async
function.
fileList$.pipe(
flatMap(async (filename) => {
const filePath = `${resolvedDirPath}/${filename}`;
const fileData = await readFileAsObservable(filePath, 'utf8').toPromise();
let testYaml = yaml.safeLoad(fileData);
testYaml = new Testian(testYaml, filename);
const prom = activeTests.set(testYaml);
outgoing.sendTest(testYaml);
return prom;
})
)
Since this observable already uses flatMap
and promises, it could be safely written with promises alone. RxJS observables have use cases that aren't suitable for promises, but this isn't one of them.